IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Python之路 34:万字总结:并发与并行、锁(GIL、同步锁、死锁与递归锁)、信号量、线程队列、生消模型、进程(基础使用、进程通信、进程池、回调函数)、协程 -> 正文阅读

[Python知识库]Python之路 34:万字总结:并发与并行、锁(GIL、同步锁、死锁与递归锁)、信号量、线程队列、生消模型、进程(基础使用、进程通信、进程池、回调函数)、协程

内容:

  • 同步锁

  • 死锁、递归锁

  • 信号量和同步对象(暂时了解即可)

  • 队列------生产者和消费者模型

  • 进程(基础使用、进程通信、进程池、回调函数)

  • 协程

一、并发并行与同步异步的概念

1.1、并发和并行

概念

并行处理(Parallel Processing)是计算机系统中能同时执行两个或更多个处理的一种计算方法。并行处理可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。

并发处理(concurrency Processing):指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机(CPU)上运行,但任一个时刻点上只有一个程序在处理机(CPU)上运行

简单来说:

  • 并发:指系统具有处理多个任务的能力
  • 并行:指系统具有 同时 处理多个任务的能力

并发的关键是你有处理多个任务的能力,不一定要同时。

并行的关键是你有同时处理多个任务的能力。

所以说,并行是并发的子集

? 无论是并行还是并发,在用户看来都是’同时’运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)

并行: 并行:同时运行,只有具备多个cpu才能实现并行

? 单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的

? 有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,

? 一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术

? 而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行

1.2、同步与异步

概念

当进程执行到一个IO(等待外部数据)的时候

  • 同步:等
  • 异步:不等,一直到数据接受成功过,再回来处理

二、GIL(Global Interpreter Lock)

前言

'''
定义:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
'''
结论:在Cpython解释器中,同一个进程下开启的多线程,无论你开启多少个线程、你有多少个CPU,Python在执行的时候会淡定的在同一时刻只允许一个线程运行,无法利用多核优势

PS:

? 首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

这篇文章透彻的剖析了GIL对python多线程的影响,强烈推荐看一下:http://www.dabeaz.com/python/UnderstandingGIL.pdf

所以,你想用Python在一个进程下跑多个线程,不可能

可能大家会想:我靠,学毛的Python!!

且看下文

2.1、GIL的概念

? GIL:Global Interpreter Lock,又叫全局解释器锁,每个线程在执行的过程中都需要先获取GIL,保证同一时刻只有一个线程在运行,目的是解决多线程同时竞争程序中的全局变量而出现的线程安全问题。

? GIL其本质类似操作系统的Mutex。

? GIL的功能:在CPython解释器中执行的每一个Python线程,都会先锁住自己,以阻止别的线程执行

? CPython引进GIL,可以最大程度上规避内存管理这样复杂的竞争风险问题

? 当然,CPython不可能容忍一个线程一直独占解释器,它会轮流执行Python线程。这样一来,用户看到的就是“伪”并行,即Python线程在交替执行,来模拟真正并行的线程

2.2、关于 IO密集型任务 与 计算密集型任务

1、概念

计算密集型:

? 要进行大量的数值计算,例如进行上亿的数字计算、计算圆周率、对视频进行高清解码等等。这种计算密集型任务虽然也可以用多任务完成,但是花费的主要时间在任务切换的时间,此时CPU执行任务的效率比较低。

IO密集型:

? 涉及到网络请求(time.sleep())、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。

? 有了GIL的存在,同一时刻同一进程中只有一个线程被执行

? 听到这里,有的同学立马质问:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了,php才是最牛逼的语言?

? No,No,No,要解决这个问题,我们需要在几个点上达成一致:

1. cpu到底是用来做计算的,还是用来做I/O的?

2. 多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能

3. 每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处 

引例:

? 一个工人相当于cpu,此时计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。

如果你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一个人,在等材料的过程中让工人去干别的活,

反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高

结论:

对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用

当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地

2、任务分析

#(1)分析:
我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程

#(2)单核情况下,分析结果: 
  如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
  如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜

#(3)多核情况下,分析结果:
  如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
  如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜

 
#(4)结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

3、总结

对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用

对于IO密集型任务:

? Python的多线程是有意义的,可以采用 多进程 + 协程

对于计算密集型任务:

? Python的多线程就不推荐,多进程效率相对较高,但本质上讲:Python就不适用了

应用:

多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析

三、同步锁

3.1、引例

下面我们通过几个小例子来了解同步锁

示例代码一:

我们设置一个全局变量num = 100,执行100次函数subtractNum,每次执行 num 都减去 1

import time
import threading

num = 100  # 设定一个共享变量
thread_list = []


def subtractNum():
    global num  # 在每个线程中都获取这个全局变量
    num -= 1

    # temp = num
    # # print('--get num:',num )
    # time.sleep(0.1)
    # num = temp - 1  # 对此公共变量进行-1操作


for i in range(100):
    t = threading.Thread(target=subtractNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # 等待所有线程执行完毕
    t.join()

print('final num:', num)

运行结果:

final num: 0

进程已结束,退出代码为 0

示例代码二:

对函数 subtractNum 稍作修改

我们添加temp = num 、 num = temp 、time.sleep

都是为了使得subtractNum()的执行时间变长

import time
import threading

num = 100  # 设定一个共享变量
thread_list = []


def subtractNum():
    global num  # 在每个线程中都获取这个全局变量
    # num -= 1

    temp = num
    print('--get num:', num)
    time.sleep(0.1)
    num = temp - 1  # 对此公共变量进行-1操作


for i in range(100):
    t = threading.Thread(target=subtractNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # 等待所有线程执行完毕
    t.join()

print('final num:', num)

运行结果:

--get num: 100
--get num: 100
--get num: 100
--get num: 100
--get num: 100
--get num: 100
--get num: 100
...........
final num: 99

进程已结束,退出代码为 0

为啥示例代码二 num 最终不是 0 了呢??

因为我们的减法是在一个子线程结束才能实现的,而在示例代码二中,(受time.sleep等的影响)第一个开启的线程还没结束,100个线程已经全部开启,每个线程拿到的num值都是100,而不是递减得到的结果

我们可以修改time.sleep的时间,num的最终值可以发生改变

如:我的电脑

time.sleep(0.0000000001)

运行结果:

--get num: 100
--get num: 100
.........
--get num: 100
--get num: 100
--get num: 100
--get num: 99
--get num: 99
final num: 98

示例代码一中实现了最终num = 0 是因为计算实在太快了,下一个线程还没有开始,上一个线程已经计算结束了

总结:

出现线程安全问题

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gKx8GPUO-1643895313065)(C:\Users\pc\AppData\Roaming\Typora\typora-user-images\image-20220202154623848.png)]

多个线程都在同时操作同一个共享资源,所以造成了资源破坏,怎么办呢?(join会造成串行,失去所线程的意义)

==> 同步锁

3.2、同步锁

对上面的示例代码稍做修改,添加三行代码即可

l = threading.Lock()
l.acquire()
l.release()
import time
import threading

num = 100  # 设定一个共享变量
thread_list = []
l = threading.Lock()


def subtractNum():
    global num  # 在每个线程中都获取这个全局变量
    l.acquire()
    # num -= 1

    temp = num
    print('--get num:', num)
    time.sleep(0.01)
    num = temp - 1  # 对此公共变量进行-1操作
    l.release()


for i in range(100):
    t = threading.Thread(target=subtractNum)
    t.start()
    thread_list.append(t)

for t in thread_list:  # 等待所有线程执行完毕
    t.join()

print('final num:', num)

运行结果:

--get num: 100
--get num: 99
......
--get num: 1
final num: 0

进程已结束,退出代码为 0

四、死锁与递归锁

4.1、死锁

死锁:

是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态

下面是一个死锁的例子:

from threading import Thread, Lock
import time

# 创建A、B两把锁
mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('%s 拿到A锁' % self.name)  # self.name 对应线程的名字,系统会自动给线程赋名

        mutexB.acquire()
        print('%s 拿到B锁' % self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('%s 拿到B锁' % self.name)
        time.sleep(2)

        mutexA.acquire()
        print('%s 拿到A锁' % self.name)
        mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    for i in range(5):
        t = MyThread()
        t.start()

运行结果:

Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-2 拿到A锁

然后程序就卡在这了

解决办法:使用递归锁

4.2、递归锁

在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁

示例代码:

from threading import Thread, Lock, RLock
import time

mutexA = mutexB = RLock()
'''
一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待
等待该线程释放所有锁,即counter递减到0为止
'''

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()  # count = 1
        print('%s 拿到A锁' % self.name)  # self.name 对应线程的名字,系统会自动给线程赋名

        mutexB.acquire()  # count = 2
        print('%s 拿到B锁' % self.name)
        mutexB.release()  # count = 1

        mutexA.release()  # count = 0

    def func2(self):
        mutexB.acquire()
        print('%s 拿到B锁' % self.name)
        time.sleep(2)

        mutexA.acquire()
        print('%s 拿到A锁' % self.name)
        mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    for i in range(5):
        t = MyThread()
        t.start()

运行结果:

Thread-1 拿到A锁
Thread-1 拿到B锁
Thread-1 拿到B锁
Thread-1 拿到A锁
Thread-2 拿到A锁
Thread-2 拿到B锁
Thread-2 拿到B锁
Thread-2 拿到A锁
Thread-4 拿到A锁
Thread-4 拿到B锁
Thread-4 拿到B锁
Thread-4 拿到A锁
Thread-3 拿到A锁
Thread-3 拿到B锁
Thread-3 拿到B锁
Thread-3 拿到A锁
Thread-5 拿到A锁
Thread-5 拿到B锁
Thread-5 拿到B锁
Thread-5 拿到A锁

进程已结束,退出代码为 0

五、同步对象(Event)

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

event.isSet()  # 返回event的状态值;

event.wait()  # 如果 event.isSet()==False将阻塞线程;

event.set()  # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear()  # 恢复event的状态值为False。

示例代码:

我们创建一个老板与员工下班的故事(一下是废话,直接看代码就好):

老板说:*点才能下班 (此时event为Flase)

然后设置event为True

这时收到event为True

员工开始抱怨“哎……命苦啊!”,之后恢复event的状态值为False。

老板说:“下班了”

然后设置event为True

这时收到event为True

员工开始说“OhYeah!”

总之,老板通过event这个信号量控制员工是否可以说话(线程是否被阻塞)

import threading
import time


class Boss(threading.Thread):
    def run(self):
        print("BOSS:今晚大家都要加班到22:00。")
        print(event.isSet())  # False
        event.set()  # 开始打印“Worker:哎……命苦啊!”
        time.sleep(5)
        print("BOSS:<22:00>可以下班了。")
        print(event.isSet())
        event.set()  # 开始打印“Worker:OhYeah!”


class Worker(threading.Thread):
    def run(self):
        event.wait()  # event = False, 将阻塞线程, 一旦event被设定,它就是pass
        print("Worker:哎……命苦啊!")
        time.sleep(1)
        event.clear()
        event.wait()
        print("Worker:OhYeah!")


if __name__ == "__main__":
    # 创建五个Worker和一个Boss
    event = threading.Event()
    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for t in threads:
        t.join()

运行结果:

BOSS:今晚大家都要加班到22:00False
Worker:哎……命苦啊!Worker:哎……命苦啊!Worker:哎……命苦啊!Worker:哎……命苦啊!

Worker:哎……命苦啊!


BOSS:<22:00>可以下班了。
False
Worker:OhYeah!
Worker:OhYeah!Worker:OhYeah!Worker:OhYeah!
Worker:OhYeah!



进程已结束,退出代码为 0

六、信号量(Semaphore)

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。

计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)

BoundedSemaphore与Semaphore的唯一区别:

前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

示例代码:

import threading,time
class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():
            print(self.name)
            time.sleep(5)
            semaphore.release()
if __name__=="__main__":
    semaphore=threading.Semaphore(5)
    thrs=[]
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

运行结果:

Thread-1
Thread-2
Thread-3
Thread-4
Thread-5
Thread-6Thread-7Thread-8

.....
Thread-95
Thread-94
Thread-99
Thread-98
Thread-100Thread-96Thread-97



进程已结束,退出代码为 0

七、线程队列

首先明白“队列”是多线程的利器,抛开多线程,列表就行了

7.1、对多线程,列表是不安全的数据结构

示例代码:

我们想用多线程不断remove一个列表的元素

我们每开一个线程,都让他去remove列表的最后一个元素

import threading
import time

l = [1, 2, 3, 4, 5]


def pri():
    while l:
        a = l[-1]
        print(a)
        time.sleep(1)
        try:
            l.remove(a)
        except Exception as e:
            print('----', a, e)


t1 = threading.Thread(target=pri, args=())
t1.start()
t2 = threading.Thread(target=pri, args=())
t2.start()

运行结果:

5
5
4---- 5 list.remove(x): x not in list

4
3
---- 4 list.remove(x): x not in list
3
2----
 3 list.remove(x): x not in list
2
1
---- 2 list.remove(x): x not in list
1
---- 1 list.remove(x): x not in list

进程已结束,退出代码为 0

很乱,按之前学的内容,我们肯定要加各种锁

但其实,完成上述功能,可以使用队列queue

7.2、队列

(1)方法介绍

import Queue
  • q = Queue.Queue(maxsize = 10)

    创建一个“队列对象“
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

  • q.put(10)

    将一个值放入队列中

    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

  • q.get()

    将一个值从队列中取出
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

Python Queue模块有三种队列及构造函数:

1、Python Queue模块的FIFO队列先进先出。   class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。               class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。        class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):

q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

(2)示例

示例代码一:

先进先出

import queue

# 先进后出
q = queue.LifoQueue()

q.put(34)
q.put(56)
q.put(12)

while True:
    data = q.get()
    print(data)

运行结果:

12
56
34

示例代码二:

优先级

import queue


# 优先级
q = queue.PriorityQueue()
q.put([5, 100])  # 第五级,内容为"100"
q.put([7, 200])  # 第七级,内容为"200"
q.put([3, "hello"])  # 第三级,内容为"hello"
q.put([4, {"name": "alex"}])

while True:
    data = q.get()
    print(data)
   

运行结果:

[3, 'hello']
[4, {'name': 'alex'}]
[5, 100]
[7, 200]

八、生产者消费者模型

8.1、为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

8.2、什么是生产者消费者模式

(1)概念

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个结耦的过程。

(2)示例

创建一个做包子的老板(Producer)和消费者(Consumer)的故事

import queue
import random
import threading
import time

q = queue.Queue()


def Producer(name):
    count = 0
    while count < 10:
        print("making........")
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1
        # q.task_done()
        # q.join()
        print("ok......")


def Consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            # q.task_done()
            # q.join()
            print(data)
            print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1


p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()

运行结果:

making........
Producer A has produced 0 baozi..0
ok......
making........
Consumer B has eat 0 baozi...

Producer A has produced 1 baozi..
ok......
making........
1
Consumer B has eat 1 baozi...
-----no baozi anymore----
Producer A has produced 2 baozi..2
ok......
making........
Producer A has produced 3 baozi..

Consumer B has eat 2 baozi...
ok......
making........
Producer A has produced 4 baozi..
ok......
making........
3
Consumer B has eat 3 baozi...
Producer A has produced 5 baozi..
ok......
making........
4
Consumer B has eat 4 baozi...
Producer A has produced 6 baozi..
ok......
making........
Producer A has produced 7 baozi..
ok......
making........
5
Consumer B has eat 5 baozi...
Producer A has produced 8 baozi..
ok......
making........
Producer A has produced 9 baozi..
ok......
6
Consumer B has eat 6 baozi...
7
Consumer B has eat 7 baozi...
8
Consumer B has eat 8 baozi...

进程已结束,退出代码为 0

拓展:

添加 消费者C 与 消费者D

使用 q.task_done() 与 q.join() 完善代码

九、进程

仅看使用方法,跟线程几乎一样,还是那几个功能

5.1、进程的调用

(1)方法一:

from multiprocessing import Process
import time


def func(name):
    time.sleep(1)
    print('hello', name, time.ctime())


if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = Process(target=func, args=('coder',))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')

运行结果:

hello coder Thu Feb  3 14:45:55 2022
hello coder Thu Feb  3 14:45:55 2022
hello coder Thu Feb  3 14:45:55 2022
end

进程已结束,退出代码为 0

(2)方法二:

from multiprocessing import Process
import time


class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name

    def run(self):
        time.sleep(1)
        print('hello', self.name, time.ctime())


if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')

运行结果:

hello MyProcess-2 Thu Feb  3 14:48:28 2022
hello MyProcess-1 Thu Feb  3 14:48:28 2022
hello MyProcess-3 Thu Feb  3 14:48:28 2022
end

进程已结束,退出代码为 0

(3)用PID了解父进程子进程

from multiprocessing import Process
import os
import time


def info(title):
    print("title:", title)
    print('parent process:', os.getppid())  # os.getppid() ==》 父进程的PID
    print('process id:', os.getpid())  # os.getpid() ==》 本身的PID


def f(name):
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    info('main process line')
    time.sleep(1)
    print("------------------")
    p = Process(target=info, args=('coder',))
    p.start()
    p.join()

运行结果:

title: main process line
parent process: 21960
process id: 7088
------------------
title: coder
parent process: 7088
process id: 14012

进程已结束,退出代码为 0

5.2、进程的相关方法

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

5.3、进程间通信

目的:

  • 进程间互传消息
  • 共享数据

内容:

通信:

  • 队列 Queue
  • 管道 Pipe

数据共享:

  • manager

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

(1)进程队列Queue

进程间通信(IPC)方式二:队列(推荐使用)

from multiprocessing import Process, Queue
import queue


def f(q, n):
    # q.put([123, 456, 'hello'])
    q.put(n)
    print("son process", id(q))


if __name__ == '__main__':
    q = Queue()  # try: q=queue.Queue()
    print("main process", id(q))

    for i in range(3):
        p = Process(target=f, args=(q, i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

运行结果:

main process 2126061918528
son process 1851808750752
0
son process 3030240795808
1
son process 2415566658720
2

进程已结束,退出代码为 0

(2)管道

进程间通信(IPC)方式二:管道(不推荐使用,了解即可)

1、介绍

创建管道的类:

Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道

参数介绍:

dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

主要方法:

    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

其他方法:

conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    

conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。

2、案例

示例代码:

我们通过创建一个双向管道,通过

  • conn.send() # 发送对象
  • conn.recv() # 接收对象

来实现简单的通信

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([12, {"name": "coder"}, 'hello'])  # 通过连接发送对象
    response = conn.recv()  # 接收conn2.send(obj)发送的对象。
    print("response", response)
    conn.close()  # 关闭连接
    print("q_ID2:", id(conn))


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 创建一个双向管道
    print("q_ID1:", id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # # 接收发送的对象。
    parent_conn.send("儿子你好!")
    p.join()

运行结果:

q_ID1: 1478665507024
[12, {'name': 'coder'}, 'hello']
response 儿子你好!
q_ID2: 2056262961424

进程已结束,退出代码为 0

(3)manage

1、介绍

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据(事实上Manager的功能远不止于此)。

他支持的类型非常多,包括:Value、Araay、list、dict、Queue、Lock等。

2、案例

示例一:

示例代码:

我们在主进程创建俩共享数据:d 和 l ,然后在子进程中修改他们

import multiprocessing


def worker(d, l):
    l += range(11, 16)
    for i in range(1, 6):
        key = "key{0}".format(i)
        val = "val{0}".format(i)
        d[key] = val


if __name__ == "__main__":
    # 创建Manager对象
    manager = multiprocessing.Manager()
    # 创建共享字典
    d = manager.dict()
    # 创建共享列表
    l = manager.list()
    p = multiprocessing.Process(target=worker, args=(d, l))
    p.start()
    p.join()
    print(d)
    print(l)

运行结果:

{'key1': 'val1', 'key2': 'val2', 'key3': 'val3', 'key4': 'val4', 'key5': 'val5'}
[11, 12, 13, 14, 15]

进程已结束,退出代码为 0

示例二:

示例代码:

from multiprocessing import Process, Manager


def f(d, l, n):
    d[n] = n
    d["name"] = "alvin"
    l.append(n)
    # print("l",l)


if __name__ == '__main__':

    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d, l, i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)

运行结果:

{0: 0, 'name': 'alvin', 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9}
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

进程已结束,退出代码为 0

5.4、进程池

1、概念

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

2、用法

(1)主要的方法:

  1. apply:从进程池里取一个进程并执行
  2. apply_async:apply的异步版本
  3. terminate:立刻关闭线程池
  4. join:主进程等待所有子进程执行完毕,必须在close或terminate之后
  5. close:等待所有进程结束后,才关闭线程池

(2)具体用法:

创建进程池的类:

? 如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

Pool([numprocess  [,initializer [, initargs]]]):创建进程池 

参数介绍:

1)numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
(2)initializer:是每个工作进程启动时要执行的可调用对象,默认为None3)initargs:是要传给initializer的参数组

方法介绍:

主要方法:

1)p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()2)p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。

(3)p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

(4)P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

其他方法(了解部分)

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
(1)obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
(2)obj.ready():如果调用完成,返回True3)obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
(4)obj.wait([timeout]):等待结果变为可用。
(5)obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

3、案例

示例代码:

我们在进程池中创建五个进程

现在需要执行10次函数foo()

我们利用 apply_async()不断的从进程池中取进程去执行foo()

from multiprocessing import Pool
import time


def foo(args):
    time.sleep(1)
    print(args)


if __name__ == '__main__':
    p = Pool(5)
    for i in range(10):
        p.apply_async(func=foo, args=(i,))
	
    # 注意join与close调运顺序是固定的,先close后join
    p.close()  # 等子进程执行完毕后关闭线程池
    # time.sleep(2)
    # p.terminate()  # 立刻关闭线程池
    p.join()
    

运行结果:

01

2
3
4
6
5
97
8


进程已结束,退出代码为 0

4、回调函数

(1)概念

百度百科说:回调函数就是一个通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,我们就说这是回调函数。回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。

通俗理解就是:把一个函数作为参数传给另一个函数,第一个函数称为回调函数。这个被传入的参数其实是函数指针,即指向一个函数的指针(地址)。

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

(2)案例

示例一:

示例代码:

from multiprocessing import Pool
import requests
import json
import os


def get_page(url):
    print('<进程%s> get %s' % (os.getpid(), url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url': url, 'text': respone.text}


def pasrse_page(res):
    print('<进程%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p = Pool(3)
    res_l = []
    for url in urls:
        res = p.apply_async(get_page, args=(url,), callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l])  # 拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

运行结果:

<进程12944> get https://www.baidu.com
<进程4036> get https://www.python.org
<进程16732> get https://www.openstack.org
<进程12944> get https://help.github.com/
<进程19608> parse https://www.baidu.com
<进程4036> get http://www.sina.com.cn/
<进程19608> parse https://www.python.org
<进程19608> parse https://help.github.com/
<进程19608> parse http://www.sina.com.cn/
<进程19608> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n<!--STAT 
......

进程已结束,退出代码为 0

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

示例二:

示例代码:

import time
from multiprocessing import Pool


def work(n):
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = Pool()

    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        res_l.append(res)

    p.close()
    p.join()  # 等待进程池中所有进程执行完毕

    nums = []
    for res in res_l:
        nums.append(res.get())  # 拿到所有结果
    print(nums)  # 主进程拿到所有的处理结果,可以在主进程中进行统一进行处理

运行结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

进程已结束,退出代码为 0

十、协程

协程,又称微线程,纤程。英文名Coroutine。

通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程即微线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

通常在Python中我们进行并发编程一般都是使用多线程或者多进程来实现的,对于计算型任务由于GIL的存在我们通常使用多进程来实现,而对于IO型任务我们可以通过线程调度来让线程在执行IO任务时让出GIL,从而实现表面上的并发。

其实对于IO型任务我们还有一种选择就是协程,协程是运行在单线程当中的"并发",协程相比多线程一大优势就是省去了多线程之间的切换开销,获得了更大的运行效率。

10.1、yield与协程

(1)回顾yield

yield是学习生成器时get到的知识点

生成器本质上也是迭代器,不过它比较特殊。

生成器的创建方式也比迭代器简单很多,大体分为以下 2 步:

  1. 定义一个以 yield 关键字标识返回值的函数;
  2. 调用刚刚创建的函数,即可创建一个生成器。

如果你忘记了yield怎么用,就先把yield当作”return“(在程序中返回某个值)

下面我们通过一段代码就可以搞懂yield的用法。

示例代码:

def intNum():
    print("开始执行")
    for i in range(5):
        temp = yield i
        print(temp)
        print("继续执行")


if __name__ == '__main__':
    num = intNum()
    # 调用 next() 内置函数
    print(next(num))
    # 调用 __next__() 方法
    print(num.__next__())
    # # 通过for循环遍历生成器
    # for i in num:
    #     print(i)

运行结果:

开始执行
0
None
继续执行
1

进程已结束,退出代码为 0

我们通过分析这段代码就可以了解yield的基础使用:

  1. 首先程序开始执行后,因为intNum()函数中有yield关键字,所以执行main函数中第一行num = intNum()时,intNum函数并不会真的执行,而是先得到一个生成器

  2. 直到调用next方法【print(next(num))】,intNum函数正式开始执行,先执行intNum函数中的print方法,然后进入循环

  3. 程序遇到yield关键字,然后把yield想想成return,return了一个值之后,程序停止,并且没有执行赋值给temp

  4. 所以,此时程序输出的结果是:

    开始执行
    0
    
  5. 后开始执行num.next()时,这个时候是从刚才那个next程序停止的地方开始执行的,也就是要执行temp的赋值操作,这时候要注意,这个时候赋值操作的右边是没有值的(因为刚才那个是return出去了,并没有给赋值操作的左边传参数),所以这个时候temp赋值是None,所以接着下面的输出就是:

    None
    继续执行
    1
    

(2)yield与协程(利用生成器实现协程)

我们通过“生产者-消费者”模型来看一下协程的应用,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产。

示例代码:

import time

"""
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
"""


# 注意到consumer函数是一个generator(生成器):
# 任何包含yield关键字的函数都会自动成为生成器(generator)对象

def consumer():
    r = ''
    while True:
        # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
        #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
        #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
        #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'


def produce(c):
    # 1、首先调用c.next()启动生成器
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
        cr = c.send(n)
        # 4、produce拿到consumer处理的结果,继续生产下一条消息;
        print('[PRODUCER] Consumer return: %s' % cr)
    # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
    c.close()


if __name__ == '__main__':
    # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
    c = consumer()
    produce(c)

运行结果:

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK

进程已结束,退出代码为 0

10.2、greenlet

(1)概念

Greenlet是python的一个C扩展,来源于Stackless python,旨在提供可自行调度的‘微线程’, 即协程。相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。

greentlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.

示例代码:

from greenlet import greenlet


def func1():
    print(12)
    gr2.switch()  # greenlet用switch来表示协程的切换
    print(34)
    gr2.switch()


def func2():
    print(56)
    gr1.switch()
    print(78)


if __name__ == '__main__':
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch()

运行结果:

12
56
34
78

进程已结束,退出代码为 0

(2)gevent模块实现协程

Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。

gevent是第三方库,通过greenlet实现协程,其基本思想是:

当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

示例代码:

import gevent
import time


def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")


def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")


start = time.time()

gevent.joinall(
    [gevent.spawn(foo),
     gevent.spawn(bar)]
)

print(time.time() - start)

运行结果:

running in foo
switch to bar
switch to foo again
switch to bar again
5.059036016464233

进程已结束,退出代码为 0

当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:

from gevent import monkey

monkey.patch_all()
import gevent
from urllib import request
import time


def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))


start = time.time()

gevent.joinall([
    gevent.spawn(f, 'https://www.baidu.com'),
    gevent.spawn(f, 'https://www.github.com/'),
    gevent.spawn(f, 'https://zhihu.com/'),
])

# f('https://www.baidu.com')
# f('https://www.github.com/')
# f('https://zhihu.com/')

print(time.time() - start)

运行结果:

GET: https://www.baidu.com
GET: https://www.github.com/
GET: https://zhihu.com/
227 bytes received from https://www.baidu.com.
62901 bytes received from https://zhihu.com/.
282535 bytes received from https://www.github.com/.
15.991364240646362

进程已结束,退出代码为 0

10.3、eventlet实现协程(了解)

eventlet 是基于 greenlet 实现的面向网络应用的并发处理框架,提供“线程”池、队列等与其他 Python 线程、进程模型非常相似的 api,并且提供了对 Python 发行版自带库及其他模块的超轻量并发适应性调整方法,比直接使用 greenlet 要方便得多。

其基本原理是调整 Python 的 socket 调用,当发生阻塞时则切换到其他 greenlet 执行,这样来保证资源的有效利用。需要注意的是:
eventlet 提供的函数只能对 Python 代码中的 socket 调用进行处理,而不能对模块的 C 语言部分的 socket 调用进行修改。对后者这类模块,仍然需要把调用模块的代码封装在 Python 标准线程调用中,之后利用 eventlet 提供的适配器实现 eventlet 与标准线程之间的协作。
虽然 eventlet 把 api 封装成了非常类似标准线程库的形式,但两者的实际并发执行流程仍然有明显区别。在没有出现 I/O 阻塞时,除非显式声明,否则当前正在执行的 eventlet 永远不会把 cpu 交给其他的 eventlet,而标准线程则是无论是否出现阻塞,总是由所有线程一起争夺运行资源。所有 eventlet 对 I/O 阻塞无关的大运算量耗时操作基本没有什么帮助。

10.4、总结

(1)协程的好处

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

(2)协程的缺点

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-02-04 10:59:51  更:2022-02-04 11:00:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 8:58:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码