IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Python3 ThreadPoolExecutor线程池 笔记 -> 正文阅读

[Python知识库]Python3 ThreadPoolExecutor线程池 笔记

Python3 ThreadPoolExecutor线程池

用到的模块:concurrent.futures

为什么需要线程池

主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值。

futures可以让多线程和多进程编码接口一致

怎么使用

线程池的基类是concurrent.futures模块中的Executor,Executor提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 对象类主要用于获取线程任务函数的返回值。由于是异步任务,Python 用 Future 对象来作为 task 的返回容器。

Future 未来对象

Futrure 提供了以下方法:

class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()	# 这里使用了threading中的条件变量 控制线程间同步
        self._state = PENDING	# 初始状态为待办
        self._result = None		# 运行结果
        self._exception = None	# 发生的异常
        self._waiters = []
        self._done_callbacks = []	# 回调函数集
  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行或者已完成,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。

    def cancel(self):
        """Cancel the future if possible.
    
            Returns True if the future was cancelled, False otherwise. A future
            cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:	#
                return False
    
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:	#
                return True
    
            self._state = CANCELLED
            self._condition.notify_all()
    
            self._invoke_callbacks()
            return True
    
  • cancelled():返回 Future 代表的线程任务是否被成功取消。

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
    
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING
    
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。

    def done(self):
        """Return True of the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
    
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。

    def result(self, timeout=None):
        """Return the result of the call that the future represents.
    
        Args:
        timeout: The number of seconds to wait for the result if the future
        isn't done. If None, then there is no limit on the wait time.
    
        Returns:
        The result of the call that the future represents.
    
        Raises:
        CancelledError: If the future was cancelled.
        TimeoutError: If the future didn't finish executing before the given
        timeout.
        Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
    
                self._condition.wait(timeout)	# 阻塞 获取结果
    
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                    elif self._state == FINISHED:
                        return self.__get_result()
                    else:
                        raise TimeoutError()
    
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.
    
            Args:
                timeout: The number of seconds to wait for the exception if the
                    future isn't done. If None, then there is no limit on the wait
                    time.
    
            Returns:
                The exception raised by the call that the future represents or None
                if the call completed without raising.
    
            Raises:
                CancelledError: If the future was cancelled.
                TimeoutError: If the future didn't finish executing before the given
                    timeout.
        """
    
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
                elif self._state == FINISHED:
                    return self._exception
    
                self._condition.wait(timeout)	# 阻塞 获取异常 同上result
    
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                    elif self._state == FINISHED:
                        return self._exception
                    else:
                        raise TimeoutError()
    
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.
    
            Args:
                fn: A callable that will be called with this future as its only
                    argument when the future completes or is cancelled. The callable
                    will always be called by a thread in the same process in which
                    it was added. If the future has already completed or been
                    cancelled then the callable will be called immediately. These
                    callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)	# 如果未开始或者执行中,添加fn进回调函数列表
                return
            try:
                fn(self)
                except Exception:
                    LOGGER.exception('exception calling callback for %r', self)
    

简单示例1

from concurrent.futures import ThreadPoolExecutor

from concurrent.futures import Future
# 未来对象 task的返回容器

import time


# 定义一个准备作为线程任务的函数
def get_html(times):
    time.sleep(times)
    print(f'get page {times} success')
    return times


# 1.创建一个包含2条线程的线程池
executor = ThreadPoolExecutor(max_workers=2)

# 2.通过submit函数提交执行的函数到线程池中,submit 是立即返回一个Future实例
task1 = executor.submit(get_html, 3)
task2 = executor.submit(get_html, 2)

# done 方法用于判定某个任务是否完成 不阻塞 立即返回
print(task1.done())	# False
time.sleep(4)
print(task1.done())	# True

# result 方法可以获取task的执行结果 阻塞
print(task1.result())	# 3

# cancel 方法可以取消一个任务,无法取消执行中或者已完成的任务,返回False
print(task2.cancel())	# False  因为线程池有2个线程,这一刻task2已经执行完,所以取消失败

获取已经成功的 task 的返回值

as_completed

as_completed(fs, timeout=None)

先收集已完成的 task 输出,然后循环等待未完成的 task,直到所有剩余 task 列表被清空。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time


# 定义一个准备作为线程任务的函数
def get_html(times):
    time.sleep(times)
    print(f'get page {times} success')
    return times


# 1.创建一个包含2条线程的线程池
executor = ThreadPoolExecutor(max_workers=2)

# 要获取已经成功的task的返回值 as_completed
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]

for future in as_completed(all_task):	# as_completed返回的是 future
    data = future.result()
    print(f'get {data} page')

结果:

get page 2 success
get 2 page
get page 3 success
get 3 page
get page 4 success
get 4 page

可以看到这里是按 task 的完成顺序来打印的。谁先完成打印谁

executor.map

map(func, *iterables, timeout=None, chunksize=1)

from concurrent.futures import ThreadPoolExecutor
import time


# 定义一个准备作为线程任务的函数
def get_html(times):
    time.sleep(times)
    print(f'get page {times} success')
    return times


# 1.创建一个包含2条线程的线程池
executor = ThreadPoolExecutor(max_workers=2)

# 要获取已经成功的task的返回值
urls = [3, 2, 4]

# 通过 executor 的map 获取已经完成的task  
for data in executor.map(get_html, urls):	# map返回的是future.result()
    print(f'get {data} page')

结果:

get page 2 success
get page 3 success
get 3 page
get 2 page
get page 4 success
get 4 page

可以看到这里是按照参数列表里的顺序打印

wait

def wait(fs, timeout=None, return_when=ALL_COMPLETED):

阻塞主线程,直到满足条件

  • fs:future 对象的序列
  • timeout:最大阻塞时间。如果没有,则无限制
  • return_when:何时返回。有3个选项:FIRST_COMPLETED(完成1个)、FIRST_EXCEPTION(报错1个)、ALL_COMPLETED(完成所有)。默认是完成所有

示例:

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time


# 定义一个准备作为线程任务的函数
def get_html(times):
    time.sleep(times)
    print(f'get page {times} success')
    return times


# 1.创建一个包含2条线程的线程池
executor = ThreadPoolExecutor(max_workers=2)

# 要获取已经成功的task的返回值 as_completed
urls = [3, 2, 4]
all_task = [executor.submit(get_html, url) for url in urls]
wait(all_task)	# 主进程阻塞,默认完成所有task后结束
print('end')

结果:

get page 2 success
get page 3 success
get page 4 success
end

ThreadPoolExecutor 源码

init

def __init__(self, max_workers=None, thread_name_prefix='',
             initializer=None, initargs=()):
    """Initializes a new ThreadPoolExecutor instance.

    Args:
        max_workers: The maximum number of threads that can be used to
            execute the given calls.
        thread_name_prefix: An optional name prefix to give our threads.
        initializer: A callable used to initialize worker threads.
        initargs: A tuple of arguments to pass to the initializer.
    """
    if max_workers is None:
        # ThreadPoolExecutor is often used to:
        # * CPU bound task which releases GIL
        # * I/O bound task (which releases GIL, of course)
        #
        # We use cpu_count + 4 for both types of tasks.
        # But we limit it to 32 to avoid consuming surprisingly large resource
        # on many core machine.
        max_workers = min(32, (os.cpu_count() or 1) + 4)	# 1. 在这里设置最大线程数
    if max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")

    if initializer is not None and not callable(initializer):
        raise TypeError("initializer must be a callable")

    self._max_workers = max_workers
    self._work_queue = queue.SimpleQueue()	# 2. 任务队列,submit后将task放入队列中
    self._idle_semaphore = threading.Semaphore(0)
    self._threads = set()	# 3. 启动的线程集合
    self._broken = False
    self._shutdown = False
    self._shutdown_lock = threading.Lock()	# 4. submit 和 shutdown
    self._thread_name_prefix = (thread_name_prefix or
                                ("ThreadPoolExecutor-%d" % self._counter()))
    self._initializer = initializer
    self._initargs = initargs

submit

def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock, _global_shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
           raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
           raise RuntimeError('cannot schedule new futures after '
                                       'interpreter shutdown')

        f = _base.Future()	# 1. 这里创建了一个Future对象
        w = _WorkItem(f, fn, args, kwargs)	# 2. 这个对象的run方法里会运行 fn 函数

        self._work_queue.put(w)		# 3. 将其加入工作队列
        self._adjust_thread_count()
        return f

这里面的 _WorkItem 对象负责运行任务和 future 对象进行设置,最后将 future 对象返回。不阻塞,立即返回

_WorkItem 对象
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)	# 1. 运行 fn 函数
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:
            self.future.set_result(result)	# 2. 给future对象设置结果

    __class_getitem__ = classmethod(types.GenericAlias)

_adjust_thread_count

def _adjust_thread_count(self):
    # if idle threads are available, don't spin new threads
    if self._idle_semaphore.acquire(timeout=0):
        return

    # When the executor gets lost, the weakref callback will wake up
    # the worker threads.
    def weakref_cb(_, q=self._work_queue):
        q.put(None)

    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue,
                                   self._initializer,
                                   self._initargs))
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue

如果当前线程数小于最大线程数,就创建线程,线程使用的函数是 _worker ,参数是弱引用和 task队列

_worker ( _idle_semaphore空闲信号量 暂时没理解)

def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item

                # attempt to increment idle count
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue

            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-10-18 17:21:19  更:2021-10-18 17:22:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 21:07:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码