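1. Concepts
Because of the GIL (global interpreter lock) in CPython, multithreading is often a poor choice for CPU-bound programs: only one thread executes Python bytecode at a time.
Multiprocessing runs code in fully independent processes, each with its own interpreter, so it can make much better use of multiple processors.
However, process isolation means that data is not shared between processes, which is a problem in its own right, and threads are more lightweight than processes.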
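2. The multiprocessing.Process class
2.1 Inter-process synchronization
For synchronizing processes, Python provides the same classes as for synchronizing threads. They are used the same way and behave similarly.
Synchronization between processes costs more than between threads, and the underlying operating-system implementation is different. Python simply hides these differences so that multiprocessing stays easy to use.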
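As a minimal sketch of the point above, the example below uses multiprocessing.Lock exactly as one would use threading.Lock. The function names add_many and run_counter are made up for illustration, and the counter is a raw shared-memory integer created with multiprocessing.Value(..., lock=False), so that the explicit lock is the only thing protecting it.

```python
import multiprocessing


def add_many(counter, lock, times):
    """Increment the shared counter `times` times, holding the lock for each update."""
    for _ in range(times):
        with lock:  # same context-manager usage as threading.Lock
            counter.value += 1


def run_counter(workers=4, times=1000):
    """Start `workers` processes that all update one shared integer."""
    # lock=False returns a raw shared int without a built-in lock (illustrative choice)
    counter = multiprocessing.Value("i", 0, lock=False)
    lock = multiprocessing.Lock()
    procs = [
        multiprocessing.Process(target=add_many, args=(counter, lock, times))
        for _ in range(workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter())
```

Without the lock, the read-modify-write in counter.value += 1 can interleave across processes and updates may be lost, which is the same hazard a threading.Lock guards against between threads.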
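The multiprocessing module also provides shared memory and server processes for sharing data, as well as Queue and Pipe for inter-process communication.
Two points to keep in mind about inter-process communication:
(1) Multiprocessing starts several interpreter processes, so any data passed between processes must be serialized and deserialized.
(2) Thread safety of data: if no process runs more than one thread, the GIL has little left to do.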
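To make the serialization point concrete, here is a small sketch in which a child process changes the list it was given and sends it back through a multiprocessing.Queue. The names append_and_report and isolation_demo are hypothetical. The parent's list is expected to stay unchanged because the child works on its own copy, and what comes back through the queue is a pickled copy as well.

```python
import multiprocessing


def append_and_report(data, results):
    """Runs in the child: modify the local copy and send it back through the queue."""
    data.append("added in child")
    results.put(data)  # the list is pickled here and unpickled in the parent


def isolation_demo():
    original = ["from parent"]
    results = multiprocessing.Queue()
    child = multiprocessing.Process(target=append_and_report, args=(original, results))
    child.start()
    # Read from the queue before join() so the child is never stuck on unsent data
    received = results.get(timeout=30)
    child.join()
    return original, received


if __name__ == "__main__":
    original, received = isolation_demo()
    print("parent still has:", original)
    print("child sent back: ", received)
```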
2.2 Example
import multiprocessing
import datetime
import logging
import threading
import time

FORMAT = "%(process)8s %(processName)12s %(thread)6d %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


def clac(i):
    # CPU-bound busy loop: count up to one billion
    total = 0
    for _ in range(1000000000):
        total += 1
    ret = (i, total)
    logging.info(ret)
    # Process discards the return value of its target, so the result is only logged
    return ret


if __name__ == '__main__':
    start = datetime.datetime.now()
    ps = []
    for i in range(4):
        p = multiprocessing.Process(target=clac, args=(i,), name='clac-{}'.format(i))
        ps.append(p)
        p.start()

    for p in ps:
        p.join()  # wait for this child to exit
        logging.info("{} {} {}".format(p.name, p.exitcode, p.pid))

    delta = (datetime.datetime.now() - start).total_seconds()
    time.sleep(1)
    print(delta)
    print('ps ===>', ps)
    print('end =========================')
    print(threading.current_thread())

Output:
42272 clac-0 12752 (0, 1000000000)
1400 MainProcess 43224 clac-0 0 42272
21336 clac-2 48596 (2, 1000000000)
42048 clac-3 49296 (3, 1000000000)
45224 clac-1 46380 (1, 1000000000)
1400 MainProcess 43224 clac-1 0 45224
1400 MainProcess 43224 clac-2 0 21336
1400 MainProcess 43224 clac-3 0 42048
47.764683
ps ===> [<Process name='clac-0' pid=42272 parent=1400 stopped exitcode=0>, <Process name='clac-1' pid=45224 parent=1400 stopped exitcode=0>, <Process name='clac-2' pid=21336 parent=1400 stopped exitcode=0>, <Process name='clac-3' pid=42048 parent=1400 stopped exitcode=0>]
end =========================
<_MainThread(MainThread, started 43224)>
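3. The multiprocessing.Pool class
multiprocessing.Pool is the process pool class.
| Name | Meaning |
|---|---|
| apply(self, func, args=(), kwds={}) | Blocking call. The main process waits for each task to finish, so tasks submitted this way run one after another. |
| apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None) | Same as apply, but non-blocking and asynchronous. The callback is invoked once the result is available. |
| close() | Closes the pool. It accepts no new tasks, and the worker processes exit once all tasks are finished. |
| terminate() | Stops the worker processes immediately, without handling outstanding tasks. |
| join() | The main process blocks until the worker processes exit. join() must be called after close() or terminate(). |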
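The methods in the table can be strung together as in the sketch below. cube and pool_lifecycle are hypothetical names used only for illustration. The pattern is: submit work with apply or apply_async, call close(), then join(), and read the asynchronous results with get() on the objects that apply_async returned.

```python
import multiprocessing


def cube(x):
    return x ** 3


def pool_lifecycle():
    pool = multiprocessing.Pool(2)
    # apply() blocks and hands back the result directly
    blocking_result = pool.apply(cube, args=(2,))
    # apply_async() returns immediately with a result handle
    pending = [pool.apply_async(cube, args=(n,)) for n in range(4)]
    pool.close()  # no more tasks may be submitted
    pool.join()   # wait for the workers to exit; must come after close()
    return blocking_result, [handle.get() for handle in pending]


if __name__ == "__main__":
    print(pool_lifecycle())
```

Calling get() on a handle would also work before join(); it simply blocks until that particular task has finished.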
3.1 Synchronous calls
import multiprocessing
import datetime
import logging

FORMAT = "%(process)8s %(processName)18s %(thread)6d %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


def clac(i):
    # CPU-bound busy loop: count up to one hundred million
    total = 0
    for _ in range(100000000):
        total += 1
    logging.info(total)
    return i, total


if __name__ == '__main__':
    start = datetime.datetime.now()
    pool = multiprocessing.Pool(4)
    for i in range(4):
        # apply() blocks until the task finishes, so the four tasks run one after another
        ret = pool.apply(clac, args=(i,))
        logging.info(ret)
    pool.close()
    pool.join()
    delta = (datetime.datetime.now() - start).total_seconds()
    logging.info(delta)
    logging.info('end =========================')

Output:
48312 SpawnPoolWorker-1 8216 100000000
28916 MainProcess 2176 (0, 100000000)
45884 SpawnPoolWorker-2 38392 100000000
28916 MainProcess 2176 (1, 100000000)
50860 SpawnPoolWorker-3 47644 100000000
28916 MainProcess 2176 (2, 100000000)
11308 SpawnPoolWorker-4 44452 100000000
28916 MainProcess 2176 (3, 100000000)
28916 MainProcess 2176 11.909838
28916 MainProcess 2176 end =========================
3.2 Asynchronous calls
import multiprocessing
import datetime
import logging

FORMAT = "%(process)8s %(processName)18s %(thread)6d %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


def clac(i):
    # CPU-bound busy loop: count up to one hundred million
    total = 0
    for _ in range(100000000):
        total += 1
    logging.info(total)
    return i, total


if __name__ == '__main__':
    start = datetime.datetime.now()
    pool = multiprocessing.Pool(4)
    for i in range(4):
        # apply_async() returns an ApplyResult immediately; the callback runs when the task is done
        ret = pool.apply_async(clac, args=(i,),
                               callback=lambda ret: logging.info('{} in callback'.format(ret)))
        logging.info('{} ~~~~~~~~~~~~~~'.format(ret))
    pool.close()
    logging.info('*******************************')
    pool.join()
    delta = (datetime.datetime.now() - start).total_seconds()
    logging.info(delta)
    logging.info('end =========================')

Output:
41372 MainProcess 33640 <multiprocessing.pool.ApplyResult object at 0x000001B40CBED9A0> ~~~~~~~~~~~~~~
41372 MainProcess 33640 <multiprocessing.pool.ApplyResult object at 0x000001B40CD36550> ~~~~~~~~~~~~~~
41372 MainProcess 33640 <multiprocessing.pool.ApplyResult object at 0x000001B40CD36760> ~~~~~~~~~~~~~~
41372 MainProcess 33640 <multiprocessing.pool.ApplyResult object at 0x000001B40CD368E0> ~~~~~~~~~~~~~~
41372 MainProcess 33640 *******************************
12044 SpawnPoolWorker-2 17296 100000000
41372 MainProcess 18352 (1, 100000000) in callback
31700 SpawnPoolWorker-1 46176 100000000
41372 MainProcess 18352 (0, 100000000) in callback
46804 SpawnPoolWorker-3 9028 100000000
41372 MainProcess 18352 (2, 100000000) in callback
25256 SpawnPoolWorker-4 48704 100000000
41372 MainProcess 18352 (3, 100000000) in callback
41372 MainProcess 33640 3.170857
41372 MainProcess 33640 end =========================
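In the output above, the callback lines are logged by MainProcess but with a different thread ID from the main thread, which suggests that callbacks run in a helper thread of the parent process rather than in the workers. The sketch below builds on that and also uses the error_callback parameter of apply_async. reciprocal and run_with_callbacks are hypothetical names, and the inputs are chosen so that one task fails on purpose.

```python
import multiprocessing


def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError for x == 0


def run_with_callbacks(values):
    results, errors = [], []
    pool = multiprocessing.Pool(2)
    for v in values:
        pool.apply_async(
            reciprocal,
            args=(v,),
            callback=results.append,       # called in the parent with the return value
            error_callback=errors.append,  # called in the parent with the exception
        )
    pool.close()
    pool.join()  # all callbacks have run once join() returns
    return sorted(results), errors


if __name__ == "__main__":
    results, errors = run_with_callbacks([1, 2, 0])
    print("results:", results)
    print("errors: ", errors)
```

Because tasks may finish in any order, the results are sorted before being returned. Callbacks should stay short, since a slow callback delays the handling of every other result.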
4. Choosing between multithreading and multiprocessing