进程的使用
多进程完成多任务
# 导入进程模块
import multiprocessing
import time
# 编写代码
def coding():
for i in range(3):
print("coding...")
time.sleep(0.2)
# 听音乐
def music():
for i in range(3):
print("music...")
time.sleep(0.2)
if __name__ == '__main__':
# coding()
# music()
# 通过进程类创建进程对象
coding_process = multiprocessing.Process(target=coding)
# 通过进程类创建进程对象
music_process = multiprocessing.Process(target=music)
# 启动进程
coding_process.start()
music_process.start()
多任务中带有参数
# 导入进程模块
import multiprocessing
import time
# 编写代码
def coding(num, name):
for i in range(num):
print(name)
print("coding...")
time.sleep(0.2)
# 听音乐
def music(count):
for i in range(count):
print("music...")
time.sleep(0.2)
if __name__ == '__main__':
# 通过进程类创建进程对象
#方法一:通过args的方式按照任务中参数的次序依次传入,次序一定要对齐
coding_process = multiprocessing.Process(target=coding, args=(3, "python..."))
# 通过进程类创建进程对象
#方法二:通过kwargs的方式传入,任务中的每个参数按字典的方式传,
# key为对应的参数且与与任务中的参数一样
music_process = multiprocessing.Process(target=music, kwargs={"count": 3})
# 启动进程
coding_process.start()
music_process.start()
获取进程号
# 导入进程模块
import multiprocessing
import time
import os
# 编写代码
def coding():
print("coding>>>%d" % os.getpid())
for i in range(3):
print("coding...")
time.sleep(0.2)
# 听音乐
def music():
print("music>>>%d" % os.getpid())
for i in range(3):
print("music...")
time.sleep(0.2)
if __name__ == '__main__':
print("主进程>>>%d" % os.getpid())
# 通过进程类创建进程对象
coding_process = multiprocessing.Process(target=coding)
# 通过进程类创建进程对象
music_process = multiprocessing.Process(target=music)
# 启动进程
coding_process.start()
music_process.start()
进程间不共享全局变量
import multiprocessing
import time
# 全局变量
my_list = []
# 写入数据
def write_data():
for i in range(3):
my_list.append(i)
print("add:", i)
print("write_data", my_list)
# 读取数据
def read_data():
print("read_data", my_list)
if __name__ == '__main__':
# 创建写入数据进程
write_process = multiprocessing.Process(target=write_data)
# 创建读取数据进程
read_process = multiprocessing.Process(target=read_data)
# 启动进程执行相应任务
write_process.start()
time.sleep(1)
read_process.start()
主进程与子进程的结束顺序问题
import multiprocessing
import time
# task函数
def task():
for i in range(10):
print("working...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子进程
work_process = multiprocessing.Process(target=task)
# 启动子进程
work_process.start()
# 延时1秒
time.sleep(1)
print("主进程执行完毕")
'''
主进程内的代码执行完后要等子进程执行结束后才结束
'''
如何使主进程结束后子进程也结束
方法一
import multiprocessing
import time
# task函数
def task():
for i in range(10):
print("working...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子进程
work_process = multiprocessing.Process(target=task)
# 方法一:设置守护主进程
work_process.daemon = True
# 启动子进程
work_process.start()
# 延时1秒
time.sleep(1)
print("主进程执行完毕")
方法二
import multiprocessing
import time
# task函数
def task():
for i in range(10):
print("working...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子进程
work_process = multiprocessing.Process(target=task)
# 启动子进程
work_process.start()
# 延时1秒
time.sleep(1)
# 方法二:主动杀死子进程
work_process.terminate()
print("主进程执行完毕")
进程间的通信
pipe管道:单工模式
import multiprocessing
import time
def process_receiver(pipe):
while True:
data = pipe.recv()
if data==None:
break
# print("pipe -> recv: \33[42;1m 接收 \033[0m", data)
print("pipe -> recv: ", data)
def process_sender(pipe):
for i in range(10):
# print("send -> pipe: \33[41;1m 发送 \033[0m",i)
print("send -> pipe: ",i)
pipe.send(i)
time.sleep(1)
pipe.send(None)
if __name__ == '__main__':
# duplex双工模式(双工模式, 单工模式),默认为True 双工
pipe = multiprocessing.Pipe(duplex=False)
p_sender = multiprocessing.Process(target=process_sender, args=(pipe[1],))
p_receiver = multiprocessing.Process(target=process_receiver, args=(pipe[0],))
p_sender.start()
p_receiver.start()
# print('结束')
pipe管道:双工模式
"""
双工
"""
import multiprocessing
import time
def process_receiver(pipe):
while True:
data = pipe.recv()
if data==None:
break
print("pipe -> recv: \33[42;1m 接收 \033[0m", data)
pipe.send("报告:{}".format(data))
def process_sender(pipe):
for i in range(10):
print("send -> pipe: \33[41;1m 发送 \033[0m",i)
pipe.send(i)
resp = pipe.recv()
print("收到回复-> {}".format(resp))
time.sleep(1)
pipe.send(None)
if __name__ == '__main__':
# duplex双工模式,默认为True
pipe = multiprocessing.Pipe(duplex=True)
p_receiver = multiprocessing.Process(target=process_receiver, args=(pipe[0],))
p_sender = multiprocessing.Process(target=process_sender, args=(pipe[1],))
p_receiver.start()
p_sender.start()
通过queue队列进行通信
import multiprocessing
import time
import os
def write_queue(queue):
print("write_queue>>>%d" % os.getpid())
# 循环写入数据
for i in range(10):
if queue.full():
print("队列已满!")
break
# 向队列中放入消息
queue.put(i+100)
time.sleep(0.5)
def read_queue(queue):
print("read_queue>>>%d" % os.getpid())
# 循环读取队列消息
while True:
# 队列为空,停止读取
if queue.empty():
print("---队列已空---")
break
# 读取消息并输出
result = queue.get()
print(result)
if __name__ == '__main__':
# 创建消息队列
queue = multiprocessing.Queue(5)
# 创建子进程
p1 = multiprocessing.Process(target=write_queue, args=(queue,))
p1.start()
# 等待p1写数据进程执行结束后,再往下执行
p1.join()
p2 = multiprocessing.Process(target=read_queue, args=(queue,))
p2.start()
线程的使用
多线程完成多任务
import threading
import time
# 编写代码
def coding():
for i in range(3):
print("coding...")
time.sleep(0.2)
# 听音乐
def music():
for i in range(3):
print("music...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子线程
coding_thread = threading.Thread(target=coding)
music_thread = threading.Thread(target=music)
# 启动子线程执行任务
coding_thread.start()
music_thread.start()
多任务中带有参数
import threading
import time
# 编写代码
def coding(num):
for i in range(num):
print("coding...")
time.sleep(0.2)
# 听音乐
def music(count):
for i in range(count):
print("music...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子线程
#注意:单个参数传入时记得加逗号,表明以元组的形式传入
coding_thread = threading.Thread(target=coding, args=(2,))
# 创建子线程
music_thread = threading.Thread(target=music, kwargs={"count" : 2})
# 启动子线程执行任务
coding_thread.start()
music_thread.start()
主线程与子线程的结束顺序问题
import threading
import time
# task函数
def task():
for i in range(10):
print("work...")
time.sleep(0.2)
if __name__ == '__main__':
work_thread = threading.Thread(target=task)
work_thread.start()
# 延时一秒
time.sleep(1)
print("主线程执行完毕")
如何使主线程结束后子进程也结束
方法一
import threading
import time
# task函数
def task():
for i in range(10):
print("work...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子线程
# 方法一: 参数方式设置守护主线程
work_thread = threading.Thread(target=task, daemon=True)
# 启动线程
work_thread.start()
# 延时一秒
time.sleep(1)
print("主线程执行完毕")
方法二
import threading
import time
# task函数
def task():
for i in range(10):
print("work...")
time.sleep(0.2)
if __name__ == '__main__':
# 创建子线程
work_thread = threading.Thread(target=task)
# 启动线程
# 方法二:
work_thread.setDaemon(True)
work_thread.start()
# 延时一秒
time.sleep(1)
print("主线程执行完毕")
线程间的执行顺序
import threading
import time
# 获取线程信息函数
def get_info():
time.sleep(0.5)
# 获取线程信息
current_thread = threading.current_thread()
print(current_thread)
if __name__ == '__main__':
# 创建10个子线程
for i in range(10):
sub_thread = threading.Thread(target=get_info)
sub_thread.start()
线程间是共享全局变量的
import threading
import time
# 全局变量
my_list = []
# 写入数据
def write_data():
for i in range(3):
# print("add:", i)
my_list.append(i)
print("write:", my_list)
# 读取数据
def read_data():
print("read:", my_list)
if __name__ == '__main__':
# 创建子线程
write_thread = threading.Thread(target=write_data)
read_thread = threading.Thread(target=read_data)
# 启动线程
write_thread.start()
# 延时一秒
time.sleep(1)
read_thread.start()
线程之间共享全局变量出现的错误问题
import threading
# 全局变量
num = 0
# 对g_num进行加操作
def sum_num1():
for i in range(10000000):
global num
num += 1
print("num1:", num)
# 对g_num进行加操作
def sum_num2():
for i in range(10000000):
global num
num += 1
print("num2:", num)
if __name__ == '__main__':
# 创建子线程
sum1_thread = threading.Thread(target=sum_num1)
sum2_thread = threading.Thread(target=sum_num2)
# 启动线程
sum1_thread.start()
sum2_thread.start()
'''1
理论上来说,不管是哪个子线程先执行结束,后结束运行的线程最后
的结果应该是20000000,但是实际运行效果不是20000000,而且每次
运行的结果都不同,原因在于如下代码段:
for i in range(10000000):
global num
num += 1
为非原子性操作
1'''
线程锁
import threading
# 全局变量
num = 0
# 对g_num进行加操作
def sum_num1():
# 上锁
lock.acquire()
for i in range(10000000):
global num
num += 1
# 解锁
lock.release()
print("num1:", num)
# 对g_num进行加操作
def sum_num2():
# 上锁
lock.acquire()
for i in range(10000000):
global num
num += 1
# 解锁
lock.release()
print("num2:", num)
if __name__ == '__main__':
# 创建锁
lock = threading.Lock()
# 创建子线程
sum1_thread = threading.Thread(target=sum_num1)
sum2_thread = threading.Thread(target=sum_num2)
# 启动线程
sum1_thread.start()
sum2_thread.start()
死锁问题
import threading
# 全局变量
num = 0
# 对g_num进行加操作
def sum_num1():
print('sum_num1...')
# 上锁
lock.acquire()
for i in range(10000000):
global num
num += 1
print("num1:", num)
# 对g_num进行加操作
def sum_num2():
print('sum_num2...')
# 上锁
lock.acquire()
for i in range(10000000):
global num
num += 1
print("num2:", num)
if __name__ == '__main__':
# 创建锁
lock = threading.Lock()
# 创建子线程
sum1_thread = threading.Thread(target=sum_num1)
sum2_thread = threading.Thread(target=sum_num2)
# 启动线程
sum1_thread.start()
sum2_thread.start()
'''2
死锁问题分析:
两个子线程分别执行sum_num1()和sum_num2(),在未上锁之前分别
执行了print('sum_num1...'),print('sum_num2...'),此时假定
sum_num1()先执行了lock = threading.Lock()获得了上锁,而后
sum_num2()才执行到该语句,发现已经被sum_num1()上锁了,从而
不在继续执行下面的代码,sum_num1则继续执行后续的代码直至该
线程运行结束,此时注意sum_num2()仍在等待获取上锁,但sum_num1()
没有释放锁,所以一直等待不会执行后续代码
2'''
进程池
import multiprocessing
import time
def copy_work(i):
print(f"拷贝中,第{i}号进程....",multiprocessing.current_process().pid)
time.sleep(2)
if __name__ == '__main__':
# 创建进程池
# Pool(3) 表示创建容量为3个进程的进程池
pool = multiprocessing.Pool(3)
for i in range(10):
# 利用进程池同步拷贝文件,进程池中的进程会必须等上一个进程退出才能执行下一个进程
# pool.apply(copy_work,args=(i,))
#添加任务
pool.apply_async(copy_work,args=(i,))
pool.close()
# 注意:如果使用异步方式执行copy_work任务,主进程不再等待子进程执行完毕再退出!
pool.join()#使并行执行变为串行执行
print('程序结束')
'''
运行结果:
拷贝中,第0号进程.... 13068
拷贝中,第1号进程.... 12680
拷贝中,第2号进程.... 5292
拷贝中,第3号进程.... 13068
拷贝中,第4号进程.... 5292
拷贝中,第5号进程.... 12680
拷贝中,第6号进程.... 13068
拷贝中,第7号进程.... 5292
拷贝中,第8号进程.... 12680
拷贝中,第9号进程.... 13068
程序结束
'''
线程池
from concurrent.futures import ThreadPoolExecutor
import threading
import time
def task(a):
print('子线程:', threading.current_thread().name)
print("my: ", a)
time.sleep(1.5)
if __name__ == '__main__':
print('主线程:', threading.current_thread().name)
# 创建线程池,3表示最大任务数(线程数)
executor = ThreadPoolExecutor(3)
for i in range(10):
#添加任务到线程池
executor.submit(task, i)
print('结束')
|