Notes from 《Python高性能(第2版)》 (Python High Performance, 2nd Edition)
1. Asynchronous Programming
Blocking and callbacks
import time
import threading

def wait_and_print(msg):
    time.sleep(1)          # blocks the calling thread for 1 second
    print(msg)

def wait_and_print_async(msg):
    def callback():
        print(msg)
    timer = threading.Timer(1, callback)   # runs callback on a separate thread after 1 s
    timer.start()                          # returns immediately; does not block

if __name__ == '__main__':
    t0 = time.time()
    wait_and_print('第一次')
    wait_and_print('第二次')
    print(f'After call, takes: {time.time() - t0} seconds')
Output:
第一次
第二次
After call, takes: 2.017909049987793 seconds
t0 = time.time()
wait_and_print_async('第一次')
wait_and_print_async('第二次')
print(f'After call, takes: {time.time() - t0} seconds')
Output:
After call, takes: 0.0020036697387695312 seconds
第二次第一次
Pass the returned result to the callback function as an argument:
def network_request_async(num, on_done):
    def timer_done():
        on_done({'success': True, 'result': num**2})
    timer = threading.Timer(1, timer_done)
    timer.start()

def on_done(result):
    print(result)

network_request_async(2, on_done)
Asynchronous code written this way needs callbacks nested layer upon layer, which is cumbersome.
future
A future is more convenient: it can be used to track the result of an asynchronous call.
from concurrent.futures import Future
fut = Future()
print(fut)
pending means the result has not been set yet.
Use fut.set_result() to make a result available:
fut.set_result("hello michael")
print(fut, fut.result())
A callback can also be registered with add_done_callback; it is invoked when the result becomes available (the future object is passed as its first argument):
fut1 = Future()
fut1.add_done_callback(lambda future_obj: print(future_obj.result(), flush=True))
fut1.set_result("hello michael")
import threading
from concurrent.futures import Future

def network_request_async(number):
    future = Future()
    result = {
        'success': True,
        'result': number**2
    }
    timer = threading.Timer(1, lambda: future.set_result(result))
    timer.start()
    return future

if __name__ == '__main__':
    fut = network_request_async(2)
    print(fut)
At this point the returned future holds no result yet and is still pending (the timer sets the result one second later).
Add a callback function:
def fetch_square(number):
    fut = network_request_async(number)

    def on_done_future(future):
        response = future.result()
        if response['success']:
            print(f'result is {response["result"]}')

    fut.add_done_callback(on_done_future)
Event loop
An event loop continuously monitors the status of various resources and runs the corresponding callback when an event occurs.
In an event loop, no unit of execution ever runs at the same time as another. (Does this sidestep the risk of two units writing the same data at the same time?)
import time

class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start_time = time.time()

    def done(self):
        return time.time() - self.start_time > self.timeout

if __name__ == '__main__':
    timer = Timer(3)
    while True:
        if timer.done():
            print('Timer finished')
            break
The flow of execution is not blocked, so other work can be done inside the while loop; waiting for an event by polling repeatedly in a loop is called busy-waiting.
import time

class Timer:
    def __init__(self, timeout):
        self.timeout = timeout
        self.start_time = time.time()

    def done(self):
        return time.time() - self.start_time > self.timeout

    def on_timer_done(self, callback):
        self.callback = callback

if __name__ == '__main__':
    timer = Timer(1)
    timer.on_timer_done(lambda: print('timer done from callback'))
    while True:
        if timer.done():
            timer.callback()
            break
if __name__ == '__main__':
    timer = Timer(1)
    timer.on_timer_done(lambda: print('timer done from callback'))
    timer1 = Timer(2)
    timer1.on_timer_done(lambda: print('timer1 done from callback'))
    timers = [timer, timer1]
    while True:
        for timer in timers.copy():   # iterate over a copy so removal is safe
            if timer.done():
                timer.callback()
                timers.remove(timer)
        if len(timers) == 0:
            break
2. The asyncio Framework
import asyncio

loop = asyncio.get_event_loop()

def callback():
    print("hello michael")
    loop.stop()

loop.call_later(1, callback)
loop.run_forever()
Coroutines
Callbacks are tedious; coroutines let you write asynchronous code much as you would write synchronous code, which is more natural and elegant (a coroutine can be viewed as a function whose execution can be paused and resumed).
Define a generator with yield:
def range_gen(n):
    i = 0
    while i < n:
        print(f'generating value {i}')
        yield i
        i += 1

range_gen(5)
The body is not executed; only a generator object is returned. Use next(gen) to advance it and get a value:
gen = range_gen(5)
next(gen)
Execution stops at the yield statement and the generator's internal state is preserved.
yield can also receive values:
def parrot():
    while True:
        message = yield
        print(f'parrot says: {message}')

generator = parrot()
generator.send(None)               # prime the generator up to the first yield
generator.send('hello')
generator.send({'hello': 'world'})
A generator can be advanced only when the relevant resource is ready, so no callback function is needed.
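As a small sketch of that idea (not from the book), the Timer class from above can drive a generator: a tiny polling loop keeps advancing it, and the generator yields control back until the resource (the timer) is ready, with no callback involved.
import time

class Timer:                                   # same idea as the Timer class above
    def __init__(self, timeout):
        self.timeout = timeout
        self.start_time = time.time()
    def done(self):
        return time.time() - self.start_time > self.timeout

def wait_for_timer(timer):
    while not timer.done():
        yield                                  # hand control back to the driving loop
    print('timer done, generator resumed')

gen = wait_for_timer(Timer(1))
while True:                                    # a minimal "event loop" that polls the generator
    try:
        next(gen)
    except StopIteration:
        break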
Defining coroutines with asyncio:
async def hello():
    await asyncio.sleep(1)
    print("hello michael")

coro = hello()
print(coro)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
await gives the event loop a suspension point: while waiting for the resource, the event loop can keep managing other coroutines.
async def network_request(number):
    await asyncio.sleep(1)
    return {'success': True, 'result': number**2}

async def fetch_square(number):
    response = await network_request(number)
    if response['success']:
        print(response['result'])

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_square(5))
asyncio.ensure_future() schedules coroutines and futures:
asyncio.ensure_future(fetch_square(2))
asyncio.ensure_future(fetch_square(3))
asyncio.ensure_future(fetch_square(4))
loop.run_forever()
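As a side note (my own sketch, not from the book, assuming the fetch_square coroutine defined above), the same three calls can be run concurrently with asyncio.gather, so run_until_complete returns on its own once they are all done instead of running forever:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(fetch_square(2),
                                       fetch_square(3),
                                       fetch_square(4)))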
Blocking code -> non-blocking: ThreadPoolExecutor
- Run the blocking code in a separate thread (threads are implemented at the OS level and allow code to run in parallel)
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=3)

def wait_and_return(msg):
    time.sleep(1)
    return msg

print(executor.submit(wait_and_return, "i am parameters: msg"))
Or:
import asyncio
loop = asyncio.get_event_loop()
fut = loop.run_in_executor(executor, wait_and_return, "i am parameters: msg")
print(fut)
import requests

async def fetch_urls(urls):
    responses = []
    for url in urls:
        # each request is awaited before the next one is submitted, so they run one after another
        responses.append(await loop.run_in_executor(executor, requests.get, url))
    return responses

res = loop.run_until_complete(fetch_urls(["https://www.baidu.com",
                                          "https://www.csdn.net"]))
print(res)
def fetch_urls_1(urls):
    # submit all requests to the executor first and gather the futures,
    # so the requests run concurrently
    return asyncio.gather(*[loop.run_in_executor(executor, requests.get, url)
                            for url in urls])

res = loop.run_until_complete(fetch_urls_1(["https://www.baidu.com",
                                            "https://www.csdn.net"]))
print(res)
To avoid being limited by the number of executor workers, a natively non-blocking library such as aiohttp should be used.
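A minimal sketch of the same fetch written with aiohttp (my own example, assuming aiohttp is installed via pip install aiohttp): the requests are non-blocking coroutines, so no thread pool is required.
import asyncio
import aiohttp

async def fetch_urls_aiohttp(urls):
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as resp:
                return await resp.text()
        return await asyncio.gather(*[fetch(url) for url in urls])

loop = asyncio.get_event_loop()
pages = loop.run_until_complete(
    fetch_urls_aiohttp(["https://www.baidu.com", "https://www.csdn.net"]))
print([len(page) for page in pages])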
3. Reactive Programming
Aims at building excellent concurrent systems that are:
- responsive
- elastic, scaling to handle all kinds of loads
- resilient in the face of failures
- message-driven, non-blocking
ReactiveX is a project that implements reactive programming tools for many languages; RxPY is one of its libraries.
https://reactivex.io/languages.html
pip install reactivex
Observables
import reactivex as rx
obs = rx.from_iterable(range(4))
print(obs)
obs.subscribe(print)
An observable is much like an ordered iterator:
c = [1, 2, 3, 4, 5]
iterator = iter(c)
print(next(iterator))
print(next(iterator))
for i in iterator:
    print(i)
Observable.subscribe registers the callback functions:
c = [1, 2, 3, 0, 4, 5]
obs = rx.from_iterable(c)
obs.subscribe(on_next=lambda x: print(f'next elem 1/{x}: {1/x}'),
              on_error=lambda x: print(f'error: 1/{x} illegal'),
              on_completed=lambda: print('completed calculation'))
Output:
next elem 1/1: 1.0
next elem 1/2: 0.5
next elem 1/3: 0.3333333333333333
error: 1/division by zero illegal
c = [1, 2, 3, 4, 5]
obs = rx.from_iterable(c)
obs.subscribe(on_next=lambda x: print(f'next elem 1/{x}: {1/x}'),
              on_completed=lambda: print('completed calculation'))
Output:
next elem 1/1: 1.0
next elem 1/2: 0.5
next elem 1/3: 0.3333333333333333
next elem 1/4: 0.25
next elem 1/5: 0.2
completed calculation
RxPY provides operators for creating, transforming, filtering, and grouping observables; these operations return observables themselves, so they can be chained and composed further, which is where the real power lies.
obs = rx.from_iterable(range(5))
obs2 = obs[:3]
obs2.subscribe(print)
obs.subscribe(print)
Operators
from reactivex.operators import map as rx_map
op = rx_map(lambda x: x**2)
(rx.from_iterable(range(5))).pipe(op).subscribe(print)
from reactivex.operators import group_by as rx_group_by
op = rx_group_by(lambda x: x%3)
obs = (rx.from_iterable(range(10))).pipe(op)
obs.subscribe(lambda x: print(f"group key: {x.key}"))
Each group is itself an observable:
obs[0].subscribe(lambda x: x.subscribe(print))
print('-'*10)
obs[1].subscribe(lambda x: x.subscribe(print))
print('-'*10)
obs[2].subscribe(lambda x: x.subscribe(print))
print('-'*10)
Output:
0
3
6
9
----------
1
4
7
----------
2
5
8
----------
from reactivex.operators import merge_all
obs.pipe(merge_all()).subscribe(print)
Output: 0 through 9; all the groups are merged and the values come out in the original order.
4. Parallel Programming
When the subproblems are independent, or largely independent, multiple cores can be used for the computation.
If the subproblems need to share data, the implementation is not as easy and there is inter-process communication overhead.
Threads
A common way to achieve shared-memory parallelism is threads.
Because of Python's Global Interpreter Lock (GIL), a thread acquires the lock before executing Python code and releases it when it is done; only one thread can hold the lock at a time, so the other threads cannot execute Python code in the meantime.
Despite the GIL, threads can still be used for concurrency around time-consuming (I/O) operations, because the lock is released while waiting on I/O.
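A tiny sketch of that point (my own example): two sleeping, I/O-like tasks run in separate threads, and the total wall time stays close to 1 second rather than 2.
import threading
import time

def io_task():
    time.sleep(1)        # stands in for a blocking I/O call; the GIL is released here

t0 = time.time()
threads = [threading.Thread(target=io_task) for _ in range(2)]
[t.start() for t in threads]
[t.join() for t in threads]
print(f'total: {time.time() - t0:.2f} s')   # roughly 1 s, not 2 s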
Processes
Using processes sidesteps the GIL entirely: processes do not share memory, they are independent of one another, and each has its own interpreter. Drawback of processes: they are heavier than threads, and any shared data must be exchanged through inter-process communication (see the overhead noted above).
Advantage: processes can be distributed across multiple machines, so scalability is better.
Using multiple processes
Implement workers by subclassing multiprocessing.Process.
Put the code to run in the child process in Process.run; it is executed when start() is called on the process object.
import multiprocessing
import time

class MyProcess(multiprocessing.Process):
    def __init__(self, id):
        super(MyProcess, self).__init__()
        self.id = id

    def run(self):
        time.sleep(1)
        print(f'i am a process with id {self.id}')

if __name__ == '__main__':
    p = MyProcess(1)
    p.start()
    p.join()
    print('end')

    t0 = time.time()
    processes = [MyProcess(1.1) for _ in range(4)]
    [p.start() for p in processes]
    [p.join() for p in processes]
    print(f'time: {time.time() - t0: .2f} s')
Four processes are created, yet running them does not take four times as long, because they execute in parallel.
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    inputs = list(range(4))
    out = pool.map(square, inputs)
    print(out)
    print('end')
Calling Pool.map blocks the main program until all the worker processes have finished. map_async instead returns an AsyncResult object immediately and computes in the background without blocking the main program; AsyncResult.get() retrieves the results. Pool.apply_async assigns a single function call to one worker process: it takes the function and the function's arguments, and also returns an AsyncResult object.
import multiprocessing
import time

def square(x):
    time.sleep(5)
    return x * x

if __name__ == '__main__':
    t0 = time.time()
    pool = multiprocessing.Pool(processes=4)
    inputs = list(range(4))
    out = pool.map_async(square, inputs)
    print(out)                       # AsyncResult, returned immediately
    print('end')
    print(f'{time.time() - t0} s')
    get_out = out.get()              # blocks until the results are ready
    print(get_out)
    print(f'{time.time() - t0} s')

    out = [pool.apply_async(square, (i,)) for i in range(4)]
    get_out = [r.get() for r in out]
    print(get_out)
The Executor interface: ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=4)
    fut = executor.submit(square, 2)
    print(fut)
    print(fut.result())
    res = executor.map(square, range(4))
    print(list(res))
To extract the results from one or more Future objects, use concurrent.futures.wait and concurrent.futures.as_completed:
from concurrent.futures import wait, as_completed, ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    fut1 = executor.submit(square, 2)
    fut2 = executor.submit(square, 3)
    wait([fut1, fut2])                # blocks until both futures are done
    res = as_completed([fut1, fut2])  # iterator that yields futures as they complete
    print(res)
    print(list(res))
    out = [f.result() for f in [fut1, fut2]]
    print(out)
5. Locks
A lock prevents multiple processes from executing the protected code at the same time, for example writing to the same file simultaneously.
multiprocessing.Lock()
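A minimal sketch (my own example; the names are illustrative): several processes increment a shared counter, and the multiprocessing.Lock ensures only one of them updates it at a time.
import multiprocessing

def increment(lock, counter):
    for _ in range(1000):
        with lock:                        # only one process at a time enters this block
            counter.value += 1

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)          # shared integer
    procs = [multiprocessing.Process(target=increment, args=(lock, counter))
             for _ in range(4)]
    [p.start() for p in procs]
    [p.join() for p in procs]
    print(counter.value)                  # 4000 with the lock; may be less without it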
6. Distributed Processing
dask
https://www.dask.org/
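A minimal dask sketch (my own example, assuming dask is installed): dask.delayed builds a lazy task graph that is executed, possibly in parallel, on .compute().
from dask import delayed

def square(x):
    return x * x

lazy = [delayed(square)(i) for i in range(4)]   # nothing runs yet
total = delayed(sum)(lazy)
print(total.compute())                          # 0 + 1 + 4 + 9 = 14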
pyspark
The user submits tasks, and the cluster manager automatically dispatches them to idle executors.
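A minimal PySpark sketch (my own example, assuming pyspark is installed and using the local scheduler): the driver submits the job and Spark splits the work across the workers.
from pyspark import SparkContext

sc = SparkContext('local[4]', 'square-example')   # 4 local worker threads
rdd = sc.parallelize(range(8))
print(rdd.map(lambda x: x * x).collect())
sc.stop()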
mpi4py, for scientific computing
https://pypi.org/project/mpi4py/
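A minimal mpi4py sketch (my own example; run it with something like mpiexec -n 4 python script.py): each process in the communicator reports its rank.
from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f'I am rank {comm.Get_rank()} of {comm.Get_size()}')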
7. Development and Deployment
travis-ci
https://www.travis-ci.org/
Write a YAML configuration file; whenever new code is pushed, the test items declared in the configuration run automatically.
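As an illustration (a generic sketch, not taken from any particular project; the requirements file and pytest command are assumptions), a .travis.yml for a Python project might look like this:
language: python
python:
  - "3.8"
install:
  - pip install -r requirements.txt   # assumed dependency file
script:
  - pytest                            # assumed test command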
docker
Provides isolated environments.
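As an illustration (a generic sketch; the file names here are assumptions), a minimal Dockerfile that packages a Python app into an isolated environment:
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .               # assumed dependency file
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "main.py"]             # assumed entry point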