IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> python多线程 -> 正文阅读

[Python知识库]python多线程

1、介绍

1、Python并发编程简介

image-20211025145327009 image-20211025145412412 image-20211025145501099

2、选择多线程多进程多协程

Python并发变成有三种方式:多线程Thread、多进程Process、多协程Coroutine。

image-20211025145848883 image-20211025145940261 image-20211025150131523

3、全局解释器锁GIL

image-20211025150246179 image-20211025150404636

由于GIL的存在,即使电脑上有多核CPU,单个时刻也只能使用一个,相比于并发加速的C++/Java所以慢。

image-20211025150542923 image-20211025150717763

2、多线程

1、使用多线程,Python爬虫加速10倍

1、创建多线程的方式

image-20211025150910699

join所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后。

2、代码

blog_spider.py

# @Time : 2021/10/25 15:13
# @Author : Li Kunlun
# @Description : 对博客程序进行爬虫

import requests

urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]


def craw(url):
    r = requests.get(url)
    print(url, len(r.text))


if __name__ == '__main__':
    # https://www.cnblogs.com/#p1 70243
    # craw(urls[0])
    pass

python01_multi_thread_craw.py

# @Time : 2021/10/25 15:16
# @Author : Li Kunlun
# @Description : 多线程爬虫
import chapter08.spider.blog_spider as blog_spider
import threading
import time


def single_thread():
    print("single_thread begin")
    for url in blog_spider.urls:
        blog_spider.craw(url)
    print("single_thread end")


def multi_thread():
    print("multi_thread begin")
    threads = []
    for url in blog_spider.urls:
        # 创建线程
        threads.append(threading.Thread(target=blog_spider.craw, args=(url,)))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    print("multi_thread end")


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    # single thread cost: 8.11093521118164 seconds
    print("single thread cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    # multi thread cost: 0.4513895511627197 seconds
    print("multi thread cost:", end - start, "seconds")
    pass

通过控制台进行简单计算:

image-20211025152741438

2、生产者消费者爬虫模式

image-20211025152923809 image-20211025153010962

blog_spider.py函数

# @Time : 2021/10/25 15:13
# @Author : Li Kunlun
# @Description : 对博客程序进行爬虫

import requests
from bs4 import BeautifulSoup

urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]


# 生产者
def craw(url):
    r = requests.get(url)
    return r.text


# 消费者
def parse(html):
    # 对html进行解析,拿到title内容
    soup = BeautifulSoup(html, "html.parser")
    # "a"表示 a标签
    links = soup.find_all("a", class_="post-item-title")
    # 返回一个元组, href和标签
    return [(link["href"], link.get_text()) for link in links]
    pass


if __name__ == '__main__':
    # 解析出该网页中的href和标签
    for result in parse(craw(urls[2])):
        print(result)
    pass

python01_producer_consumer_spider.py

# @Time : 2021/10/25 15:16
# @Author : Li Kunlun
# @Description : 生产者和消费者模块
import chapter08.spider02.blog_spider as blog_spider
import threading
import time
import random
import queue


def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        # 将下载的html结果放到html_queue队列中
        html_queue.put(html)
        print(threading.current_thread().name, f"craw {url}", f"url_queue.size=", url_queue.qsize())
        # 随机休息1-2秒
        time.sleep(random.randint(1, 2))


def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        # 获取对象之后进行解析
        results = blog_spider.parse(html)
        for result in results:
            # 文件对象fout
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, f"results.size=", len(results), "html_queue.size=", html_queue.qsize())
        # 随机休息1-2秒
        time.sleep(random.randint(1, 2))


if __name__ == '__main__':
    url_queue = queue.Queue()
    html_queue = queue.Queue()
    for url in blog_spider.urls:
        url_queue.put(url)

    for idx in range(3):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
        t.start()

    fout = open("data.txt", "w")

    for idx in range(2):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
        t.start()

总结:对于一个复杂的爬虫程序,可以将其分成很多个模块,每个模块又可以启动不同的线程组进行处理,线程之间通过队列Queue来进行交互。

3、线程安全问题以及解决方案

image-20211025161723443 image-20211025194714081

4、线程池ThreadPoolExecutor

image-20211025200438121 image-20211025200719678 image-20211025200740119

有两种创建线程池的方式。

代码:

# @Time : 2021/10/25 20:29
# @Author : Li Kunlun
# @Description : 线程池测试
import concurrent.futures
import chapter08.py02_spider02.blog_spider as blog_spider

# craw
with concurrent.futures.ThreadPoolExecutor() as pool:
    htmls = pool.map(blog_spider.craw, blog_spider.urls)
    htmls = list(zip(blog_spider.urls, htmls))  # 构成元组
    for url, html in htmls:
        # 按照顺序执行
        # https://www.cnblogs.com/#p1 69918
        # https://www.cnblogs.com/#p2 69918
        # https://www.cnblogs.com/#p3 69918
        # ...
        print(url, len(html))
print("craw over")

# parse
with concurrent.futures.ThreadPoolExecutor() as pool:
    futures = {}  # 字典
    for url, html in htmls:
        future = pool.submit(blog_spider.parse, html)
        futures[future] = url
    # 方式1
    # for future, url in futures.items():
    #     # 按照顺序执行
    #     print(url, future.result())

    # 方式2
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        # 不按顺序执行
        print(url, future.result())

5、使用多线程在Web服务器中实现加速

image-20211025205250033 image-20211025205327596

python02_web.py:

# @Time : 2021/10/25 20:52
# @Author : Li Kunlun
# @Description : 使用线程池在web服务中实现加速

import flask
import json
import time

app = flask.Flask(__name__)


def read_file():
    time.sleep(0.1)
    return "read_file result"


def read_db():
    time.sleep(0.2)
    return "read_db result"


def read_api():
    time.sleep(0.3)
    return "read_api result"


@app.route("/")
def index():
    result_file = read_file()
    result_db = read_db()
    result_api = read_api()
    return json.dumps({
        "result_file": result_file,
        "result_db": result_db,
        "result_api": result_api
    })


if __name__ == '__main__':
    app.run()

启动后,控制台输出:

image-20211025210719002

浏览器访问该网址:

image-20211025210743273

postman测试查看网络调用消耗的时间:

image-20211025210826868

对代码进行下面的改造(添加线程池):

# @Time : 2021/10/25 20:52
# @Author : Li Kunlun
# @Description : 使用线程池在web服务中实现加速

import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)
# 初始化全局线程池
pool = ThreadPoolExecutor()


def read_file():
    time.sleep(0.1)  # 相当于io操作
    return "read_file result"


def read_db():
    time.sleep(0.2)
    return "read_db result"


def read_api():
    time.sleep(0.3)
    return "read_api result"


@app.route("/")
def index():
    # 返回的不再是字符串,而是三个future对象
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)
    return json.dumps({
        "result_file": result_file.result(),
        "result_db": result_db.result(),
        "result_api": result_api.result()
    })


if __name__ == '__main__':
    app.run()

此时,程序整个运行时间以最长的io操作为准,实现了加速:

image-20211025211340842

相当于3个io操作并发运行,以最长的时间为准。

参考网址:

https://blog.csdn.net/han2529386161/article/details/103592862
https://blog.csdn.net/daijiguo/article/details/78042309
https://blog.csdn.net/qq_35869630/article/details/105876923?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522163516814816780271565862%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=163516814816780271565862&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-5-105876923.pc_search_result_control_group&utm_term=python%E5%A4%9A%E7%BA%BF%E7%A8%8B%E9%81%87%E5%88%B0io%E6%97%B6%E9%87%8A%E6%94%BEgil+%E4%BC%9A%E5%8A%A0%E5%BF%AB%E6%89%A7%E8%A1%8C%E9%80%9F%E5%BA%A6&spm=1018.2226.3001.4187

3、多进程

1、多进程模块加速程序运行

image-20211025212511134 image-20211025212710171 image-20211025212841712

python01_thread_process_cpu_bound.py:

# @Time : 2021/10/25 21:29
# @Author : Li Kunlun
# @Description : 单线程、多线程、多进程对cpu密集计算速度比较
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

prime = [112272535095293] * 100


# 判断是否为素数(cpu计算,没有涉及io)
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


# 单线程
def single_thread():
    for number in prime:
        is_prime(number)


# 多线程
def multi_thread():
    with ThreadPoolExecutor() as pool:
        pool.map(is_prime, prime)


# 多进程
def multi_process():
    with ProcessPoolExecutor() as pool:
        pool.map(is_prime, prime)


if __name__ == '__main__':
    start = time.time()
    single_thread()
    end = time.time()
    print(f"single_thread cost:", end - start, "seconds")

    start = time.time()
    multi_thread()
    end = time.time()
    print(f"multi_thread cost:", end - start, "seconds")

    start = time.time()
    multi_process()
    end = time.time()
    print(f"multi_process cost:", end - start, "seconds")
    """
    程序执行结果:
        single_thread cost: 42.00479984283447 seconds
        multi_thread cost: 42.01970911026001 seconds
        multi_process cost: 9.357584953308105 seconds
    """

2、在Flask服务中使用多进程池加速程序运行

# @Time : 2021/10/25 20:52
# @Author : Li Kunlun
# @Description : 使用进程池在web服务中实现加速

import flask
import json
import time
from concurrent.futures import ProcessPoolExecutor
import math
import json

app = flask.Flask(__name__)


# 判断是否为素数(cpu计算,没有涉及io)
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


# numbers表示一个参数
@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
    number_list = [int(x) for x in numbers.split(",")]
    results = process_pool.map(is_prime, number_list)
    return json.dumps(dict(zip(number_list, results)))


if __name__ == '__main__':
    # 初始化全局进程池-放到这个位置才能够正常使用
    process_pool = ProcessPoolExecutor()
    app.run()

程序执行结果:

image-20211025215713656

4、莫烦Python

1、多线程

1、添加线程

# @Time : 2021/10/26 10:39
# @Author : Li Kunlun
# @Description : 添加线程


import threading


def show():
    # 获取已激活的线程数,输出结果1
    print(threading.active_count())
    # 查看所有线程信息,输出的结果是一个<_MainThread(...)>带多个<Thread(...)>
    # [<_MainThread(MainThread, started 59304)>]
    # <_MainThread(MainThread, started 59304)>
    print(threading.enumerate())  # see the thread list
    print(threading.current_thread())


def thread_job():
    print('This is a thread of %s' % threading.current_thread())


def main():
    # 添加线程,threading.Thread()接收参数target代表这个线程要完成的任务,需自行定义
    thread = threading.Thread(target=thread_job, )
    thread.start()
    show()


if __name__ == '__main__':
    main()

2、join功能

不添加join()的结果:

# @Time : 2021/10/26 10:50
# @Author : Li Kunlun
# @Description : 没有添加join的结果

import threading
import time


def thread_job():
    print("T1 start\n")
    for i in range(10):
        time.sleep(0.1)  # 任务间隔0.1s
    print("T1 finish\n")


added_thread = threading.Thread(target=thread_job, name='T1')
added_thread.start()
print("all done\n")
"""
预期输出结果:
    T1 start
    T1 finish
    all done
实际结果:
    T1 start
    all done
    T1 finish
"""

线程任务还未完成便输出all done。如果要遵循顺序,可以在启动线程后对它调用join

使用join()函数,按照顺序正常输出。

# @Time : 2021/10/26 10:48
# @Author : Li Kunlun
# @Description :join功能测试

# https://blog.csdn.net/nanhuaibeian/article/details/100160953
import threading
import time


def T1_job():
    print('T1 start\n')
    for i in range(10):
        time.sleep(0.1)
    print('T1 finish\n')


# 任务量相对于T1来说较小
def T2_job():
    print('T2 start\n')
    print('T2 finish\n')


def main():
    added_thread = threading.Thread(target=thread_job, name='T1')
    thread2 = threading.Thread(target=T2_job, name='T2')

    # 推荐以这种方式进行排布
    added_thread.start()
    thread2.start()
    thread2.join() # 等待thread2运行完成之后才会运行下面的语句
    added_thread.join()

    print('all done\n')


if __name__ == '__main__':
    main()

"""
可以接收的输出结果(按照顺序进行输出):
    T1 start
    T2 start
    T2 finish
    T1 finish
    all done
"""

3、队列

# @Time : 2021/10/26 11:02
# @Author : Li Kunlun
# @Description :

import threading
import time

from queue import Queue


# 函数的参数是一个列表l和一个队列q,函数的功能是,
# 对列表的每个元素进行平方计算,将结果保存在队列中
def job(l, q):
    for i in range(len(l)):
        l[i] = l[i] ** 2
    q.put(l)


def multithreading():
    # 在多线程函数中定义一个Queue,用来保存返回值,代替return,
    # 定义一个多线程列表,初始化一个多维数据列表。
    q = Queue()  # q中存放返回值,代替return的返回值
    threads = []

    data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]

    # 在多线程函数中定义四个线程,启动线程,将每个线程添加到多线程的列表中
    for i in range(4):
        t = threading.Thread(target=job, args=(data[i], q))
        t.start()
        threads.append(t)  # todo 把每个线程append到线程列表中

    # 分别join四个线程到主线程
    for thread in threads:
        thread.join()

    # 定义一个空的列表results,将四个线程运行后保存在队列中的结果返回给空列表results
    results = []
    for _ in range(4):
        results.append(q.get())
    print(results)


if __name__ == '__main__':
    multithreading()

2、多进程

1、queue

# @Time : 2021/10/26 13:55
# @Author : Li Kunlun
# @Description : 存储进程输出

import multiprocessing as mp

"""
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后
再从队列中取出结果, 继续加载运算。
因为:多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果
"""


# 该函数没有返回值,将结果存放到队列中
def job(q):
    res = 0
    for i in range(1000):
        res += i + i ** 2 + i ** 3
    q.put(res)  # queue


if __name__ == '__main__':
    # 定义一个多进程队列,用以存储结果
    q = mp.Queue()

    # 定义两个线程函数,用来处理同一个任务, args 的参数只要一个值的时候,
    # 参数后面需要加一个逗号,表示args是可迭代的,不加逗号会报错
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # 上面是分两批处理的,所以这里分两批输出,将结果分别保存
    res1 = q.get()
    res2 = q.get()
    # 计算最终的结果
    print(res1 + res2)

2、pool

# @Time : 2021/10/26 14:07
# @Author : Li Kunlun
# @Description : 进程池


import multiprocessing as mp


def job(x):
    return x * x


def multicore():
    # Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
    pool = mp.Pool(processes=2)
    # map() 放入迭代参数,返回多个结果
    res = pool.map(job, range(10))
    print(res)

    # apply_async()只能放入一组参数,并返回一个结果
    res = pool.apply_async(job, (2,))
    print(res.get())

    # 如果想得到map()的效果apply_async需要通过迭代
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    print([res.get() for res in multi_res])


if __name__ == '__main__':
    multicore()

3、lock

# @Time : 2021/10/26 14:15
# @Author : Li Kunlun
# @Description : 进程锁

import multiprocessing as mp
import time


def job(v, num, l):
    l.acquire()
    for _ in range(4):
        time.sleep(0.1)
        # 两个进程对共享变量进行操作,没有加锁的会有冲突
        v.value += num  # 获取共享变量值
        print(v.value)
    l.release()


def multicore():
    l = mp.Lock()  # 定义一个进程锁
    v = mp.Value('i', 0)  # 定义共享变量
    p1 = mp.Process(target=job, args=(v, 1, l))
    p2 = mp.Process(target=job, args=(v, 3, l))
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

"""
控制台输出:
    1
    2
    3
    4
    7
    10
    13
    16
"""
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-10-27 12:47:35  更:2021-10-27 12:48:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/16 0:00:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码