Python 线程threading
线程比进程更轻量化,系统开销一般也更低,所以大家更倾向于用多线程的方式处理并发的情况。
Python 提供多线程编程的方式。
本文基于 Python3 讲解,Python 实现多线程编程需要借助于 threading 模块。
Python多线程适用于I/O密集型
GIL的全称是Global Interpreter Lock(全局解释器锁),为了数据安全,GIL保证同一时间只能有一个线程拿到数据。所以,在python中,同时只能执行一个线程。
而IO密集型,多线程能够有效提升效率( 单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率 )。所以python多线程对IO密集型代码比较友好。
而CPU密集型( 各种循环处理、计算等等 ),由于计算工作多,计时器很快就会达到阈值,然后触发GIL的释放与再竞争( 多个线程来回切换当然是需要消耗资源的 ),所以python多线程对CPU密集型代码并不友好。
Python多线程的工作过程
Python在使用多线程的时候,调用的是c语言的原生线程。
- 拿到公共数据
- 申请GIL(全局解释器锁)
- python解释器调用os原生线程
- os操作cpu执行运算
- 当该线程执行时间到后,无论运算是否已经执行完,GIL都被要求释放
- 由其他进程重复上面的过程
- 等其他进程执行完后,又会切换到之前的线程(从他记录的上下文继续执行),整个过程是每个线程执行自己的运算,当执行时间到就进行切换(context switch)。
笔记:有些没看懂?
threading 模块中最核心的内容是 Thread 这个类。
值得注意的是,程序运行时默认就是在主线程上
创建 Thread 对象有 2 种手段。
- 直接创建 Thread ,将一个 callable 对象从类的构造器传递进去,这个 callable 就是回调函数,用来处理任务。
- 编写一个自定义类继承 Thread,然后复写 run() 方法,在 run() 方法中编写任务处理代码,然后创建这个 Thread 的子类。
Thread 的构造方法中,最重要的参数是 target,所以我们需要将一个 callable 对象赋值给它,线程才能正常运行。
如果要让一个 Thread 对象启动,调用它的 start() 方法就好了。
构造函数
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
调用这个构造函数时,必需带有关键字参数。参数如下:
-
group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。 -
target 是用于 run() 方法调用的可调用对象(一个函数)。默认是 None,表示不需要调用任何方法。 -
name 是线程名称。默认情况下,由 “Thread-N” 格式构成一个唯一的名称,其中 N 是小的十进制数。多个线程可以赋予相同的名称。 -
args 是用于调用目标函数的参数元组。默认是 ()。 -
kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。 -
daemon 参数如果不是 None,将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。 3.3 版及以上才具有该属性。 注意:一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。 初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。 当没有存活的非守护线程时,整个Python程序才会退出。 -
如果子类型**重载了构造函数,**它一定要确保在做任何事前,先发起调用基类构造器(Thread.init())。
名词解释:守护模式
有一种线程,它是在后台运行的,它的任务是为其他线程提供服务,这种线程被称为“后台线程(Daemon Thread)”,又称为“守护线程”或“精灵线程”。Python 解释器的垃圾回收线程就是典型的后台线程。 后台线程有一个特征,如果所有的前台线程都死亡了,那么后台线程会自动死亡。
线程函数
start()
开始线程活动。
它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。
如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError 。
join(timeout=None)
等待,直到线程终结。
这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结。
- 不管是正常终结
- 还是抛出未处理异常
- 或者直到发生超时,超时选项是可选的。
当 timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。 因为 join() 总是返回 None ,所以你一定要在 join() 后调用is_alive()才能判断是否发生超时 ,如果线程仍然存活,则 join() 超时。
当 timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。
一个线程可以被 join() 很多次。
- 如果尝试加入当前线程会导致死锁, join() 会引起 RuntimeError 异常。
- 如果尝试 join() 一个尚未开始的线程,也会抛出相同的异常。
创建线程
import threading
import time
def run(n):
print("task", n)
time.sleep(1)
print('2s')
time.sleep(1)
print('3s')
if __name__ == '__main__':
th = threading.Thread(target=run,name="thread_1" args=("thread 1",), daemon=True)
th.setDaemon(True)
th.start()
th.join()
print("end")
Thread 的名字
每一个 Thread 都有一个 name 的属性,代表的就是线程的名字,这个可以在构造方法中赋值。
如果在构造方法中没有个 name 赋值的话,默认就是 “Thread-N” 的形式,N 是数字。
import threading
import time
def test():
for i in range(5):
print(threading.current_thread().name+' test ',i)
time.sleep(1)
thread = threading.Thread(target=test)
thread.start()
for i in range(5):
print(threading.current_thread().name+' main ', i)
time.sleep(1)
通过 thread.current_thread() 方法可以返回线程本身,然后就可以访问它的 name 属性。
上面代码运行结果如下:
Thread-1 test 0
MainThread main 0
Thread-1 test 1
MainThread main 1
Thread-1 test 2
MainThread main 2
Thread-1 test 3
MainThread main 3
Thread-1 test 4
MainThread main 4
Thread 的生命周期
- 创建对象时,代表 Thread 内部被初始化。
- 调用 start() 方法后,thread 会开始运行。(实际调用了C语言线程的run()方法)
- thread 代码正常运行结束或者是遇到异常,线程会终止。
可以通过 Thread 的 is_alive() 方法查询线程是否还在运行。
值得注意的是,is_alive() 返回 True 的情况是 Thread 对象被正常初始化,start() 方法被调用,然后线程的代码还在正常运行。
import threading
import time
def test():
for i in range(5):
print(threading.current_thread().name+' test ',i)
time.sleep(0.5)
thread = threading.Thread(target=test,name='TestThread')
thread.start()
for i in range(5):
print(threading.current_thread().name+' main ', i)
print(thread.name+' is alive ', thread.isAlive())
time.sleep(1)
在上面的代码中,我让 TestThread 比 MainThread 早一点结束,代码运行结果如下。
TestThread test 0
MainThread main 0
TestThread is alive True
TestThread test 1
MainThread main 1
TestThread is alive True
TestThread test 2
TestThread test 3
MainThread main 2
TestThread is alive True
TestThread test 4
MainThread main 3
TestThread is alive False
MainThread main 4
TestThread is alive False
线程加锁
由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据(笔记:Python基础数据类型、列表、元组、字典都是线程安全的,因此不会导致程序崩溃,但会导致数据出现未知值,即脏数据),
所以出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,像下面的代码,当需要独占某一个资源时,任何一个锁都可以锁定这个资源,就好比你用不同的锁都可以把这个相同的门锁住一样。
由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,并且没有很好地保护该对象,会造成程序结果的不可预期,我们因此也称为“线程不安全”。 为了防止上面情况的发生,就出现了锁。
5种线程锁
我在5种Python线程锁中有详细讲述。
join()函数-线程阻塞
上面代码两个线程是同时运行的,但如果让一个先运行,一个后运行,怎么做呢?
调用一个 Thread 的 join() 方法,可以阻塞自身所在的线程。
概念补充:
- 在Python多线程编程中,join方法的作用是线程同步
- 守护线程,是为守护别人而存在,当设置为守护线程后,被守护的主线程不存在后,守护线程也自然不存在
以下分5种不同的形式解释join在多线程编程中的用处
第一种:Python多线程默认情况(非守护线程)
Python多线程的默认情况(设置线程setDaemon(False)),主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束
笔记:setDaemon(False) 即 该线程被设置为非守护线程;主程序退出后,子线程不会自动退出。
import threading, time
def doWaiting1():
print('start waiting1: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(3)
print("线程1奉命报道")
print('stop waiting1: ' + time.strftime('%H:%M:%S') + "\n")
def doWaiting2():
print('start waiting2: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(8)
print("线程2奉命报道")
print('stop waiting2: ', time.strftime('%H:%M:%S') + "\n")
tsk = []
thread1 = threading.Thread(target = doWaiting1)
thread1.start()
tsk.append(thread1)
thread2 = threading.Thread(target = doWaiting2)
thread2.start()
tsk.append(thread2)
print('start join: ' + time.strftime('%H:%M:%S') )
print('end join: ' + time.strftime('%H:%M:%S') )
运行结果
start waiting1: 20:03:30
start waiting2: 20:03:30
start join: 20:03:30
end join: 20:03:30
线程1奉命报道
stop waiting1: 20:03:33
线程2奉命报道
stop waiting2: 20:03:38
结论:
- 计时程序属于主线程,整个主线程在开启线程1和线程2后,进入计时模块,主线程结束
- 主线程结束,但并没有影响线程1和线程2的运行,故后面线程1和线程2仍然跑来报道,至此整个程序才完全结束
第二种:守护线程
开启线程的**setDaemon(True),**设置子线程为守护线程,实现主程序结束,子程序立马全部结束功能
import threading, time
def doWaiting1():
print('start waiting1: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(3)
print("线程1奉命报道")
print('stop waiting1: ' + time.strftime('%H:%M:%S') + "\n")
def doWaiting2():
print('start waiting2: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(8)
print("线程2奉命报道")
print('stop waiting2: ', time.strftime('%H:%M:%S') + "\n")
tsk = []
thread1 = threading.Thread(target = doWaiting1)
thread1.setDaemon(True)
thread1.start()
tsk.append(thread1)
thread2 = threading.Thread(target = doWaiting2)
thread2.setDaemon(True)
thread2.start()
tsk.append(thread2)
print('start join: ' + time.strftime('%H:%M:%S') )
print('end join: ' + time.strftime('%H:%M:%S') )
运行结果:
start waiting1: 20:10:04
start waiting2: 20:10:04
start join: 20:10:04
end join: 20:10:04
结论:
- 主线程结束后,无论子线程1,2是否运行完成,都结束不再往下继续运行
第三种:加入join方法设置同步(非守护线程,且join不设置超时)
非守护线程,主程序将一直等待子程序全部运行完成才结束
import threading, time
def doWaiting1():
print('start waiting1: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(3)
print("线程1奉命报道")
print('stop waiting1: ' + time.strftime('%H:%M:%S') + "\n")
def doWaiting2():
print('start waiting2: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(8)
print("线程2奉命报道")
print('stop waiting2: ', time.strftime('%H:%M:%S') + "\n")
tsk = []
thread1 = threading.Thread(target = doWaiting1)
thread1.start()
tsk.append(thread1)
thread2 = threading.Thread(target = doWaiting2)
thread2.start()
tsk.append(thread2)
print('start join: ' + time.strftime('%H:%M:%S') )
for t in tsk:
print('%s线程到了'%t)
t.join()
print('end join: ' + time.strftime('%H:%M:%S') )
运行结果:
start waiting1: 20:14:35
start waiting2: 20:14:35
start join: 20:14:35
<Thread(Thread-1, started 19648)>线程到了
线程1奉命报道
stop waiting1: 20:14:38
<Thread(Thread-2, started 24056)>线程到了
线程2奉命报道
stop waiting2: 20:14:43
end join: 20:14:43
结论:
- 使用join函数,主线程将被阻塞(即:主线程,指定是创建子线程的线程),一直等待被使用了join方法的线程运行完成
- start join 是在35秒,stop waiting1在38秒,刚好sleep了3秒,stop waiting2是43秒,刚好sleep了8秒,这也说明,线程1和2是基本同时运行的,但由于执行所消耗的时间不一致,所以阻塞所用的时间也是不一样的,最终end join时间是最后线程运行完,整个程序就中止在43秒
- 将所有的线程放入一个列表,通过循环对列表中的所有线程使用join方法判断,也是为了保证全部子线程都能全部运行完成,主线程才退出
第四种:加入join方法设置同步(非守护线程,join设置超时)
给join设置timeout数值,判断等待多后子线程还没有完成,则主线程不再等待
笔记:join设置超时后,判断依据为 子线程执行完毕 | 超时 (逻辑或的关系),即两个条件谁先为真,就向下执行。
import threading, time
def doWaiting1():
print('start waiting1: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(2)
print("线程1奉命报道")
print('stop waiting1: ' + time.strftime('%H:%M:%S') + "\n")
def doWaiting2():
print('start waiting2: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(8)
print("线程2奉命报道")
print('stop waiting2: ', time.strftime('%H:%M:%S') + "\n")
tsk = []
thread1 = threading.Thread(target = doWaiting1)
thread1.start()
tsk.append(thread1)
thread2 = threading.Thread(target = doWaiting2)
thread2.start()
tsk.append(thread2)
print('start join: ' + time.strftime('%H:%M:%S') )
for t in tsk:
print("开始:"+time.strftime('%H:%M:%S'))
print('%s线程到了'%t)
t.join(5)
print("结束:" + time.strftime('%H:%M:%S'))
print('end join: ' + time.strftime('%H:%M:%S') )
运行结果:
start waiting1: 21:14:25
start waiting2: 21:14:25
start join: 21:14:25
开始:21:14:25
<Thread(Thread-1, started 22348)>线程到了
线程1奉命报道
stop waiting1: 21:14:27
结束:21:14:27
开始:21:14:27
<Thread(Thread-2, started 13164)>线程到了
结束:21:14:32
end join: 21:14:32
线程2奉命报道
stop waiting2: 21:14:33
结论:
- 给join设置等待时间后,超过了等待时间后,主线程终止,但不影响子线程继续运行,等子线程全部运行完毕整个程序终止
- 所有线程可能出现的最大等待时间 timeout_total ≤ timeout * 线程数量
- 虽然timeout设置的是5s,但是线程1只需要2s,所以循环从开始到结束,只需消耗2s(21:14:27 - 21:14:25),到此循环就进入第二次,第二次等待仍可以分配5s(21:14:32 - 21:14:27),所以两次总共的等待是时间2+5=7s,但是线程2运行所需要时间是8s,而且8s是从21:14:25开始的,结束时间是21:14:33,因为join等待时间完了主程序结束了,但不影响线程2继续运行,所以在end join后,线程2仍然输出了报道结果(是因为没有开启守护线程)
- 个别文章解释join这个时间为:主线程会等待多个线程的timeout累加和,这个说法不准确,由3的推理可以得出,并非会一定等待“线程数* timeout”这么多时间,而是≤“线程数*timeout”,
第五种:加入join方法设置同步(守护线程,join设置超时)
超时且未处理完毕的子线程将被直接终止(符合守护线程的特性)
import threading, time
def doWaiting1():
print('start waiting1: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(2)
print("线程1奉命报道")
print('stop waiting1: ' + time.strftime('%H:%M:%S') + "\n")
def doWaiting2():
print('start waiting2: ' + time.strftime('%H:%M:%S') + "\n")
time.sleep(8)
print("线程2奉命报道")
print('stop waiting2: ', time.strftime('%H:%M:%S') + "\n")
tsk = []
thread1 = threading.Thread(target = doWaiting1)
thread1.setDaemon(True)
thread1.start()
tsk.append(thread1)
thread2 = threading.Thread(target = doWaiting2)
thread2.setDaemon(True)
thread2.start()
tsk.append(thread2)
print('start join: ' + time.strftime('%H:%M:%S') )
for t in tsk:
print("开始:"+time.strftime('%H:%M:%S'))
print('%s线程到了'%t)
t.join(5)
print("结束:" + time.strftime('%H:%M:%S'))
print('end join: ' + time.strftime('%H:%M:%S') )
运行结果:
start waiting1: 21:24:14
start waiting2: 21:24:14
start join: 21:24:14
开始:21:24:14
<Thread(Thread-1, started daemon 9060)>线程到了
线程1奉命报道
stop waiting1: 21:24:16
结束:21:24:16
开始:21:24:16
<Thread(Thread-2, started daemon 13912)>线程到了
结束:21:24:21
end join: 21:24:21
结论:
- 相比第四种,超时后主线程运行到end join则结束了,子线程2已经被终止停止运行
自定义类继承 Thread
前面讲过,直接初始化一个 Thread,然后,现在还有一种方式就是自定义一个 Thread 的子类,然后复写它的 run() 方法。
import threading
import time
class TestThread(threading.Thread):
def __init__(self,name=None):
threading.Thread.__init__(self,name=name)
def run(self):
for i in range(5):
print(threading.current_thread().name + ' test ', i)
time.sleep(1)
thread = TestThread(name='TestThread')
thread.start()
for i in range(5):
print(threading.current_thread().name+' main ', i)
print(thread.name+' is alive ', thread.isAlive())
time.sleep(1)
上面的代码,我们自定义了 TestThread 这个类,然后继承了 threading.Thread。
只有在 run() 方法中处理逻辑。最终代码运行结果如下:
TestThread test 0
MainThread main 0
TestThread is alive True
TestThread test 1
MainThread main 1
TestThread is alive True
TestThread test 2
MainThread main 2
TestThread is alive True
MainThread main 3
TestThread is alive True
TestThread test 3
MainThread main 4
TestThread test 4
TestThread is alive True
这与之前的效果并无差异,但我还是推荐用这种方法,毕竟面向对象编程嘛。
自此,Python 多线程编码技术就大致介绍完毕,大家可以进行实际代码编写了。
但是,多线程编程的难点在于多个线程之间共享数据的同步,这是非常容易出错的地方,我将分别编写相应的博文去介绍一些高级的技术点。
|