IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 线程池或协程爬取高匿代理IP到MongoDB -> 正文阅读

[网络协议]线程池或协程爬取高匿代理IP到MongoDB

完整代码如下:

import logging
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor   # 线程池
from use_proxy import send_request_by_proxy  # 如果没有该模块,打开本文最后链接,保存为文件,并命名为use_proxy即可
import gevent  # 协程
import requests
from pymongo import MongoClient
from fake_useragent import UserAgent
from lxml import etree


class Spider(object):
    """使用线程池ThreadPoolExecutor或协程gevent爬取代理IP并保存到MongoDB
        注意:1、没有大量代理IP之前不建议使用线程池爬取,请求过快会导致更多异常,建议先使用协程进行爬取
             2、高匿代理IP中包括HTTP的代理IP和HTTPS的代理IP,测试时需要分开测试
    """

    def __init__(self):
        self.col = None
        self.is_http = None
        self.is_pool = None
        self.num = 0  # 统计数据库新增的IP数量

    def send_request(self, url):
        """发送请求"""
        headers = {'User-Agent': UserAgent().random}
        try:
            response = requests.get(url, headers=headers, timeout=9)
        except:
            return send_request_by_proxy(url)  # 使用代理IP发送请求
        else:
            if response.status_code == 200:
                return response
            else:
                return send_request_by_proxy(url)  # 使用代理IP发送请求

    def parse(self, page_url):
        """解析每页数据"""
        try:
            response = self.send_request(page_url).content  # 发送请求
        except:
            logging.warning('异常页面,需要另外下载:' + page_url)  # 提示异常页码地址
        else:
            if self.is_pool:  # 选择线程池时执行
                time.sleep(random.choice([1.5, 1.8]))  # 随机延迟
            else:  # 选择协程时执行
                time.sleep(random.choice([0.8, 1]))  # 随机延迟

            print('爬取网页并保存数据:', page_url)
            html = etree.HTML(response)
            tr_list = html.xpath(r'//table[@class="fl-table"]/tbody/tr')  # 解析tr标签
            for tr in tr_list:
                item = {}
                item['ip'] = tr.xpath(r'./td[1]/text()')[0]  # 代理IP
                item['transparency'] = tr.xpath(r'./td[3]/text()')[0]  # 高匿代理
                item['agreement'] = tr.xpath(r'./td[2]/text()')[0][:-2]  # 协议
                if item:
                    self.col.insert_one(item)  # 添加到数据库
                    self.num += 1

    def remove_repetition(self):
        """去除数据库中重复的ip"""
        result = self.col.find()  # 查询数据库所有数据
        data = []
        for doc in result:
            data.append(doc['ip'])  # 把所有IP添加到一个列表中
        data_remove = list(set(data))  # 去重
        len_data = len(data)  # 原IP列表长度
        len_data_remove = len(data_remove)   # 去重后IP列表长度
        if len_data_remove < len_data:  # 为真时,说明有重复IP
            for ip in data_remove:
                while data.count(ip) > 1:  # 通过循环处理重复IP
                    logging.warning('存在重复ip:' + ip)  # 提示重复IP
                    data.remove(ip)  # 删除列表中的重复IP
                    self.col.delete_one({'ip': ip})  # 删除重复IP在数据库中的文档
        else:
            pass
        print('数据库新增IP数量{}个,删除重复IP数量{}个,剩余IP数量{}个'.format(str(self.num), len_data - len_data_remove, len_data_remove))

    def start_crawl(self):
        """执行爬虫,并添加数据到数据库"""
        client = MongoClient()
        db = client.proxy  # 数据库
        self.col = db.高匿代理  # 集合

        # 获取页码范围内的地址
        page_urls = []  # 所有页码地址
        for page in range(1, 359):  # 设置页码范围,1-358页,其他页码的页面是空
            page_url = f'http://www.xiladaili.com/gaoni/{page}/'
            page_urls.append(page_url)
            
        # 功能选择:使用线程池还是协程
        print('1、有大量代理IP,推荐使用线程池爬取数据')
        print('2、无大量代理IP,推荐使用协程爬取数据')
        select_num = input('请输入对应操作的编号:')
        if select_num == '1':
            self.is_pool = True  # 是线程池
            # 执行线程池,把爬取的数据添加到MongoDB
            pool = ThreadPoolExecutor(3)  # 创建线程池,设置线程数
            pool.map(self.parse, page_urls)  # 执行线程池
            pool.shutdown()  # 等待所有任务完成
        elif select_num == '2':
            self.is_pool = False  # 是协程
            # 执行协程
            job_list = []  # 保存所有协程任务
            for page_url in page_urls:
                job = gevent.spawn(self.parse, page_url)  # 执行一个协程任务
                job_list.append(job)  # 把每个协程任务放进一个列表中保存
            gevent.joinall(job_list)  # 等待所有协程结束
        else:
            logging.warning('输入错误!')
            return

        # 去除数据库集合中重复的ip
        self.remove_repetition()


def main_1():
    """执行爬虫"""
    spider = Spider()
    spider.start_crawl()


class TestIP(object):
    """测试MongoDB中代理IP的有效性"""

    def __init__(self):
        self.col_ip = None  # 数据库集合
        self.col_valid_ip = None  # 数据库集合
        self.is_http = None
        self.test_url = None  # 测试url

    def send_request_by_proxy(self, doc):
        """使用代理ip发送请求"""
        ip, transparency = doc['ip'], doc['transparency']  # 分别是IP和IP透明度
        headers = {'User-Agent': UserAgent().random}  # 随机请求头
        proxy = {'http': ip, 'https': ip}  # 代理
        try:
            response = requests.get(self.test_url, headers=headers, proxies=proxy, timeout=5)  # 设置一定的超时
        except:
            logging.warning('异常代理IP:%s' % ip)  # 提示异常代理IP
        else:
            # 判断响应的状态码是否为200,并且是正确的响应数据
            # 这里是查找页面中的固有字符串:'User-Agent'
            if response.status_code == 200 and re.findall('User-Agent', response.text):
                print('有效代理IP:', ip)  # 提示有效代理
                item = {'ip': ip, 'transparency': transparency, 'status': 'true', 'pass_count': 0, 'fail_count': 0}
                self.col_valid_ip.insert_one(item)  # 新增文档
            else:
                logging.warning('其他异常代理IP:%s' % ip)  # 提示异常IP

    def start_test(self):
        """执行测试"""
        # 功能选择:测试http或https的IP有效性
        print('1、测试http的代理IP有效性')
        print('2、测试https的代理IP有效性')
        select_num = input('请输入对应操作的编号:')
        client = MongoClient()
        db = client['proxy']  # 数据库
        self.col_ip = db.高匿代理  # http的IP集合
        if select_num == '1':
            self.is_http = True  # 是http协议
            self.col_valid_ip = db.有效http代理  # http的有效IP集合
            self.test_url = 'http://httpbin.org/get'  # 测试http的地址
        elif select_num == '2':
            self.is_http = False  # 是https协议
            self.col_valid_ip = db.有效https代理  # https的有效IP集合
            self.test_url = 'https://httpbin.org/get'  # 测试https的地址
        else:
            logging.warning('输入错误!')
            return

        # 判断数据库集合中是否有数据,没有数据执行爬虫
        while True:
            result = list(self.col_ip.find())  # 查找所有数据
            if result:  # 有数据
                break  # 终止循环
            else:  # 无数据
                logging.warning('数据库没有数据,请先进行爬虫!')
                time.sleep(1)
                main_1()  # 执行爬虫

        # 获取需要进行测试IP
        data = []
        for doc in result:
            if self.is_http:  # 测试http的IP时执行
                if 'S' not in doc['agreement']:
                    data.append(doc)
            else:   # 测试https的IP执行
                if 'S' in doc['agreement']:
                    data.append(doc)

        # 使用线程池进行测试
        pool = ThreadPoolExecutor(6)  # 创建线程池,设置线程数
        pool.map(self.send_request_by_proxy, data)
        pool.shutdown()  # 等待所有任务完成

        # 删除有效代理IP集合中重复的ip
        result = self.col_valid_ip.find()  # 查询数据库所有数据
        data = []
        for doc in result:  # 遍历数据列表
            data.append(doc['ip'])  # 把所有IP添加到列表中
        data_remove = list(set(data))  # 去重
        if len(data_remove) < len(data):
            for ip in data_remove:
                while data.count(ip) > 1:  # 处理重复的IP
                    logging.warning('存在重复ip:' + ip)  # 提示重复IP
                    data.remove(ip)  # 删除列表中的重复IP
                    self.col_valid_ip.delete_one({'ip': ip})  # 删除对应的文档


def main_2():
    """执行测试"""
    test = TestIP()
    test.start_test()


def exe():
    """主程序"""
    print('1、爬取高匿代理IP到MongoDB')
    print('2、测试代理IP有效性')
    select_num = input('输入对应操作的编号:')
    if select_num == '1':
        main_1()
    elif select_num == '2':
        main_2()
    else:
        logging.warning('输入错误!')


if __name__ == '__main__':
    exe()

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-12-01 18:03:38  更:2021-12-01 18:04:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 8:26:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码