从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。 比如在跑任务的时候,python 多线程跑回很慢,但是开多个线程跑任务,速度会是倍数的增长。
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
executor = ThreadPoolExecutor(20)
for ind, item in enumerate(self.isin_info_list):
executor.submit(self.load_s3_news_each, isin_info=item)
print(self.file_10_24_amount, '10____24')
print(self.file_10_23_amount, '10____23')
arr = []
for line in range(1, 501):
arr.append(line)
if len(arr) == 500:
all_task = [executor.submit(self.load_s3_news_each, (item)) for item in self.isin_info_list]
wait(all_task, return_when=ALL_COMPLETED)
arr = []
if len(arr) > 0:
all_task = [executor.submit(self.load_s3_news_each, (item)) for item in self.isin_info_list]
wait(all_task, return_when=ALL_COMPLETED)
print(self.file_10_24_amount, '10____24')
print(self.file_10_23_amount, '10____23')
all_task = [executor.submit(self.load_s3_news_each, (item)) for item in self.isin_info_list]
wait(all_task, return_when=ALL_COMPLETED)
print(self.file_10_24_amount, '10____24')
print(self.file_10_23_amount, '10____23')
def load_s3_news_each(self, isin_info):
prefix = "xxx/xxxxxx/" + isin_info["id_isin"] + "/"
for ind_item, obj in enumerate(s3_bucket.objects.filter(Prefix=prefix)):
key = obj.key
if self.yesterday in obj.key:
self.file_10_24_amount += 1
print(key, '>>>>>>>>10-24>>>>>>>>>>>>>>>', self.file_10_24_amount)
if self.last_2day in obj.key:
self.file_10_23_amount += 1
print(key, '>>>>>>>>>10-23>>>>>>>>>>>>>>', self.file_10_23_amount)
方案1 与 方案3 的执行效率基本一致。
但是 方案1 并没有线程结束的标志,print 并不是在所有线程都执行完成才输出的。
而方案3 的 print 则是在 所有线程结束之后会打印。
方案2 的执行效率会 略差于 方案1、方案3。
但是 好处就是 可以 清晰的知道 线程 目前 执行到了哪里。500个一输出,可以看到具体执行进度。
|