Python3 ThreadPoolExecutor Thread Pools
Module used: concurrent.futures
Why use a thread pool
From the main thread you can query the state of a particular thread or task, as well as its return value.
The futures module also gives multithreading and multiprocessing a consistent coding interface.
How to use it
The base class of the pools is Executor in the concurrent.futures module. Executor has two subclasses, ThreadPoolExecutor and ProcessPoolExecutor: ThreadPoolExecutor creates a thread pool, while ProcessPoolExecutor creates a process pool.
If you manage concurrency with a thread pool or process pool, you only need to submit the task function to the pool, and the pool takes care of everything else.
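Since both pool classes implement the same Executor interface, swapping one for the other normally requires no change to the calling code. Below is a minimal sketch of that parity; the square helper and the run wrapper are invented here purely for illustration.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(n):
    return n * n

def run(pool_cls):
    # The calling code is identical for both executor types.
    with pool_cls(max_workers=2) as pool:
        return list(pool.map(square, [1, 2, 3]))

if __name__ == '__main__':           # guard needed for ProcessPoolExecutor on spawn-based platforms
    print(run(ThreadPoolExecutor))   # [1, 4, 9]
    print(run(ProcessPoolExecutor))  # [1, 4, 9]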
Executor provides the following commonly used methods:
- submit(fn, *args, **kwargs): Submit the function fn to the pool. *args are the positional arguments passed to fn, and **kwargs are the keyword arguments passed to fn.
- map(func, *iterables, timeout=None, chunksize=1): Similar to the built-in map(func, *iterables), except that func is executed by multiple threads and the map over iterables is started immediately and asynchronously.
- shutdown(wait=True): Shut down the pool; with wait=True the call blocks until all pending tasks have finished.
After a task function is submitted to the pool, submit returns a Future object. The Future class is mainly used to obtain the return value of the task function: because the task runs asynchronously, Python uses a Future as the container that will eventually hold the task's result.
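As a quick sketch of these methods (the slow_add task below is just a stand-in): submit returns a Future immediately without blocking, result() blocks until the value is ready, and using the executor as a context manager calls shutdown(wait=True) automatically on exit.
from concurrent.futures import ThreadPoolExecutor
import time

def slow_add(a, b):
    time.sleep(1)
    return a + b

with ThreadPoolExecutor(max_workers=2) as pool:   # shutdown(wait=True) runs when the block exits
    future = pool.submit(slow_add, 1, 2)          # returns a Future right away
    print(future.done())                          # False -- the task is still running
    print(future.result())                        # 3 -- blocks until the task finishes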
The Future object
Future provides the following methods:
class Future(object):
"""Represents the result of an asynchronous computation."""
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
- cancel(): Attempt to cancel the task this Future represents. If the task is running or has already completed, it cannot be cancelled and the method returns False; otherwise the task is cancelled and the method returns True.
def cancel(self):
"""Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
return True
self._state = CANCELLED
self._condition.notify_all()
self._invoke_callbacks()
return True
- cancelled(): Return whether the task this Future represents was successfully cancelled.
def cancelled(self):
"""Return True if the future was cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
- running(): Return True if the task this Future represents is currently executing and can no longer be cancelled.
def running(self):
"""Return True if the future is currently executing."""
with self._condition:
return self._state == RUNNING
- done(): Return True if the task this Future represents was successfully cancelled or has finished executing.
def done(self):
"""Return True of the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
- result(timeout=None): Get the value returned by the task this Future represents. If the task has not finished yet, this method blocks the calling thread; timeout specifies the maximum number of seconds to block.
def result(self, timeout=None):
"""Return the result of the call that the future represents.
Args:
timeout: The number of seconds to wait for the result if the future
isn't done. If None, then there is no limit on the wait time.
Returns:
The result of the call that the future represents.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
Exception: If the call raised then that exception will be raised.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
- exception(timeout=None): Get the exception raised by the task this Future represents. If the task completed successfully without raising, this method returns None.
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
Args:
timeout: The number of seconds to wait for the exception if the
future isn't done. If None, then there is no limit on the wait
time.
Returns:
The exception raised by the call that the future represents or None
if the call completed without raising.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
raise TimeoutError()
- add_done_callback(fn): Register a callback for the task this Future represents. When the task completes (or is cancelled), fn is called automatically with the Future as its only argument.
def add_done_callback(self, fn):
"""Attaches a callable that will be called when the future finishes.
Args:
fn: A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
try:
fn(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
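A small usage sketch of add_done_callback together with exception (the ok, boom and on_done functions are made up for illustration): the callback receives the finished Future, so it can inspect exception() or result() without blocking.
from concurrent.futures import ThreadPoolExecutor

def ok():
    return 42

def boom():
    raise ValueError('something went wrong')

def on_done(future):
    # Called with the finished Future once the task completes or is cancelled.
    if future.exception() is not None:
        print('task failed:', future.exception())
    else:
        print('task result:', future.result())

with ThreadPoolExecutor(max_workers=2) as pool:
    pool.submit(ok).add_done_callback(on_done)
    pool.submit(boom).add_done_callback(on_done)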
Simple example 1
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future
import time
def get_html(times):
time.sleep(times)
print(f'get page {times} success')
return times
executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_html, 3)
task2 = executor.submit(get_html, 2)
print(task1.done())
time.sleep(4)
print(task1.done())
print(task1.result())
print(task2.cancel())
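In this run, the first task1.done() prints False because the task is still sleeping. After the 4-second pause both tasks have finished, so the second done() prints True and result() returns 3 without blocking. task2.cancel() prints False: a task can only be cancelled while it is still waiting in the queue, and by this point task2 has already completed.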
Getting the return values of completed tasks
as_completed
as_completed(fs, timeout=None)
as_completed first yields the futures that are already finished, then keeps waiting on the unfinished ones and yields each as it completes, until all remaining tasks have been consumed.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def get_html(times):
time.sleep(times)
print(f'get page {times} success')
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
for future in as_completed(all_task):
data = future.result()
print(f'get {data} page')
Output:
get page 2 success
get 2 page
get page 3 success
get 3 page
get page 4 success
get 4 page
Notice that the results are printed in the order the tasks complete: whichever finishes first is printed first.
executor.map
map(func, *iterables, timeout=None, chunksize=1)
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(times):
time.sleep(times)
print(f'get page {times} success')
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
for data in executor.map(get_html, urls):
print(f'get {data} page')
Output:
get page 2 success
get page 3 success
get 3 page
get 2 page
get page 4 success
get 4 page
Notice that here the results are printed in the order of the argument list rather than in completion order.
wait
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
Blocks the main thread until the given condition is met.
- fs: a sequence of Future objects
- timeout: the maximum number of seconds to block; if None, there is no limit
- return_when: when to return. There are three options: FIRST_COMPLETED (any one task finishes), FIRST_EXCEPTION (any one task raises an exception), and ALL_COMPLETED (all tasks finish). The default is ALL_COMPLETED; a FIRST_COMPLETED sketch follows the example below.
Example:
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time
def get_html(times):
time.sleep(times)
print(f'get page {times} success')
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
wait(all_task)
print('end')
Output:
get page 2 success
get page 3 success
get page 4 success
end
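For comparison, here is a sketch using return_when=FIRST_COMPLETED with the same get_html helper: wait returns as soon as the fastest task finishes and hands back the done and not_done sets.
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
all_task = [executor.submit(get_html, url) for url in [3, 2, 4]]

done, not_done = wait(all_task, return_when=FIRST_COMPLETED)
print(len(done), len(not_done))   # 1 2 -- only the 2-second task has finished so far
executor.shutdown()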
ThreadPoolExecutor source code
__init__
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
"""
if max_workers is None:
max_workers = min(32, (os.cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue()
self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._broken = False
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))
self._initializer = initializer
self._initargs = initargs
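A sketch of the constructor parameters read above (the init_worker function and the 'crawler' prefix are invented for illustration): thread_name_prefix names the worker threads, while initializer/initargs run once in each worker thread before it starts taking tasks. As the source shows, when max_workers is omitted it defaults to min(32, os.cpu_count() + 4).
from concurrent.futures import ThreadPoolExecutor
import threading

def init_worker(tag):
    # Runs once in each worker thread before it picks up any task.
    print(f'{threading.current_thread().name} initialized with {tag}')

executor = ThreadPoolExecutor(max_workers=2,
                              thread_name_prefix='crawler',
                              initializer=init_worker,
                              initargs=('demo',))
executor.submit(print, 'hello')   # this worker thread will be named crawler_0
executor.shutdown()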
submit
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock, _global_shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
if _shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
The _WorkItem created here is responsible for running the task and setting the result on the future object; submit just enqueues it and returns the future. It does not block and returns immediately.
The _WorkItem object
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
self.future.set_exception(exc)
self = None
else:
self.future.set_result(result)
__class_getitem__ = classmethod(types.GenericAlias)
_adjust_thread_count
def _adjust_thread_count(self):
if self._idle_semaphore.acquire(timeout=0):
return
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs))
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
If no worker is currently idle and the number of threads is still below max_workers, a new thread is created. Its target function is _worker, and its arguments are a weak reference to the executor and the work queue (plus the initializer and its arguments).
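A small illustration of that lazy behavior (it peeks at the private _threads attribute purely for demonstration, so treat it as a sketch rather than supported API): no worker threads exist until the first submit, and another is only created when no worker is idle and the count is still below max_workers.
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=4)
print(len(executor._threads))   # 0 -- no worker threads created yet

executor.submit(time.sleep, 1)
print(len(executor._threads))   # 1 -- the first submit spawns the first worker

executor.submit(time.sleep, 1)
print(len(executor._threads))   # 2 -- no idle worker, so a second one is spawned

executor.shutdown()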
_worker and the _idle_semaphore idle-worker semaphore
def _worker(executor_reference, work_queue, initializer, initargs):
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
executor = executor_reference()
if executor is not None:
executor._initializer_failed()
return
try:
while True:
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
del work_item
executor = executor_reference()
if executor is not None:
executor._idle_semaphore.release()
del executor
continue
executor = executor_reference()
if _shutdown or executor is None or executor._shutdown:
if executor is not None:
executor._shutdown = True
work_queue.put(None)
return
del executor
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
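Reading this loop together with _adjust_thread_count explains the _idle_semaphore: each worker blocks on work_queue.get(), runs the work item, and then releases _idle_semaphore to announce that it is idle again. On the submit side, _adjust_thread_count first tries _idle_semaphore.acquire(timeout=0); if that succeeds, some worker is already idle and will pick up the new item, so no thread needs to be created. Only when the acquire fails (every worker is busy) and the thread count is still below max_workers does it spawn another worker. The None sentinel put into the queue at shutdown (or by the weakref callback when the executor is garbage-collected) wakes a blocked worker so the loop can exit, and that worker re-enqueues None so the remaining workers exit as well.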