完整代码如下:
import logging
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor # 线程池
from use_proxy import send_request_by_proxy # 如果没有该模块,打开本文最后链接,保存为文件,并命名为use_proxy即可
import gevent # 协程
import requests
from pymongo import MongoClient
from fake_useragent import UserAgent
from lxml import etree
class Spider(object):
"""使用线程池ThreadPoolExecutor或协程gevent爬取代理IP并保存到MongoDB
注意:1、没有大量代理IP之前不建议使用线程池爬取,请求过快会导致更多异常,建议先使用协程进行爬取
2、高匿代理IP中包括HTTP的代理IP和HTTPS的代理IP,测试时需要分开测试
"""
def __init__(self):
self.col = None
self.is_http = None
self.is_pool = None
self.num = 0 # 统计数据库新增的IP数量
def send_request(self, url):
"""发送请求"""
headers = {'User-Agent': UserAgent().random}
try:
response = requests.get(url, headers=headers, timeout=9)
except:
return send_request_by_proxy(url) # 使用代理IP发送请求
else:
if response.status_code == 200:
return response
else:
return send_request_by_proxy(url) # 使用代理IP发送请求
def parse(self, page_url):
"""解析每页数据"""
try:
response = self.send_request(page_url).content # 发送请求
except:
logging.warning('异常页面,需要另外下载:' + page_url) # 提示异常页码地址
else:
if self.is_pool: # 选择线程池时执行
time.sleep(random.choice([1.5, 1.8])) # 随机延迟
else: # 选择协程时执行
time.sleep(random.choice([0.8, 1])) # 随机延迟
print('爬取网页并保存数据:', page_url)
html = etree.HTML(response)
tr_list = html.xpath(r'//table[@class="fl-table"]/tbody/tr') # 解析tr标签
for tr in tr_list:
item = {}
item['ip'] = tr.xpath(r'./td[1]/text()')[0] # 代理IP
item['transparency'] = tr.xpath(r'./td[3]/text()')[0] # 高匿代理
item['agreement'] = tr.xpath(r'./td[2]/text()')[0][:-2] # 协议
if item:
self.col.insert_one(item) # 添加到数据库
self.num += 1
def remove_repetition(self):
"""去除数据库中重复的ip"""
result = self.col.find() # 查询数据库所有数据
data = []
for doc in result:
data.append(doc['ip']) # 把所有IP添加到一个列表中
data_remove = list(set(data)) # 去重
len_data = len(data) # 原IP列表长度
len_data_remove = len(data_remove) # 去重后IP列表长度
if len_data_remove < len_data: # 为真时,说明有重复IP
for ip in data_remove:
while data.count(ip) > 1: # 通过循环处理重复IP
logging.warning('存在重复ip:' + ip) # 提示重复IP
data.remove(ip) # 删除列表中的重复IP
self.col.delete_one({'ip': ip}) # 删除重复IP在数据库中的文档
else:
pass
print('数据库新增IP数量{}个,删除重复IP数量{}个,剩余IP数量{}个'.format(str(self.num), len_data - len_data_remove, len_data_remove))
def start_crawl(self):
"""执行爬虫,并添加数据到数据库"""
client = MongoClient()
db = client.proxy # 数据库
self.col = db.高匿代理 # 集合
# 获取页码范围内的地址
page_urls = [] # 所有页码地址
for page in range(1, 359): # 设置页码范围,1-358页,其他页码的页面是空
page_url = f'http://www.xiladaili.com/gaoni/{page}/'
page_urls.append(page_url)
# 功能选择:使用线程池还是协程
print('1、有大量代理IP,推荐使用线程池爬取数据')
print('2、无大量代理IP,推荐使用协程爬取数据')
select_num = input('请输入对应操作的编号:')
if select_num == '1':
self.is_pool = True # 是线程池
# 执行线程池,把爬取的数据添加到MongoDB
pool = ThreadPoolExecutor(3) # 创建线程池,设置线程数
pool.map(self.parse, page_urls) # 执行线程池
pool.shutdown() # 等待所有任务完成
elif select_num == '2':
self.is_pool = False # 是协程
# 执行协程
job_list = [] # 保存所有协程任务
for page_url in page_urls:
job = gevent.spawn(self.parse, page_url) # 执行一个协程任务
job_list.append(job) # 把每个协程任务放进一个列表中保存
gevent.joinall(job_list) # 等待所有协程结束
else:
logging.warning('输入错误!')
return
# 去除数据库集合中重复的ip
self.remove_repetition()
def main_1():
"""执行爬虫"""
spider = Spider()
spider.start_crawl()
class TestIP(object):
"""测试MongoDB中代理IP的有效性"""
def __init__(self):
self.col_ip = None # 数据库集合
self.col_valid_ip = None # 数据库集合
self.is_http = None
self.test_url = None # 测试url
def send_request_by_proxy(self, doc):
"""使用代理ip发送请求"""
ip, transparency = doc['ip'], doc['transparency'] # 分别是IP和IP透明度
headers = {'User-Agent': UserAgent().random} # 随机请求头
proxy = {'http': ip, 'https': ip} # 代理
try:
response = requests.get(self.test_url, headers=headers, proxies=proxy, timeout=5) # 设置一定的超时
except:
logging.warning('异常代理IP:%s' % ip) # 提示异常代理IP
else:
# 判断响应的状态码是否为200,并且是正确的响应数据
# 这里是查找页面中的固有字符串:'User-Agent'
if response.status_code == 200 and re.findall('User-Agent', response.text):
print('有效代理IP:', ip) # 提示有效代理
item = {'ip': ip, 'transparency': transparency, 'status': 'true', 'pass_count': 0, 'fail_count': 0}
self.col_valid_ip.insert_one(item) # 新增文档
else:
logging.warning('其他异常代理IP:%s' % ip) # 提示异常IP
def start_test(self):
"""执行测试"""
# 功能选择:测试http或https的IP有效性
print('1、测试http的代理IP有效性')
print('2、测试https的代理IP有效性')
select_num = input('请输入对应操作的编号:')
client = MongoClient()
db = client['proxy'] # 数据库
self.col_ip = db.高匿代理 # http的IP集合
if select_num == '1':
self.is_http = True # 是http协议
self.col_valid_ip = db.有效http代理 # http的有效IP集合
self.test_url = 'http://httpbin.org/get' # 测试http的地址
elif select_num == '2':
self.is_http = False # 是https协议
self.col_valid_ip = db.有效https代理 # https的有效IP集合
self.test_url = 'https://httpbin.org/get' # 测试https的地址
else:
logging.warning('输入错误!')
return
# 判断数据库集合中是否有数据,没有数据执行爬虫
while True:
result = list(self.col_ip.find()) # 查找所有数据
if result: # 有数据
break # 终止循环
else: # 无数据
logging.warning('数据库没有数据,请先进行爬虫!')
time.sleep(1)
main_1() # 执行爬虫
# 获取需要进行测试IP
data = []
for doc in result:
if self.is_http: # 测试http的IP时执行
if 'S' not in doc['agreement']:
data.append(doc)
else: # 测试https的IP执行
if 'S' in doc['agreement']:
data.append(doc)
# 使用线程池进行测试
pool = ThreadPoolExecutor(6) # 创建线程池,设置线程数
pool.map(self.send_request_by_proxy, data)
pool.shutdown() # 等待所有任务完成
# 删除有效代理IP集合中重复的ip
result = self.col_valid_ip.find() # 查询数据库所有数据
data = []
for doc in result: # 遍历数据列表
data.append(doc['ip']) # 把所有IP添加到列表中
data_remove = list(set(data)) # 去重
if len(data_remove) < len(data):
for ip in data_remove:
while data.count(ip) > 1: # 处理重复的IP
logging.warning('存在重复ip:' + ip) # 提示重复IP
data.remove(ip) # 删除列表中的重复IP
self.col_valid_ip.delete_one({'ip': ip}) # 删除对应的文档
def main_2():
"""执行测试"""
test = TestIP()
test.start_test()
def exe():
"""主程序"""
print('1、爬取高匿代理IP到MongoDB')
print('2、测试代理IP有效性')
select_num = input('输入对应操作的编号:')
if select_num == '1':
main_1()
elif select_num == '2':
main_2()
else:
logging.warning('输入错误!')
if __name__ == '__main__':
exe()
|