一、Python对并发编程的支持
1.多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成 2.多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务 3.异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行
4.使用Lock对资源加锁,防止冲突访问 5.使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式 6.使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果 7.使用subprocess启动外部程序的进行,并进行输入输出交互
二、怎样选择多线程多进程多协程?
Python并发编程有三种方式:多线程Thread、多进程Process、多协程Coroutine
1、什么是CPU密集型计算、IO密集型计算?
2、多线程、多进程、多协程的对比
-
多进程Process(multiprocessing) 优点:可以利用多核CPU并行运算 缺点:占用资源多、可启动数目比线程少 适用于:CPU密集型计算 -
多线程Thread(threading) 优点:相比进程,更轻量级、占用资源少 缺点: 相比进程:多线程只能并发执行,不能利用多CPU(GIL) 相比协程:启动数目有限,占用内存资源,有线程切换开销 适用于:IO密集型计算、同时运行的任务数目要求不多 -
多协程Coroutine(asyncio) 优点:内存开销最少、启动协程数量最多 缺点:支持库有限(aiohttp vs requests)、代码实现复杂 适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景
一个进程中可以启动N个线程,一个线程中可以启动N个协程
3、怎样根据任务选择对应的技术?
三、Python速度慢的罪魁祸首–全局解释器锁GIL
1、Python速度慢的两大原因
相比于C/C++/JAVA ,Python确实慢,在一些特殊场景下,Python比C++慢100~200倍。由于速度慢的原因,很多公司的基础架构代码依然用C/C++开发。比如各大公司(阿里/腾讯/快手)的推荐引擎、搜索引擎、存储引擎等底层对性能要求高的模块。
- Python速度慢的原因1:
动态类型语言,边解释边执行 - Python速度慢的原因2:
GIL:无法利用多核CPU并发执行
2、GIL是什么?
全局解释器锁(英语:Global Interpreter Lock,缩写GIL)是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核处理器上,使用GIL的解释器也只允许同一时间执行一个线程。 由于GIL的存在,即使电脑有多核CPU,单个时刻也只能使用1个线程,相比于并发加速的C++/JAVA所以慢。
3、为什么有GIL这个东西?
简而言之:Python设计之初,为了规避并发问题引入GIL,现在想去除却去不掉。 原因详解:为了解决多线程之间数据完整性和状态同步问题,Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象。 开始:线程A和线程B都引用了对象obj,obj.ref_num=2,线程A和B都想撤销对obj的引用
4、怎样规避GIL带来的限制?
1、多线程threading机制依然是有用的,用于IO密集型计算 因为在I/O(read, write, send, recv, etc.)期间,线程会释放GIL,实现CPU和IO的并行。因此多线程用于IO密集型计算依然可以大幅提升速度。 但是多线程用于CPU密集型计算时,只会更加拖慢速度。 2、使用multiprocessing的多进程机制实现并行计算,利用多核CPU优势。 为了应对GIL问题,Python提供了multiprocessing。
四、使用多线程,Python爬虫被加速10倍
1、Python创建多线程的方法
def my_func(a, b):
do_craw(a, b)
import threading
t = threading.Thread(target=my_func, args=(100,200))
t.start()
t.join()
2、改写爬虫程序,变成多线程爬取 blog_spider.py
import requests
import certifi
urls = [
f"https://www.cnblogs.com/#p{page}" for page in range(2, 50)
]
def craw(url):
r = requests.get(url, verify=False)
print(url, len(r.text))
craw(urls[0])
01.multi_thread_craw.py
import blog_spider
import threading
import time
def single_thread():
print("single thread begin")
for url in blog_spider.urls:
blog_spider.craw(url)
print("single thread end")
def multi_thread():
print("multi thread begin")
threads = []
for url in blog_spider.urls:
threads.append(
threading.Thread(target=blog_spider.craw, args=(url, ))
)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("multi thread end")
if __name__ == "__main__":
start = time.time()
single_thread()
end = time.time()
print("single thread cost:", end - start, "seconds.")
start = time.time()
multi_thread()
end = time.time()
print("multi thread cost:", end - start, "seconds.")
3、速度对比:单线程爬虫vs多线程爬虫 single thread cost: 5.592846155166626 seconds multi thread cost: 1.0412259101867676 seconds.
五、Python实现生产者消费者爬虫
1、多组件的Pipeline技术架构
复杂的事情一般都不会一下子做完,而是分很多中间步骤一步步完成。
2、生产者消费者爬虫的架构
3、多线程数据通信的queue.Queue
queue.Queue可以用于多线程之间的、线程安全的数据通信。
import queue
q = queue.Queue()
q.put(item)
item = q.get()
q.qsize()
q.empty()
q.full()
4、代码编写实现生产者消费者爬虫
blog_spider.py
import requests
import certifi
from bs4 import BeautifulSoup
urls = [
f"https://www.cnblogs.com/#p{page}" for page in range(2, 50)
]
def craw(url):
r = requests.get(url, verify=False)
print(url, len(r.text))
return r.text
def parse(html):
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a", class_ ="post-item-title")
return [(link["href"], link.get_text()) for link in links]
if __name__ == "__main__":
for result in parse(craw(urls[1])):
print(result)
02.producer_consumer_spider.py
import queue
import time
import blog_spider
import random
import threading
def do_craw(url_queue:queue.Queue, html_queue:queue.Queue):
while True:
url = url_queue.get()
html = blog_spider.craw(url)
html_queue.put(html)
print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize())
time.sleep(random.randint(1, 2))
def do_parse(html_queue:queue.Queue, fout):
while True:
html = html_queue.get()
results = blog_spider.parse(html)
for result in results:
fout.write(str(result) + "\n")
print(threading.current_thread().name, f"results.size", len(results), "html_queue.size", html_queue.qsize())
time.sleep(random.randint(1, 2))
if __name__ == "__main__":
url_queue = queue.Queue()
html_queue = queue.Queue()
for url in blog_spider.urls:
url_queue.put(url)
for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
t.start()
fout = open("02.data.txt", "w")
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
t.start()
六、Python线程安全问题以及解决方案
1、线程安全概念介绍
线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间得共享变量,使程序功能正确完成。 由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。
2、Lock用于解决线程安全问题
3、实例代码演示问题以及解决方案
03.lock_concurrent.py
import threading
import time
lock = threading.Lock()
class Account:
def __init__(self, balance):
self.balance = balance
def draw(account, amount):
with lock:
if account.balance >= amount:
time.sleep(0.1)
print(threading.current_thread().name, "取钱成功!")
account.balance -= amount
print(threading.current_thread().name, "余额", account.balance)
else:
print(threading.current_thread().name, "取钱失败,余额不足!")
if __name__ == "__main__":
account = Account(1000)
ta = threading.Thread(name="ta", target=draw, args=(account, 800))
tb = threading.Thread(name="tb", target=draw, args=(account, 800))
ta.start()
tb.start()
七、Python好用的线程池ThreadPoolExecutor
1、线程池的原理
线程的生命周期 新建线程系统需要分配资源、终止线程系统需要回收资源。如果可以重用线程,则可以减去新建/终止的开销。
2、使用线程池的好处
(1)提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源; (2)适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短 (3)防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题 (4)代码优势:使用线程池的语法比自己新建线程执行线程更加简洁
3、ThreadPoolExecutor的使用语法
4、使用线程池改造爬虫程序
04.thread_pool.py
import concurrent.futures
import blog_spider
with concurrent.futures.ThreadPoolExecutor() as pool:
htmls = pool.map(blog_spider.craw, blog_spider.urls)
htmls = list(zip(blog_spider.urls, htmls))
for url, html in htmls:
print(url, len(html))
print("craw over")
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
future = pool.submit(blog_spider.parse, html)
futures[future] = url
for feature in concurrent.futures.as_completed(futures):
url = futures[future]
print(url, future.result())
八、Python使用线程池在Web服务中实现加速
1、Web服务的架构以及特点
Web后台服务的特点: (1)Web服务对响应时间要求非常高,比如要求200ms返回 (2)Web服务有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API (3)Web服务经常需要处理几万人、几百万人的同时请求
2、使用线程池ThreadPoolExecutor加速
使用线程池ThreadPoolExecutor的好处: (1)方便将磁盘文件、数据库、远程API的IO调用并发执行 (2)线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能
3、代码用Flask实现Web服务并实现加速
05.flask_thread_pool.py
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor
app = flask.Flask(__name__)
pool = ThreadPoolExecutor()
def read_file():
time.sleep(0.1)
return "file result"
def read_db():
time.sleep(0.2)
return "db result"
def read_api():
time.sleep(0.3)
return "api result"
@app.route("/")
def index():
result_file = pool.submit(read_file)
result_db = pool.submit(read_db)
result_api = pool.submit(read_api)
return json.dumps({"result_file": result_file.result(),
"result_db": result_db.result(),
"result_api": result_api.result()
})
if __name__ == "__main__":
app.run()
九、使用多进程multiprocessing模块加速程序的运行
1、有了多线程threading,为什么还要用多进程multiprocessing?
如果遇到CPU密集型计算,多线程反而会降低执行速度!! multiprocessing模块就是Python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行。
2、多进程multiprocessing知识梳理(对比多线程threading)
3、代码实战:单线程、多线程、多进程对比CPU密集计算速度
06.thread_process_cpu_bound.py
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
PRIMES = [1122246565566511259] * 100
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n+1, 2):
if n % i == 0:
return False
return True
def single_thread():
for number in PRIMES:
is_prime(number)
def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
if __name__ == "__main__":
start = time.time()
single_thread()
end = time.time()
print("single thread, cost:", end-start, "seconds.")
start = time.time()
multi_thread()
end = time.time()
print("multi thread, cost:", end - start, "seconds.")
start = time.time()
multi_process()
end = time.time()
print("multi process, cost:", end - start, "seconds.")
single thread, cost: 0.0010006427764892578 seconds. multi thread, cost: 0.0040018558502197266 seconds. multi process, cost: 0.9257152080535889 seconds.
十、Python在Flask服务中使用多进程池加速程序运行
07.flask_process_pool.py
import flask
from concurrent.futures import ProcessPoolExecutor
import math
import json
app = flask.Flask(__name__)
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n+1, 2):
if n % i == 0:
return False
return True
@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
number_list = [int(x) for x in numbers.split(",")]
results = process_pool.map(is_prime, number_list)
return json.dumps(dict(zip(number_list, results)))
if __name__ == "__main__":
process_pool = ProcessPoolExecutor()
app.run()
运行:http://127.0.0.1:5000/is_prime/1,2,3,4
十一、Python异步IO实现并发爬虫
Python异步IO库介绍:asyncio 08.async_spider.py
import asyncio
import aiohttp
import blog_spider
import time
async def async_craw(url):
print("craw url:", url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
print(f"craw url:{url}, {len(result)}")
loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url)) for url in blog_spider.urls]
start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds: ", end - start)
十二、在异步IO中使用信号量控制爬虫并发度
10.async_spider_semaphore.py
import asyncio
import aiohttp
import blog_spider
import time
semaphore = asyncio.Semaphore(10)
async def async_craw(url):
async with semaphore:
print("craw url:", url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
await asyncio.sleep(5)
print(f"craw url:{url}, {len(result)}")
loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url)) for url in blog_spider.urls]
start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds: ", end - start)
|