1. Introduction
1. Overview of Python concurrent programming
2. Choosing between multithreading, multiprocessing, and coroutines
Python offers three approaches to concurrent programming: multithreading (Thread), multiprocessing (Process), and coroutines (Coroutine).
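These notes focus on threads and processes; as a quick illustration of the third option, here is a minimal coroutine sketch using asyncio (the URLs and the fetch function are hypothetical placeholders, and asyncio.sleep stands in for real network I/O):
import asyncio
async def fetch(url):
    # await yields control to the event loop while "waiting on I/O"
    await asyncio.sleep(0.1)
    return url
async def main():
    urls = [f"https://example.com/page/{i}" for i in range(5)]
    # Run all the fetches concurrently on a single thread
    results = await asyncio.gather(*(fetch(u) for u in urls))
    print(results)
if __name__ == '__main__':
    asyncio.run(main())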
3. The Global Interpreter Lock (GIL)
Because of the GIL, only one thread can execute Python bytecode at any given moment, even on a multi-core CPU. For CPU-bound work this makes multithreaded Python slower than languages such as C++ or Java, whose threads can run truly in parallel.
2. Multithreading
1. Using multithreading to speed up a Python crawler about 10x
1. Ways to create threads
join() performs thread synchronization: after the main thread finishes its own work, it blocks and waits until the joined child threads have finished.
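The basic pattern, as a minimal sketch (the worker function here is illustrative):
import threading
def worker():
    print("worker running")
t = threading.Thread(target=worker)  # create a thread around a target function
t.start()   # start executing worker() in the new thread
t.join()    # block the calling thread until worker() has finished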
2. Code
blog_spider.py
import requests
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]
def craw(url):
r = requests.get(url)
print(url, len(r.text))
if __name__ == '__main__':
pass
python01_multi_thread_craw.py
import chapter08.spider.blog_spider as blog_spider
import threading
import time
def single_thread():
print("single_thread begin")
for url in blog_spider.urls:
blog_spider.craw(url)
print("single_thread end")
def multi_thread():
print("multi_thread begin")
threads = []
for url in blog_spider.urls:
threads.append(threading.Thread(target=blog_spider.craw, args=(url,)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("multi_thread end")
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("single thread cost:", end - start, "seconds")
start = time.time()
multi_thread()
end = time.time()
print("multi thread cost:", end - start, "seconds")
pass
A quick comparison of the console timings shows the multithreaded version finishing roughly 10x faster. The speedup is possible because requests.get spends most of its time waiting on network I/O, during which the GIL is released.
2. A producer-consumer crawler
blog_spider.py:
# @Time : 2021/10/25 15:13
# @Author : Li Kunlun
# @Description : Crawl the blog pages
import requests
from bs4 import BeautifulSoup
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1, 50 + 1)]
# Producer
def craw(url):
r = requests.get(url)
return r.text
# Consumer
def parse(html):
    # Parse the HTML and extract the post titles
    soup = BeautifulSoup(html, "html.parser")
    # "a" selects <a> tags; class_ filters on the CSS class
    links = soup.find_all("a", class_="post-item-title")
    # Return a list of (href, title) tuples
return [(link["href"], link.get_text()) for link in links]
pass
if __name__ == '__main__':
# Parse the hrefs and titles out of one page
for result in parse(craw(urls[2])):
print(result)
pass
python01_producer_consumer_spider.py
# @Time : 2021/10/25 15:16
# @Author : Li Kunlun
# @Description : Producer and consumer modules
import chapter08.spider02.blog_spider as blog_spider
import threading
import time
import random
import queue
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = blog_spider.craw(url)
        # Put the downloaded HTML onto html_queue
        html_queue.put(html)
        print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize())
        # Sleep for a random 1-2 seconds
        time.sleep(random.randint(1, 2))
def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        # Parse the HTML once it has been taken off the queue
        results = blog_spider.parse(html)
        for result in results:
            # fout is the shared output file object
            fout.write(str(result) + "\n")
        print(threading.current_thread().name, "results.size=", len(results), "html_queue.size=", html_queue.qsize())
        # Sleep for a random 1-2 seconds
        time.sleep(random.randint(1, 2))
if __name__ == '__main__':
url_queue = queue.Queue()
html_queue = queue.Queue()
for url in blog_spider.urls:
url_queue.put(url)
for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw{idx}")
t.start()
fout = open("data.txt", "w")
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
t.start()
Summary: a complex crawler can be split into modules, each module driven by its own group of threads, with the thread groups exchanging work through Queue objects.
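One practical caveat: do_craw and do_parse loop forever (while True), so these worker threads never exit on their own and fout is never closed. A common refinement is to mark the threads as daemons, or to push sentinel values onto the queues and break out of the loop when one is received.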
3. Thread safety and solutions
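The classic symptom is a lost update: two threads read and write shared data at the same time, because an operation like += compiles to several bytecode steps that can interleave. The standard fix is a mutual-exclusion lock; here is a minimal sketch using threading.Lock (the counter and iteration counts are illustrative):
import threading
lock = threading.Lock()
count = 0
def add_many(n):
    global count
    for _ in range(n):
        # Without the lock, concurrent count += 1 operations can lose updates
        with lock:
            count += 1
threads = [threading.Thread(target=add_many, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(count)  # reliably 200000 with the lock in place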
4. The ThreadPoolExecutor thread pool
A thread pool can be used in two ways: pool.map, which applies one function across an iterable, and pool.submit with as_completed, which gives you a Future per task.
Code:
import concurrent.futures
import chapter08.py02_spider02.blog_spider as blog_spider
with concurrent.futures.ThreadPoolExecutor() as pool:
htmls = pool.map(blog_spider.craw, blog_spider.urls)
htmls = list(zip(blog_spider.urls, htmls))
for url, html in htmls:
print(url, len(html))
print("craw over")
with concurrent.futures.ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
future = pool.submit(blog_spider.parse, html)
futures[future] = url
for future in concurrent.futures.as_completed(futures):
url = futures[future]
print(url, future.result())
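Note the difference between the two styles: pool.map returns results in the same order as the input urls, while as_completed yields each future as soon as it finishes, in completion order, which is why the futures dict is needed to map a finished future back to its url.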
5. Using multithreading to speed up a web server
python02_web.py:
import flask
import json
import time
app = flask.Flask(__name__)
def read_file():
time.sleep(0.1)
return "read_file result"
def read_db():
time.sleep(0.2)
return "read_db result"
def read_api():
time.sleep(0.3)
return "read_api result"
@app.route("/")
def index():
result_file = read_file()
result_db = read_db()
result_api = read_api()
return json.dumps({
"result_file": result_file,
"result_db": result_db,
"result_api": result_api
})
if __name__ == '__main__':
app.run()
After starting the app, the console shows Flask listening on its default address.
Visiting that URL in a browser returns the JSON result.
Timing the call in Postman shows it takes roughly 0.6 s: the three I/O operations run sequentially, so the total is 0.1 + 0.2 + 0.3 s.
Refactor the code as follows (adding a thread pool):
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor
app = flask.Flask(__name__)
pool = ThreadPoolExecutor()
def read_file():
time.sleep(0.1)
return "read_file result"
def read_db():
time.sleep(0.2)
return "read_db result"
def read_api():
time.sleep(0.3)
return "read_api result"
@app.route("/")
def index():
result_file = pool.submit(read_file)
result_db = pool.submit(read_db)
result_api = pool.submit(read_api)
return json.dumps({
"result_file": result_file.result(),
"result_db": result_db.result(),
"result_api": result_api.result()
})
if __name__ == '__main__':
app.run()
Now the three I/O operations run concurrently and the total run time is governed by the longest one: roughly max(0.1, 0.2, 0.3) = 0.3 s instead of 0.6 s.
3. Multiprocessing
1. Speeding up programs with a process pool
python01_thread_process_cpu_bound.py:
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
prime = [112272535095293] * 100
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def single_thread():
for number in prime:
is_prime(number)
def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, prime)
def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, prime)
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print(f"single_thread cost:", end - start, "seconds")
start = time.time()
multi_thread()
end = time.time()
print(f"multi_thread cost:", end - start, "seconds")
start = time.time()
multi_process()
end = time.time()
print(f"multi_process cost:", end - start, "seconds")
"""
Program output:
single_thread cost: 42.00479984283447 seconds
multi_thread cost: 42.01970911026001 seconds
multi_process cost: 9.357584953308105 seconds
"""
2. Using a process pool to speed up a Flask service
import flask
import json
import time
from concurrent.futures import ProcessPoolExecutor
import math
app = flask.Flask(__name__)
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
number_list = [int(x) for x in numbers.split(",")]
results = process_pool.map(is_prime, number_list)
return json.dumps(dict(zip(number_list, results)))
if __name__ == '__main__':
process_pool = ProcessPoolExecutor()
app.run()
Program output:
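Assuming Flask's default port, a test request might look like http://127.0.0.1:5000/is_prime/1,2,3,112272535095293, returning a JSON object that maps each number to true or false. Note that the ProcessPoolExecutor is created inside the __main__ guard: on platforms that spawn child processes (such as Windows), each child re-imports this module, and the guard prevents that import from recursively starting another pool and server.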
4. Mofan Python
1. Multithreading
1. Adding a thread
import threading
def show():
    # Number of threads currently alive
    print(threading.active_count())
    # All alive Thread objects
    print(threading.enumerate())
    # The Thread object for the current thread
    print(threading.current_thread())
def thread_job():
print('This is a thread of %s' % threading.current_thread())
def main():
    thread = threading.Thread(target=thread_job)
thread.start()
show()
if __name__ == '__main__':
main()
2. The join function
Output without join():
import threading
import time
def thread_job():
print("T1 start\n")
for i in range(10):
time.sleep(0.1)
print("T1 finish\n")
added_thread = threading.Thread(target=thread_job, name='T1')
added_thread.start()
print("all done\n")
"""
预期输出结果:
T1 start
T1 finish
all done
实际结果:
T1 start
all done
T1 finish
"""
"all done" is printed before the thread's work has finished. To enforce the ordering, call join() on the thread after starting it.
With join(), the output follows the expected order:
import threading
import time
def T1_job():
print('T1 start\n')
for i in range(10):
time.sleep(0.1)
print('T1 finish\n')
def T2_job():
print('T2 start\n')
print('T2 finish\n')
def main():
    added_thread = threading.Thread(target=T1_job, name='T1')
thread2 = threading.Thread(target=T2_job, name='T2')
added_thread.start()
thread2.start()
thread2.join()
added_thread.join()
print('all done\n')
if __name__ == '__main__':
main()
"""
An acceptable output (ordering enforced by the joins):
T1 start
T2 start
T2 finish
T1 finish
all done
"""
3. Queues
import threading
import time
from queue import Queue
def job(l, q):
    # Square every element in place, then put the whole list on the queue
    for i in range(len(l)):
        l[i] = l[i] ** 2
    q.put(l)
def multithreading():
q = Queue()
threads = []
data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]
for i in range(4):
t = threading.Thread(target=job, args=(data[i], q))
t.start()
threads.append(t)
for thread in threads:
thread.join()
results = []
for _ in range(4):
results.append(q.get())
print(results)
if __name__ == '__main__':
multithreading()
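Note that q.get() returns the lists in whatever order the threads happened to put them, which is not necessarily the order of data. If ordering matters, a common pattern is to put (index, result) tuples on the queue and sort the collected results by index afterwards.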
2. Multiprocessing
1. Queue
import multiprocessing as mp
"""
Queue的功能是将每个核或线程的运算结果放在队里中, 等到每个线程或核运行完毕后
再从队列中取出结果, 继续加载运算。
因为:多线程调用的函数不能有返回值, 所以使用Queue存储多个线程运算的结果
"""
def job(q):
res = 0
for i in range(1000):
res += i + i ** 2 + i ** 3
q.put(res)
if __name__ == '__main__':
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1 + res2)
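multiprocessing.Queue is used here rather than queue.Queue because the workers are separate processes with separate memory: mp.Queue pickles objects and passes them between processes, whereas queue.Queue only works between threads within a single process.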
2. Pool
import multiprocessing as mp
def job(x):
return x * x
def multicore():
pool = mp.Pool(processes=2)
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
print(res.get())
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
print([res.get() for res in multi_res])
if __name__ == '__main__':
multicore()
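The two Pool APIs differ in blocking behavior: pool.map blocks until every result is ready and returns them in input order, while apply_async returns an AsyncResult immediately and only blocks when .get() is called, which makes it easy to launch many tasks first and collect the results later.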
3. Lock
import multiprocessing as mp
import time
def job(v, num, l):
    # Hold the lock for the entire loop so the two processes do not interleave
    l.acquire()
    for _ in range(4):
        time.sleep(0.1)
        v.value += num
        print(v.value)
    l.release()
def multicore():
    l = mp.Lock()
    # A shared integer ('i' type code), initial value 0
    v = mp.Value('i', 0)
p1 = mp.Process(target=job, args=(v, 1, l))
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
"""
控制台输出:
1
2
3
4
7
10
13
16
"""