创建多线程
继承Thread类创建派生类,并重写__init__和run方法,实现自定义线程对象类:
import threading
import time
class myThread(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
self.name = threadname
def run(self):
print("I'm", self.name)
time.sleep(3)
if __name__ == '__main__':
for i in range(3):
t = myThread("t%s" % i)
t.start()
可以直接使用Thread类创建线程,为其构造函数传递一个可调用对象即可:
import threading
def func1(num):
print('num:', num)
if __name__ == '__main__':
for i in range(3):
t = threading.Thread(target=func1, args=(i,))
t.start()
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。 target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。 name 是线程名称。 在默认情况下,会以 “Thread-N” 的形式构造唯一名称,其中 N 为一个较小的十进制数值,或是 “Thread-N (target)” 的形式,其中 “target” 为 target.name,如果指定了 target 参数的话。 args 是用于调用目标函数的参数元组。默认是 ()。 kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。 daemon如果不是 None,daemon 参数将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。
线程同步
Lock对象
原始锁,只要一个线程获取该锁,相关其他要获取该锁的线程都会阻塞,直到锁被释放。任何线程都可以释放它。
一个锁有两种状态:locked和unlocked。如果锁处于unclocked状态,acquire()方法将其修改为locked并立即返回;如果锁已处于locked状态,则阻塞当前线程并等待其他线程释放锁然后将其修改为locked并立即返回,或等待一定的时间后返回但不修改锁的状态。
定义
import threading
lock = threading.Lock()
acquire(blocking=True, timeout=-1):
获取锁,阻塞或非阻塞
blocking: 默认为True,阻塞直到锁被解锁。如果设置为False,则不阻塞,立刻返回
timeout: 最多阻塞timeout指定的秒数
release():
释放锁。将锁状态由locked修改为unlocked并立即返回,如果锁状态本来已经是unlocked,调用该方法将会抛出异常。
加锁示例
import threading
x = 0
def func(num):
global x
x = x + num
x = x - num
def run_func(num):
for i in range(2000000):
lock.acquire()
func(num)
lock.release()
if __name__ == '__main__':
lock = threading.Lock()
t1 = threading.Thread(target=run_func, args=(5,))
t2 = threading.Thread(target=run_func, args=(6,))
t1.start()
t2.start()
t1.join()
t2.join()
print(x)
"""
output:
0
"""
不加锁示例
class myThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global x
for i in range(3):
x = x + i
time.sleep(1)
print(x)
if __name__ == '__main__':
tl = []
for i in range(10):
t = myThread()
tl.append(t)
x = 0
for i in tl:
i.start()
"""
output:
29
"""
此处是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱。
两个线程同时一存一取就可能导致全局变量的结果不正确。
死锁
什么是死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信造成的一种阻塞的现象,若无外力左右,他们都将无法推进下去。
死锁出现的两种情况
(1) 当一个进程或者一个线程一直调用或者占用同一锁Lock而不释放资源而导致其他进程/线程无法获取锁,就会出现死锁的状况,会一直阻塞在aquire()处
(2) 当有两个进程同时想获取两个锁的时候(多个进程想要获取多个锁),由于两者都是竞争关系,谁也不让谁,谁快谁得手,但计算机中这种竞争关系很微妙,时间的差异性很小,于是就会出现两者都阻塞在同一个地方,都无法同时获得两个锁或者获取对方已经获取的但还没释放的锁。
为了解决死锁的问题,引入了RLock()对象
RLock对象
可重入锁对象,一旦一个线程获得了可重入锁,同一个线程就可以再次获得它而不会阻塞,即RLock允许在同一线程中被多次acquire,而Lock却不允许。线程每次获得它时必须释放它一次。
定义
import threading
rlock = threading.RLock()
acquire(blocking=True, timeout=-1):
获取锁,阻塞或非阻塞
blocking: 默认为True,阻塞直到锁被解锁。如果设置为False,则不阻塞,立刻返回
timeout: 最多阻塞timeout指定的秒数
release():
释放锁。将锁状态由locked修改为unlocked并立即返回,如果锁状态本来已经是unlocked,调用该方法将会抛出异常。
注意
如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁。
锁的获取是谁快谁得手,一旦一个进程请求了一个RLock,那么其他进程就无法再请求了,直到这个进程释放了RLock内部的所有锁。
示例
import threading
import time
rlocka = rlockb = threading.RLock()
def fn1():
rlocka.acquire()
time.sleep(1)
print('进程1获得A锁')
rlockb.acquire()
time.sleep(1)
print('进程1获得B锁')
rlocka.release()
print('进程1释放A锁')
rlockb.release()
print('进程1释放B锁')
def fn2():
rlocka.acquire()
time.sleep(1)
print('进程2获得A锁')
rlockb.acquire()
time.sleep(1)
print('进程2获得B锁')
rlocka.release()
print('进程2释放A锁')
rlockb.release()
print('进程2释放B锁')
t1 = threading.Thread(target=fn1)
t2 = threading.Thread(target=fn2)
t1.start()
t2.start()
t1.join()
t2.join()
"""
output:
进程1获得A锁
进程1获得B锁
进程1释放A锁
进程1释放B锁
进程2获得A锁
进程2获得B锁
进程2释放A锁
进程2释放B锁
"""
Condition对象
原理
它在内部维护了一个锁对象(默认是RLock),可以在创建Condition对象的时候把锁对象作为参数传入,其实它只是简单的调用内部锁对象的对应的方法而已。
Condition对象支持上下文管理语句with
acquire(blocking=True, timeout=-1):
获取锁,阻塞或非阻塞
blocking: 默认为True,阻塞直到锁被解锁。如果设置为False,则不阻塞,立刻返回
timeout: 最多阻塞timeout指定的秒数
release():
释放锁。将锁状态由locked修改为unlocked并立即返回,如果锁状态本来已经是unlocked,调用该方法将会抛出异常
wait(timeout=None):
释放内部所占用的锁,同时线程被挂起,直到接收到通知被唤醒或超时(timeout)。当线程被唤醒并重新占用锁的时候,程序才会继续执行下去
wait_for(predicate, timeout=None):
阻塞当前线程直到超时或者指定条件得到满足
notify(n=1):
唤醒被挂起的最多n个线程,但不会释放所占用的锁
notify_all():
唤醒被挂起的所有线程,但不会释放所占用的锁
生产者和消费者问题
有界缓冲区的生产者消费者问题
两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,将信息放入缓冲区;另一个是消费者,从缓冲区中取出信息。
无界缓冲区的生产者消费者问题
示例
class Producer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
def run(self):
global x
while True:
time.sleep(1)
con.acquire()
if len(x) == 5:
print('Producer is waiting......')
con.wait()
else:
r = random.randint(1, 1000)
print('Produced:', r)
x.append(r)
con.notify()
con.release()
class Consumer(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name=threadname)
def run(self):
global x
while True:
time.sleep(3)
con.acquire()
if not x:
print('Consumer is waiting......')
con.wait()
else:
print('Consumed:', x.pop(0))
con.notify()
con.release()
if __name__ == '__main__':
con = threading.Condition()
x = []
p = Producer('Producer')
c = Consumer('Consumer')
p.start()
c.start()
Semaphore对象
定义
实现了信号量对象,可用于控制获取资源的线程数量。信号量控制一个内部计时器,调用acquire()方法时该计时器减1,调用release()方法时该计时器加1,适用于需要控制特定资源的并发访问线程数量的场合。
调用acquire()方法时,如果计数器已经为0则阻塞当前线程,直到有其他线程调用了release()方法,所以计数器的值永远不会小于0。
acquire(blocking=True, timeout=-1):
获取锁,阻塞或非阻塞
blocking: 默认为True,阻塞直到锁被解锁。如果设置为False,则不阻塞,立刻返回
timeout: 最多阻塞timeout指定的秒数
release():
释放锁。将锁状态由locked修改为unlocked并立即返回,如果锁状态本来已经是unlocked,调用该方法将会抛出异常
BoundedSemaphore对象
一个有界信号量会确保它当前的值不超过它的初始值。如果超过,则引发ValueError(如release太多次)
import threading
import time
def worker(value):
sema.acquire()
print('Thread: %s is running' % value)
time.sleep(2)
sema.release()
if __name__ == '__main__':
sema = threading.BoundedSemaphore(2)
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
t.start()
"""
每两个线程输出后会延时2秒
output:
Thread: 0 is running
Thread: 1 is running
Thread: 2 is running
Thread: 3 is running
Thread: 4 is running
Thread: 5 is running
Thread: 6 is running
Thread: 7 is running
Thread: 8 is running
Thread: 9 is running
"""
Event对象
定义
一个线程发出事件信号,其他线程等待它。
事件对象管理一个内部标志,可以使用set()将其设置为true并使用clear()该方法重置为false。该方法阻塞,直到标志为真。
is_set()
当内部标志为true时,返回True
set()
将内部标志设置为True,所有等待它变为真的线程都被唤醒
clear()
将内部标志重置为False。随后,线程调用wait()将阻塞,直到set()被调用以再次将内容标志设置
wait(timeout=None)
阻塞直到内部标志为真。如果内部标志在进入时为真,则立即返回。否则,阻塞直到另一个线程调用set()将标志设置为true,或者发送超时为止。
import threading
import time
event = threading.Event()
def light():
count = 1
while True:
if count < 5:
if count == 1:
print('\n')
print('The red light is on...', count)
event.clear()
elif count >= 5 and count < 10:
event.set()
else:
count = 0
count += 1
time.sleep(1)
def car():
num = 0
while True:
if event.isSet():
num += 1
print('The car of {} is running...'.format(num))
time.sleep(1)
else:
event.wait()
if __name__ == '__main__':
light = threading.Thread(target=light, )
light.start()
car = threading.Thread(target=car)
car.start()
Timer对象
定义
代表一个只有在经过一定时间后才应该运行的动作——一个计时器。通过调用它们的start()方法来启动,可以通过调用该cancel()方法来停止计时器。
threading.Timer(interval, function, args=None, kwargs=None)
cancel()
停止定时器,并取消定时器动作的执行。
示例
import threading
def hello():
print('Hello world')
t = threading.Timer(5, hello)
t.start()
Barrier对象
多个线程运行到某个时间点以后每个线程都需要等着其他线程准备好以后再同时进行下一步工作。
threading.Barrier(parties, action=None, timeout=None)
parties:线程计时器,记录线程数量,也称线程障碍数量
action:可调用函数,当等待的线程达到了parties的数量,其中一个线程会首先调用action对应函数,之后再执行线程自己内部的代码。
timeout:默认的超时时间
wait(timeout=None)
当Barrier的所有线程都调用了这个函数,他们都会被同时释放。如果提供了超时,它优先于提供给类构造函数的任何超时使用。
reset()
将障碍返回到默认的空状态。任何等待它的线程都会收到BrokenBarrierError异常
abort()
将屏障置于破碎状态。如果其中一个线程需要终止,请使用此选项,以避免死锁应用程序。
import random
import threading
import time
def worker(arg):
time.sleep(random.randint(1, 20))
r = b.wait(20)
if r == 0:
print(arg)
def printOk():
print('ok')
b = threading.Barrier(parties=3, action=printOk, timeout=20)
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
t.start()
|