import requests
import json
import random
import pymysql
import threading
# Fill in User-Agent strings of your own choosing.
uas = []
# random.choice() raises IndexError on an empty sequence, which would abort
# the import while the list above is still an unpopulated placeholder, so
# fall back to a generic User-Agent in that case.
ua = random.choice(uas) if uas else 'Mozilla/5.0'
# Fill in proxies of your own choosing, each formatted as 'ip:port'.
proxies_list = []


def get_proxies():
    """Build a ``requests``-style proxy mapping around a randomly chosen proxy.

    Returns:
        A dict with ``'http'`` and ``'https'`` entries that both point at the
        same authenticated proxy picked from ``proxies_list``, or ``None``
        when ``proxies_list`` is empty (``requests`` accepts ``proxies=None``).
    """
    # random.choice() raises IndexError on an empty list; report "no proxy"
    # instead so the caller is not pushed into an exception/retry loop.
    if not proxies_list:
        return None
    address = random.choice(proxies_list)
    # Proxy that requires username/password authentication.
    proxy = f'账号:密码@{address}'
    # Author's note: with only an 'http' entry the local IP got banned and
    # switching IPs was still blocked; adding the 'https' entry fixed it.
    proxies = {
        'http': 'http://' + proxy,
        'https': 'http://' + proxy,
    }
    # Log only the address: the full mapping embeds the proxy credentials.
    print('proxy', address)
    return proxies
# 这里生成一个列表,其中包含所有 user 主页的 url。
# Space (profile page) URL for every user id from 500000 through 529999;
# the original 5000..5299 outer loop times 100 ids each is this same
# contiguous range.
urls = [
    'https://space.bilibili.com/' + str(i)
    for i in range(5000 * 100, 5300 * 100)
]
# 接下来定义获取 user 信息的函数。
def get_user_info(url, max_retries=10):
    """Fetch one user's profile from the bilibili API and store it in MySQL.

    Relies on module-level names defined elsewhere in this file: ``ua``,
    ``get_proxies`` and ``semaphore``. The semaphore is created and acquired
    in the ``__main__`` block; this function releases one slot as soon as the
    HTTP phase is over, whether or not it succeeded.

    Args:
        url: Space URL of the form ``'https://space.bilibili.com/<mid>'``.
        max_retries: Maximum number of HTTP attempts before giving up on this
            user. ``None`` keeps retrying until a 200 response arrives.
    """
    mid = url.replace('https://space.bilibili.com/', '')
    head = {
        'User-Agent': ua,
        'Referer': url + '?from=search&seid=' + str(random.randint(10000, 50000)),
    }
    try:
        content = _fetch_user_info_text(mid, head, max_retries)
    finally:
        # Always free the slot, otherwise the main loop blocks forever on
        # semaphore.acquire() once enough workers have failed.
        semaphore.release()
    if content is None:
        print('error: ', url)
        return
    print(content)

    # Next, process the fetched data: mid, name, sex and level are pulled out
    # of the JSON payload (other fields could be extracted the same way).
    try:
        info_dict = json.loads(content)
    except ValueError as e:
        print(e)
        return
    # A missing 'code' must not count as success (the old `False == 0`
    # comparison was true), so compare the raw lookup result against 0.
    if not isinstance(info_dict, dict) or info_dict.get('code') != 0:
        print('error: ', url)
        return
    user_data = info_dict.get('data')
    if not user_data:
        # Nothing to store; do not fall through to the insert.
        print('no data')
        return
    try:
        record = (
            user_data['mid'],
            user_data['name'],
            user_data['sex'],
            user_data['level'],
        )
    except (KeyError, TypeError) as e:
        print(e)
        return
    _save_user_record(record)


def _fetch_user_info_text(mid, headers, max_retries):
    """Return the API response body for *mid*, or None once retries run out."""
    api_url = 'https://api.bilibili.com/x/space/acc/info?mid=%s&jsonp=jsonp' % mid
    attempt = 0
    while max_retries is None or attempt < max_retries:
        attempt += 1
        try:
            # A fresh proxy is picked for every attempt.
            response = requests.get(
                api_url, headers=headers, proxies=get_proxies(), timeout=5)
        except requests.RequestException as e:
            print('Proxy Error', e)
            continue
        if response.status_code == 200:
            return response.text
    return None


def _save_user_record(record):
    """Insert one (mid, name, sex, level) tuple into the bilibili_user table."""
    # The collected data is stored in MySQL; the database and the table must
    # be created beforehand.
    conn = None
    try:
        conn = pymysql.connect(host='localhost', port=3306, user='root',
                               password='123123123', database='bilibili',
                               charset='utf8')
        with conn.cursor() as cursor:
            # Let the driver quote the values: they come from a remote API,
            # and a name containing a quote would break (or inject into) a
            # string-formatted statement.
            cursor.execute(
                "insert into bilibili_user(mid,name,sex,level) values (%s,%s,%s,%s)",
                record)
        conn.commit()
    except pymysql.MySQLError as e:
        print('mysql Error', e)
    finally:
        if conn is not None:
            conn.close()
# 主函数。这里使用多线程模块 threading,并用 BoundedSemaphore 控制并发数量。
if __name__ == '__main__':
    # At most 5 slots are held at a time. get_user_info() looks `semaphore`
    # up as a module global and releases the slot acquired here, so this
    # name must stay in sync with it.
    semaphore = threading.BoundedSemaphore(5)
    for url in urls:
        # Blocks until a running worker gives its slot back.
        semaphore.acquire()
        worker = threading.Thread(target=get_user_info, args=(url,))
        worker.start()
# 成果展示(原文此处的内容——可能是运行结果截图——在文本提取时丢失。)