目? ? 录
1. queue 类使用场景
2. queue 类
2.1?queue 类构造函数
2.2?queue 类属性和方法
3. queue.Queue 使用场景
3.1 创建队列
3.2 使用?empty()、full()、qsize() 方法查看队列状态
3.3 使用 put() 、put_nowait() 方法插入队列
3.4 使用 get() 、get_nowait() 方法读取队列
3.5 查看队列的三个共享变量
3. queue 类使用示范
3.1 生产者-消费者经典模型
3.2 更清楚的显示队列的有序性
Python 多线程编程目录
Python 多线程编程-01-threading 模块初识
Python 多线程编程-02-threading 模块-锁的使用
Python 多线程编程-03-threading 模块 - Condition
Python 多线程编程-04-threading 模块 - Event
Python 多线程编程-05-threading 模块 - Semaphore 和 BoundedSemaphore
Python 多线程编程-06-threading 模块 - Timer
Python 多线程编程-07-threading 模块 - Barrier
Python 多线程编程-08-threading 复习
1. queue 类使用场景
????????Queue/queue 模块中的 queue 类是该模块最经典的类,它实现的是 FIFO?队列,先入先出队列,其添加的第一个任务是第一个检索的任务,这也是日常生活中最经典的场景之一。
2. queue 类
2.1?queue 类构造函数
????????Queue.Queue(maxsize = 0?)
FIFO 队列的构造函数。?maxsize 是一个整数,用于设置可以放入队列的项目数的上限。达到此大小后,将不再延长队列长度,直到消耗队列项。如果?maxsize 小于或等于0,则队列大为无限队列。
2.2?queue 类属性和方法
queue.Queue 的属性和方法
序号 | 属性和方法 | 描述 | 1 | 属性?all_tasks_done | 一个共享变量, threading.Condition 类对象 | 2 | 方法?empty() | 返回一个布尔值,判断队列是否为空。 | 3 | 方法?full() | 返回一个布尔值,判断队列是否满了。 | 4 | 方法?get(block=True, timeout=None)? | 从队列里取数据。如果为空的话,blocking = False 直接报 empty异常。如果blocking = True,就是等一会,timeout必须为 0 或正数。None为一直等下去,0为不等,正数n为等待n秒还不能读取,报empty异常 | 5 | 方法?get_nowait() | 相当于get(False) | 6 | 方法 join() | 在队列中所有元素执行完毕并且调用 task_done()?信号之前,保持阻塞 | 7 | 属性?max_size | 队列的最大长度 | 8 | 属性 mutex | 互斥锁:任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。共有两种操作::require 获取锁,release释放锁。同时该互斥锁被三个共享变量同时享有,即操作 condition 时的 require 和 release 操作也就是操作了该互斥锁。 | 9 | 属性 not_empty | 一个共享变量, threading.Condition 类对象 | 10 | 属性 not_full | 一个共享变量, threading.Condition 类对象 | 11 | 方法 put(item, block=True, timeout=None) | 将 item 放入队列。如果 block 为 True(默认) 且 timeout 为 None,则在有可用空间前将其阻塞;如果 timeout 为正值,则最多阻塞?timeout 秒;如果 block 为 False,则抛出 Empty 异常。 | 12 | 方法 put_nowait(item) | 相当于put(item,False) | 13 | 方法 qsize() | 返回队列的大小 | 14 | 属性 queue | 一个 collections.deque 类对象 | 15 | 方法 task_done() | 用于表示队列中的某个元素已经执行完毕,该方法会被 join() 方法使用。 | 16 | 属性 unfinished_tasks | 每当有 item 被 put 到队列时,未完成任务的计数(Queue 对象 的 unfinished_tasks 属性)就会加 1。 每当调用 task_done()方法时,计数就会减 1。当未完成任务的数量降至 0 时,Queue.join()的阻塞消失。 |
3. queue.Queue 使用场景
3.1 创建队列
? ? ??首先创建一个无限或者有限的 FIFO 队列?
q=queue.Queue(maxsize=0) #无限 FIFO 队列
q=queue.Queue() #无限 FIFO 队列
q=queue.Queue(maxsize=5) #长度为 5 的 FIFO 队列
??
3.2 使用?empty()、full()、qsize() 方法查看队列状态
? ? ??使用?empty()、full()、qsize() 查看队列是空,是满,以及队列确切长度
q=queue.Queue(5)
print(q.empty())
print(q.full())
print(q.qsize())
? ? ? ? 运行结果:
???????????? ?
3.3 使用 put() 、put_nowait() 方法插入队列
# 创建空队列
q=queue.Queue(5)
#查看队列
print(q.empty())
print(q.full())
print(q.qsize())
#依次插入 5 个数据进入队列
items=list(range(5))
for item in items:
if(item%2==0):
q.put(item)
else:
q.put_nowait(item)
#查看队列
print(q.empty())
print(q.full())
print(q.qsize())
?运行结果如下:

3.4 使用 get() 、get_nowait() 方法读取队列
# 创建空队列
q=queue.Queue(5)
#查看队列
print("*"*20)
print(q.empty())
print(q.full())
print(q.qsize())
#依次插入 5 个数据进入队列
items=list(range(5))
for item in items:
if(item%2==0):
q.put(item)
else:
q.put_nowait(item)
print("*"*20)
for item in items:
if(item%2==0):
print(q.get())
else:
print(q.get_nowait())
#查看队列
print("*"*20)
print(q.empty())
print(q.full())
print(q.qsize())
?运行结果如下:

3.5 查看队列的三个共享变量
????????这三个共享变量分别是属性?all_tasks_done、属性 not_empty、属性 not_full,它们都是threading.Condition 类对象。
????????从下面的代码可以看出,当队列是满的时候,先 acquire?all_tasks_done,得到 True 的结果后,acquire?not_empty ,则被阻塞。其实反过来也是这样,这是因为对这三个共享变量的操作都需要获得队列的互斥锁?mutex,在这三个任意一个 Condition 变量没有释放 notify/notifyAll 时候,另外两个共享变量是无法操作的。
import queue
# 创建空队列
q=queue.Queue(5)
#查看队列
print("*"*20)
print(q.empty())
print(q.full())
print(q.qsize())
#依次插入 5 个数据进入队列
items=list(range(5))
for item in items:
if(item%2==0):
q.put(item)
else:
q.put_nowait(item)
# 查看各项属性
print("*"*20)
print("About all_tasks_done")
print(q.all_tasks_done)
print(q.all_tasks_done.acquire())
print("About not_empty")
print(q.not_empty)
print(q.not_empty.acquire())
print("About not_full")
print(q.not_full)
print(q.not_full.acquire())

3. queue 类使用示范
3.1 生产者-消费者经典模型
下面的代码是生产者-消费者的经典模型,每行代码解释请看代码中的注释。
import random
import queue
import threading
import time
# 创建一个 MyThread 类,改写 run 方法
class MyThread(threading.Thread):
def __init__(self,func,args,name):
super().__init__()
self.func=func
self.args=args
self.name=name
def run(self):
self.func(*self.args)
# 从 fifo 队列中获得一个元素
def get_fifo_queue(fifo_queue):
print("Now is {0},Coming in get_fifo_queue ".format(time.ctime()))
val=fifo_queue.get(1)
print("get val is {0}".format(val))
print("After get one from fifo queue,now its size is {0}".format(fifo_queue.qsize()))
# 往 fifo 队列中推进一个元素
def put_fifo_queue(fifo_queue,val):
print("Now is {0},Coming in put_fifo_queue ".format(time.ctime()))
print("put in {0}".format(val))
fifo_queue.put(val)
print("After put one from fifo queue,now its size is {0}".format(fifo_queue.qsize()))
# 生产者函数,根据给定的物品个数不断地往 fifo 中推进元素,并且随机休息 1~5 秒
def producer(fifo_queue,goods_num):
for tmp in range(goods_num):
put_fifo_queue(fifo_queue,random.randint(1,20))
time.sleep(random.randint(1,5))
# 消费者函数,根据给定的物品个数不断地从 fifo 中获得元素,并且随机休息 1~5 秒
def consumer(fifo_queue,goods_num):
for tmp in range(goods_num):
get_fifo_queue(fifo_queue)
time.sleep(random.randint(1,5))
# 此处是生产者-消费者函数列表,请注意此时是先生产者,后消费者。
funcs=[producer,consumer]
test_goods_num=5
test_fifo_queue=queue.Queue(10)
threads=[]
# 根据生产者-消费者函数列表个数,创建相应的 MyThread
for func in funcs:
t=MyThread(func,(test_fifo_queue,test_goods_num),func.__name__)
threads.append(t)
# 启动线程
for t in threads:
t.start()
# 等待线程完成
for t in threads:
t.join()
print("All done!")
根据下面的运行结果可以很清楚的看到,虽然整篇代码中没有明确用到锁,但是生产者-消费者是有序的按照 FIFO 顺序进行了相应活动。?

3.2 更清楚的显示队列的有序性
3.1 的代码是先生成、启动生产者线程,后生成、启动消费者线程。如果我把这两个顺序换一下呢?
# 调换生产者、消费者的顺序
funcs=[consumer,producer]
test_goods_num=5
test_fifo_queue=queue.Queue(10)
threads=[]
for func in funcs:
t=MyThread(func,(test_fifo_queue,test_goods_num),func.__name__)
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
print("All done!")
????????根据以下代码可以看到,即使是先调用了消费者线程,但是在尚没有物品的情况下,会自动将队列的主动权交给生产者线程,等生产者线程推进队列一个元素后,又会自动的进行消费活动,此时代码工作者不需要什么判断、获得锁工作,这一系列行为是隐式完成的。

'''
要是大家觉得写得还行,麻烦点个赞或者收藏吧,想个博客涨涨人气,非常感谢!
更欢迎大家一起来讨论,共同学习进步!
'''
|