线程
线程和程序其他部分并行地调用函数(或其他可调用的对象类型),所有线程均在一个进程中运行。线程经常用于非阻塞的输出调用和GUI中长时间运行的任务。线程还可作为表达成独立运行任务的算法的模型。
线程拥有多个优点:
线程也有三点缺陷:
- 线程运行程序里的函数
- 线程同步化和队列
- 全局解释器锁(DIL)
_thread 模块
基本用法
示例:thread_1.py
"派生出子线程,直到输入“q”"
import _thread
def child(tid_int):
"子线程"
print('Hello from child!', tid_int)
def parent():
"父线程"
tid_int = 0
while True:
_thread.start_new_thread(child, (tid_int,))
if input() == 'q':
break
tid_int += 1
if __name__ == '__main__':
parent()
输出:thread_1.py
Hello from child! 0
Hello from child! 1
Hello from child! 2
q
- 关于线程的代码只有两行:载入
_thread 模块和创建线程的调用。 _thread.start_new_thread 函数用于开始一个新线程。它接受一个函数对象(或其他可调用对象)和一个参数元组。它本身返回一个没有用的值,它派生出来的线程在运行的函数返回后安静地退出。此外,如果线程中的函数抛出未获捕捉的异常,则将打印出堆栈跟踪记录并退出线程,但程序的其他部分仍将运行。
用_thread 编写线程的其他方法
线程也可以运行lambda 函数或者某个对象的绑定方法(或其他可调用对象):
示例:thread_alt.py
"所有三个线程均打印“4294967296”"
import _thread
import time
def power(a_int):
"打印a_int的32次方"
print(a_int ** 32)
class Power:
"次方"
def __init__(self, a_int):
self.a_int = a_int
def act(self):
"打印a_int的32次方"
print(self.a_int ** 32)
def main():
_thread.start_new_thread(power, (2,))
_thread.start_new_thread(lambda: power(2), ())
obj_Power = Power(2)
_thread.start_new_thread(obj_Power.act, ())
time.sleep(0.01)
if __name__ == '__main__':
main()
运行多个线程
示例:thread_count.py
"五个数数的线程"
import _thread
import time
def counter(id_int, count_int):
"从0数到count_int"
for i_int in range(count_int + 1):
print('[{}] -> {}'.format(id_int, i_int))
time.sleep(1)
def main():
for i_int in range(5):
_thread.start_new_thread(counter, (i_int,5))
time.sleep(7)
if __name__ == '__main__':
main()
输出:thread_count.py
[1] -> 0
[0] -> 0
[2] -> 0
[4] -> 0
[3] -> 0
[1] -> 1
[4] -> 1[2] -> 1
[3] -> 1
[0] -> 1
[1] -> 2
[4] -> 2
[3] -> 2
[2] -> 2
[0] -> 2[1] -> 3
[4] -> 3
[2] -> 3[0] -> 3
[3] -> 3
[1] -> 4
[4] -> 4
[3] -> 4
[0] -> 4[2] -> 4
[1] -> 5
[4] -> 5
[3] -> 5
[0] -> 5
[2] -> 5
- 输出之所以看起来很不整齐,是因为所有线程共享一个
stdout ,会导致随机混杂。
同步访问共享对象和名称
在线程生命期间一直持续存在的进程中,对象和命名空间在所有派生线程间共享。因此我们需时刻小心,避免数个线程同时改变全局对象和名称(结果就是会丢失某个改变或把共享对象的状态完全损坏)。
_thread 模块提供了相关工具,这些工具基于锁的概念:想要修改一个全局对象,线程需要获得一把锁,然后进行修改,之后释放这把锁,后者再为其他线程所获取。Python确保任何时间点只有一个线程持有锁;如果在持有期间其他线程请求获得锁,那么这些请求将被一直阻塞,直到释放出锁。锁对象的分配和处理由自由且可跨平台移植的_thread 模块中的调用执行,后者自动映射到底层操作平台的线程锁定机制。
示例:thread_count_mutex.py
"五个数数的线程,并同步化对stdout的访问"
import _thread
import time
MUTEX = _thread.allocate_lock()
def counter(id_int, count_int):
"从0数到count_int"
for i_int in range(count_int + 1):
MUTEX.acquire()
print('[{}] -> {}'.format(id_int, i_int))
MUTEX.release()
time.sleep(1)
def main():
for i_id_int in range(5):
_thread.start_new_thread(counter, (i_id_int, 5))
time.sleep(7)
print('Main thread exiting ...')
if __name__ == '__main__':
main()
输出:thread_count_mutex.py
[0] -> 0
[2] -> 0
[3] -> 0
[4] -> 0
[1] -> 0
[0] -> 1
[2] -> 1
[3] -> 1
[4] -> 1
[1] -> 1
[2] -> 2
[0] -> 2
[3] -> 2
[4] -> 2
[1] -> 2
[2] -> 3
[0] -> 3
[3] -> 3
[4] -> 3
[1] -> 3
[2] -> 4
[0] -> 4
[3] -> 4
[1] -> 4
[4] -> 4
[0] -> 5
[3] -> 5
[2] -> 5
[1] -> 5
[4] -> 5
Main thread exiting ...
等待派生的线程退出
利用全局锁列表
示例:thread_count_wait_1.py
"利用mutex在父线程中探知子线程何时结束"
import _thread as thread
MUTEX = thread.allocate_lock()
NUMS_THREAD_INT = 10
LISTMUTEX = [thread.allocate_lock() for i_int in range(NUMS_THREAD_INT)]
def counter(id_int, count_int):
"数数"
for i_int in range(count_int + 1):
MUTEX.acquire()
print('[{}] -> {}'.format(id_int, i_int))
MUTEX.release()
LISTMUTEX[id_int].acquire()
def main():
for i_id_int in range(NUMS_THREAD_INT):
thread.start_new_thread(counter, (i_id_int, 100))
for i_mutex in LISTMUTEX:
while not i_mutex.locked():
pass
print('Main thread exiting...')
if __name__ == '__main__':
main()
利用bool列表
示例:thread_count_wait_2.py
"利用bool列表在父线程中探知子线程何时结束"
import _thread as thread
MUTEX = thread.allocate_lock()
NUMS_THREAD_INT = 10
LISTBOOL = [False for i_int in range(NUMS_THREAD_INT)]
def counter(id_int, count_int):
"数数"
for i_int in range(count_int + 1):
MUTEX.acquire()
print('[{}] -> {}'.format(id_int, i_int))
MUTEX.release()
LISTBOOL[id_int] = True
def main():
for i_id_int in range(NUMS_THREAD_INT):
thread.start_new_thread(counter, (i_id_int, 100))
while False in LISTBOOL:
pass
print('Main thread exiting...')
if __name__ == '__main__':
main()
输出:thread_count_wait_1.py 或 thread_count_wait_1.py
...省略...
[5] -> 93
[5] -> 94
[5] -> 95
[5] -> 96
[5] -> 97
[5] -> 98
[5] -> 99
[5] -> 100
[1] -> 43
[1] -> 44
[1] -> 45
[1] -> 46
[1] -> 47
[1] -> 48
[1] -> 49
...省略...
[2] -> 98
[2] -> 99
[2] -> 100
Main thread exiting...
编码替代方案:忙碌的循环、参数和上下文管理器
忙碌的循环
主线程在繁忙地派生进程后,通过一个循环来等待,结果是导致显著地性能下降。只需在等待循环中加入一个time.sleep 的调用,就可释放主线程的CPU以处理其他任务。
参数
将锁作为参数而非在全局作用域中引用,这会使传入线程的函数可能更加一致。
上下文管理器
线程锁的上下文管理器在with 语句这一行获取锁,然后在语句结束时释放锁。
示例替代编码
示例:thread_count_wait_3.py
"thread_count_wait_2.py的替代方案"
import _thread as thread
import time
NUMS_THREAD_INT = 5
EXIT_LISTBOOL = [False for i_int in range(NUMS_THREAD_INT)]
def counter(id_int, count_int, mutex):
"数数的子进程"
for i_count_int in range(count_int + 1):
time.sleep(0.1 / (id_int + 1))
with mutex:
print('[{}] -> {}'.format(id_int, i_count_int))
EXIT_LISTBOOL[id_int] = True
def main():
mutex = thread.allocate_lock()
for i_id_int in range(NUMS_THREAD_INT):
thread.start_new_thread(counter, (i_id_int, 5, mutex))
while not all(EXIT_LISTBOOL):
time.sleep(1)
print('Main thread exiting...')
if __name__ == '__main__':
main()
输出:thread_count_wait_3.py
[4] -> 0
[3] -> 0
[2] -> 0
[4] -> 1
[1] -> 0
[3] -> 1
[4] -> 2
[2] -> 1
[3] -> 2
[4] -> 3
[0] -> 0
[1] -> 1
[2] -> 2
[4] -> 4
[3] -> 3
[4] -> 5
[3] -> 4
[2] -> 3
[1] -> 2
[3] -> 5
[2] -> 4
[0] -> 1
[1] -> 3
[2] -> 5
[1] -> 4
[0] -> 2
[1] -> 5
[0] -> 3
[0] -> 4
[0] -> 5
Main thread exiting...
- 在脚本运行时,每个线程都有不同的休眠时长,可以让它们更独立地运行。
threading 模块
threading 模块是基于对象和类的较高层面的接口。
示例:thread_classes.py
"使用threading模块做一个线程计数器"
import threading
class CountThread(threading.Thread):
"线程计数器"
def __init__(self, id_int, count_int, mutex):
self.id_int = id_int
self.count_int = count_int
self.mutex = mutex
threading.Thread.__init__(self)
def run(self):
for i_count_int in range(self.count_int + 1):
with self.mutex:
print('[{}] -> {}'.format(self.id_int, i_count_int))
def main():
mutex = threading.Lock()
threads_listCountThread = []
for i_id_int in range(10):
thread_CountThread = CountThread(i_id_int, 100, mutex)
thread_CountThread.start()
threads_listCountThread.append(thread_CountThread)
for i_thread_CountThread in threads_listCountThread:
i_thread_CountThread.join()
print('Main thread exiting...')
if __name__ == '__main__':
main()
输出:thread_classes.py
...省略...
[4] -> 100
[6] -> 90
[6] -> 91
[6] -> 92
[6] -> 93
[6] -> 94
[6] -> 95
[6] -> 96
[6] -> 97
[6] -> 98
[6] -> 99
[6] -> 100
Main thread exiting...
- 用此方式使用
threading 模块,大部分时间在定制类。 start 方法:在Thread 类框架下新的线程中执行run 方法。run 方法:提供线程逻辑业务。join 方法:等待线程退出。threading.Lock() :与_thread.allocate_lock() 相同
用threading 模块编码线程的其他方法
Thread 类还可以用来其实简单的函数或其他可调用函数,完全不用编码子类。如果没有重新定义,Thread 类的默认run 方法直接调用传给构造器的target 参数的调用对象,其参数为任意传给arg (默认是(),代表无)的参数。
示例:four_threads.py
"编写线程的4种方法"
import threading
import _thread as thread
import time
def power(in_int, stdout_mutex):
stdout_mutex.acquire()
print('func |', in_int ** 32)
stdout_mutex.release()
class PowerThread(threading.Thread):
"带有状态的子类"
def __init__(self, in_int, stdout_mutex):
self.in_int = in_int
self.stdout_mutex = stdout_mutex
threading.Thread.__init__(self)
def run(self):
self.stdout_mutex.acquire()
print('child class |', self.in_int ** 32)
self.stdout_mutex.release()
def main():
stdout_mutex = threading.Lock()
listThread = []
thread.start_new_thread(power, (2, stdout_mutex))
my_PowerThread = PowerThread(2, stdout_mutex)
listThread.append(my_PowerThread)
my_PowerThread.start()
my_1_Thread = threading.Thread(
target=(lambda: power(2, stdout_mutex))
)
listThread.append(my_1_Thread)
my_1_Thread.start()
my_2_Thread = threading.Thread(
target=power, args=(2, stdout_mutex)
)
listThread.append(my_2_Thread)
my_2_Thread.start()
for i_Thread in listThread:
i_Thread.join()
print('Main thread exiting...')
if __name__ == "__main__":
main()
输出:four_threads.py
func | 4294967296
child class | 4294967296
func | 4294967296
func | 4294967296
Main thread exiting...
如果你的线程要求因线程而异的状态或者可以通过OOP享受其带来的种种方便,那么基于类的线程可能是个更好的选择。
示例:thread_of_class.py
"基于类的线程"
import threading
import _thread as thread
import time
class Power:
"带有状态的非线程类,OOP方式"
def __init__(self, in_int, stdout_mutex):
self.in_int = in_int
self.stdout_mutex = stdout_mutex
def act(self):
self.stdout_mutex.acquire()
print('Power |', self.in_int ** 32)
self.stdout_mutex.release()
def act(in_int, stdout_mutex):
"利用嵌套作用域保留状态"
def power():
stdout_mutex.acquire()
print('act |', 'power |', in_int ** 32)
stdout_mutex.release()
return power
def main():
stdout_mutex = threading.Lock()
obj_Power = Power(2, stdout_mutex)
my_1_Thread = threading.Thread(target=obj_Power.act)
my_1_Thread.start()
my_2_Thread = threading.Thread(target=act(2, stdout_mutex))
my_2_Thread.start()
thread.start_new_thread(obj_Power.act, ())
thread.start_new_thread(act(2, stdout_mutex), ())
time.sleep(0.1)
print('main | Main thread exiting...')
if __name__ == '__main__':
main()
输出:thread_of_class.py
Power | 4294967296
act | power | 4294967296
act | power | 4294967296
Power | 4294967296
main | Main thread exiting...
再次探讨同步获取共享对象和名称
线程需要通过它们对任何可能在同一进程中的线程共享的项(对象和命名空间)的更改进行同步化。根据给定的程序的目的,这些对象可能包括:
- 内存中的可变对象(传入或以其他方式进行引用的对象,其生命周期跨越线程的持续时长)。
- 全局作用域中的名称(线程函数和类以外的可更改变量)。
- 模块中的内容(每个模块仅在系统的模块表格中拥有一份共享的副本)。
如果有可能进行并发更新,那么即使是简单的全局变量也需要协调工作。
示例:thread_add_random.py
"在windows 7下每次运行打印不同的结果"
import threading
import time
COUNT_INT = 0
def adder():
"间隔地给COUNT_INT加1"
global COUNT_INT
COUNT_INT += 1
time.sleep(0.5)
COUNT_INT += 1
def main():
global COUNT_INT
listThread = []
for i_int in range(100):
Thread = threading.Thread(target=adder)
listThread.append(Thread)
Thread.start()
for i_Thread in listThread:
i_Thread.join()
print(COUNT_INT)
if __name__ == "__main__":
main()
在Windows 7下用Python 3.1运行时,每次运行都会产生不同的结果。这个现象的产生是因为线程在时间上的交叠是随机的:一个线程更新全局变量时,可能使用了其他线程正进行操作的结果,这些结果往往是不完全的。
我们需要锁来对这些更新做同步化。
示例:thread_add_syn_change.py
"每次都打印200,因为共享资源的访问已经同步化"
import threading
import time
COUNT_INT = 0
def adder(count_int_mutex):
"间隔地给COUNT_INT加1"
global COUNT_INT
with count_int_mutex:
COUNT_INT += 1
time.sleep(0.5)
with count_int_mutex:
COUNT_INT += 1
def main():
global COUNT_INT
count_int_mutex = threading.Lock()
listThread = []
for i_int in range(100):
Thread = threading.Thread(target=adder, args=(count_int_mutex,))
listThread.append(Thread)
Thread.start()
for i_Thread in listThread:
i_Thread.join()
print(COUNT_INT)
if __name__ == "__main__":
main()
线程计时器
threading 模块中有一个线程计时器类Timer ,可以用来在某个计时器到期失效后运行另一个函数。
Timer(M.N, somefunc).start()
Timer 对象的start 方法用于设置计时器,而cancel 方法用于取消规划好的事件。
>>> from threading import Timer
>>>
>>> my_Timer = Timer(5.5, lambda: print('Spam!'))
>>> my_Timer.start()
>>> Spam!
>>> my_Timer.start()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/alone/anaconda3/lib/python3.7/threading.py", line 843, in start
raise RuntimeError("threads can only be started once")
RuntimeError: threads can only be started once
queue 模块
实际工作中带有线程的程序通常有一系列生产者和消费者线程组成,它们通过将数据存入一个共享的队列或者从中取出来进行通信。只要队列同步化对自身的访问,线程间的交互将自动同步。
Python的queue 模块可实现存储设备的功能。该模块提供一个标准的队列数据结构——一个先进先出(first-in first-out,FIFO)的Python对象的列表,可包含任意类型的Python对象,新添的对象附加到列表的一端而自另一端开始移除。队列对象自动由线程锁获取和释放操作控制。
下面的示例派生两个消费者线程来关注在共享队列中出现的数据,以及4个生产者线程周期性地在休眠期隔期后将数据放入队列(休眠时间各不相同,模拟真实情况下的长时任务)。
示例:queue_test.py
"生产者和消费者线程与共享队列进行通信"
import threading
import time
import queue
NUM_CONSUMERS_INT = 2
NUM_PRODUCERS_INT = 4
NUM_MESSAGES_INT = 4
STDOUT_MUTEX = threading.Lock()
DATA_QUEUE = queue.Queue()
EXIT_LISTBOOL = [False for i_int in range(NUM_PRODUCERS_INT)]
def produser(id_int):
"生产者线程"
for i_message_num_int in range(NUM_MESSAGES_INT):
time.sleep(id_int + 1)
DATA_QUEUE.put(
'[producer id={}, count={}]'.format(id_int, i_message_num_int)
)
EXIT_LISTBOOL[id_int] = True
def consumer(id_int):
"消费者线程"
while not all(EXIT_LISTBOOL):
time.sleep(0.1314)
try:
data_str = DATA_QUEUE.get(block=False)
except queue.Empty:
pass
else:
with STDOUT_MUTEX:
print('consumer', id_int, '| got ->', data_str)
def main():
listThread = []
for i_id_int in range(NUM_CONSUMERS_INT):
Thread = threading.Thread(target=consumer, args=(i_id_int,))
listThread.append(Thread)
Thread.start()
for i_id_int in range(NUM_PRODUCERS_INT):
Thread = threading.Thread(target=produser, args=(i_id_int,))
listThread.append(Thread)
Thread.start()
for i_Thread in listThread:
i_Thread.join()
print('Main thread exiting...')
if __name__ == '__main__':
main()
程序和子线程一同退出
要想实现同时退出,除了示例的方法,还可手动设置消费者线程对象的daemon 标识符,将其设置为守护进程。
示例:queue_test_2.py
"queue_test.py的替代版本"
import threading
import time
import queue
NUM_CONSUMERS_INT = 2
NUM_PRODUCERS_INT = 4
NUM_MESSAGES_INT = 4
STDOUT_MUTEX = threading.Lock()
DATA_QUEUE = queue.Queue()
def produser(id_int):
"生产者线程"
for i_message_num_int in range(NUM_MESSAGES_INT):
time.sleep(id_int + 1)
DATA_QUEUE.put(
'[producer id={}, count={}]'.format(id_int, i_message_num_int)
)
def consumer(id_int):
"消费者线程"
while True:
time.sleep(0.1314)
try:
data_str = DATA_QUEUE.get(block=False)
except queue.Empty:
pass
else:
with STDOUT_MUTEX:
print('consumer', id_int, '| got ->', data_str)
def main():
listThread = []
for i_id_int in range(NUM_CONSUMERS_INT):
Thread = threading.Thread(
target=consumer, args=(i_id_int,), daemon=True
)
Thread.start()
for i_id_int in range(NUM_PRODUCERS_INT):
Thread = threading.Thread(target=produser, args=(i_id_int,))
listThread.append(Thread)
Thread.start()
for i_Thread in listThread:
i_Thread.join()
time.sleep(0.1)
print('Main thread exiting...')
if __name__ == '__main__':
main()
运行脚本
输出:queue_test.py
consumer 1 | got -> [producer id=0, count=0]
consumer 1 | got -> [producer id=0, count=1]
consumer 0 | got -> [producer id=1, count=0]
consumer 1 | got -> [producer id=0, count=2]
consumer 0 | got -> [producer id=2, count=0]
consumer 0 | got -> [producer id=0, count=3]
consumer 1 | got -> [producer id=1, count=1]
consumer 0 | got -> [producer id=3, count=0]
consumer 1 | got -> [producer id=1, count=2]
consumer 0 | got -> [producer id=2, count=1]
consumer 1 | got -> [producer id=1, count=3]
consumer 0 | got -> [producer id=3, count=1]
consumer 0 | got -> [producer id=2, count=2]
consumer 1 | got -> [producer id=2, count=3]
consumer 0 | got -> [producer id=3, count=2]
consumer 1 | got -> [producer id=3, count=3]
Main thread exiting...
输出:queue_test_2.py
consumer 0 | got -> [producer id=0, count=0]
consumer 1 | got -> [producer id=1, count=0]
consumer 0 | got -> [producer id=0, count=1]
consumer 1 | got -> [producer id=0, count=2]
consumer 0 | got -> [producer id=2, count=0]
consumer 1 | got -> [producer id=1, count=1]
consumer 0 | got -> [producer id=3, count=0]
consumer 1 | got -> [producer id=0, count=3]
consumer 0 | got -> [producer id=2, count=1]
consumer 1 | got -> [producer id=1, count=2]
consumer 0 | got -> [producer id=1, count=3]
consumer 1 | got -> [producer id=3, count=1]
consumer 0 | got -> [producer id=2, count=2]
consumer 1 | got -> [producer id=2, count=3]
consumer 0 | got -> [producer id=3, count=2]
consumer 1 | got -> [producer id=3, count=3]
Main thread exiting...
😃 学完博客后,是不是有所启发呢?如果对此还有疑问,欢迎在评论区留言哦。 如果还想了解更多的信息,欢迎大佬们关注我哦,也可以查看我的个人博客网站BeacherHou
|