目录
线程对象?threading.Thread
定时器 threading.Timer
线程锁 threading.Lock
线程锁使用with
信号量 threading.BoundedSemaphore
python的多线程 对 IO密集型(文件处理,爬虫)代码还是友好的,对CPU密集型(数据处理,循环等)代码不太友好
Python多线程相当于单核多线程。只能利用单核资源跑多个线程;要想利用多核资源可以使用进程来实现。
而Java的多线程能利用多核资源。
线程对象?threading.Thread
threading.Thread(group=None,?target=None,?name=None,?args=(),?kwargs={},?*,?daemon=None)
group?应该为?None ;为了日后扩展?ThreadGroup ?类实现而保留。
target?是用于?run() ?方法调用的可调用对象。默认是?None ,表示不需要调用任何方法。
name?是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中?N?是小的十进制数。
args?是用于调用目标函数的参数元组。默认是?() 。
kwargs?是用于调用目标函数的关键字参数字典。默认是?{} 。
如果你自己重写子类继承threading.Thead类的话,一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__() )。
线程对象方法
import threading
from time import sleep
def demo_func_one():
sleep(1)
print("demo one")
def demo_func_two():
for i in range(5):
sleep(1)
print("demo two")
print("当前线程: ", threading.current_thread()) # <Thread(Thread-2, started 11952)>
def demo_func_three():
sleep(1)
print("demo three")
print("当前活动线程", threading.active_count()) # 4, 三个子线程 一个主线程
func_list = [demo_func_one, demo_func_two, demo_func_three]
thread_list = []
print("主线程", threading.main_thread()) # <_MainThread(MainThread, started 3948)>
for func in func_list:
t = threading.Thread(target=func)
thread_list.append(t)
t.setDaemon(True) # 设置线程为守护线程. 主线程执行完毕,不管子线程是否执行结束,都结束程序
t.start() # 开始线程
print("线程是否存活:", t.is_alive()) # True,线程是否存活
for t in thread_list:
t.join() # 等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -
for t in thread_list:
print(t.is_alive()) # False. 线程是否存活
# 主线程默认不是守护线程,主线程创建的子线程默认也不是守护线程
定时器 threading.Timer
import threading
import time
def demo_one():
print("定时器 执行")
def demo_two():
for i in range(6):
print("非定时器")
time.sleep(1)
t_list = []
f_list = [demo_one, demo_two]
# 创建定时器线程,再间隔时间 interval 之后执行
t = threading.Timer(interval=5, function=demo_one)
t.start()
# 创建非定时器线程
tt = threading.Thread(target=demo_two)
tt.start()
# 线程等待。 这里所有的线程join() 方法都要在所有线程都调用start()开始之后再调用,否则将根据调用join()的顺序执行
t.join()
tt.join()
"""
非定时器
非定时器
非定时器
非定时器
非定时器
定时器 执行
非定时器
"""
线程锁 threading.Lock
一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它。
import threading
from threading import Lock
# 处理同一数据时,加线程锁,不会导致数据错乱
# 创建线程锁(互斥锁)对象
lock = Lock()
def demo_three():
lock.acquire() # 加锁,处理数据前
print(lock.locked()) # 线程锁状态,此时为 True
global num
time.sleep(1)
num -= 1
time.sleep(1)
print(num)
lock.release() # 解锁,处理完数据后
print(lock.locked()) # 线程锁状态,此时为 False
t_list = []
for i in range(100):
t = threading.Thread(target=demo_three)
t.start()
t_list.append(t)
for t in t_list:
t.join()
"""
执行结果:
-1
-2
-3
...
"""
-----------------------------------------------------------------------
# 不加锁的情况,多线程处理同一数据会有问题
import threading
from threading import Lock
def demo_three():
global num
time.sleep(1)
num -= 1
time.sleep(1)
print(num)
t_list = []
for i in range(100):
t = threading.Thread(target=demo_three)
t.start()
t_list.append(t)
for t in t_list:
t.join()
"""
执行结果:
-100
-100
-100
...
"""
线程锁使用with
import threading
from threading import Lock
import time
"""
使用以下加锁方式
with 锁对象:
do something
和以下方式一样的效果
lock.acquire()
try:
do something
finnaly:
lock.release()
"""
lock = Lock()
num = 0
# 此方法加锁,正常执行
def demo_one():
time.sleep(1)
with lock:
global num
num -= 1
print("locked" + str(num))
time.sleep(1)
# 此方法不加锁,100个线程会全部输出 -100
# def demo_one():
# global num
# time.sleep(1)
# num -= 1
# time.sleep(1)
# print("locked" + str(num))
for i in range(100):
t = threading.Thread(target=demo_one)
t.start()
信号量 threading.BoundedSemaphore
信号量 通常用于保护数量有限的资源.例如数据库服务器 在资源数量固定的任何情况下,都应该使用有界信号量。在生成任何工作线程前,应该在主线程中初始化信号量 互斥锁同时只允许一个线程操作数据,而 信号量 同时允许指定数量的线程操作数据
import threading
import time
max = 2
sem = threading.BoundedSemaphore(value=max) # 设置最大信号量(最多可以同时处理数据的线程)
num = 0
def demo_semaphore(num, semaphore):
semaphore.acquire()
print("now the threading is {}, {}".format(num, str(time.time())))
time.sleep(1)
semaphore.release()
for i in range(100):
t = threading.Thread(target=demo_semaphore, args=(i, sem))
t.start()
|