服务报价 | 域名主机 | 网络营销 | 软件工具| [加入收藏]
 热线电话: #
当前位置: 主页 > 开发教程 > python教程 >

Python的concurrent.futures理解

时间:2016-04-14 20:36来源:未知 作者:最模板 点击:
读了 concurrent.futures 源码,记录一下实现原理。 主要包括三个文件:_base.py、thread.py 和 process.py,_base.py 主要是 Future 本身的内容,thread.py 和 process.py 是 Future 的执行器。 _base.py 主要定

读了 concurrent.futures 源码,记录一下实现原理。

主要包括三个文件:_base.py、thread.py 和 process.py,_base.py 主要是 Future 本身的内容,thread.py 和 process.py 是 Future 的执行器。

_base.py 主要定了三部分内容:

1). waiter 类。

class _Waiter(object):

class _AsCompletedWaiter(_Waiter):

class _FirstCompletedWaiter(_Waiter):

class _AllCompletedWaiter(_Waiter):

waiter 类用来等待 Future 执行完,_Waiter 里定义了 threading.Event(),_AsCompletedWaiter 每个 Future 完成都会触发 event.set(),_FirstCompletedWaiter 每个 Future 完成也会触发,_AllCompletedWaiter 会等所有 Future 完成才触发 event.set()。

另外,_AsCompletedWaiter 和 _AllCompletedWaiter 还有把锁 threading.Lock()。

2). 辅助函数。

def _create_and_install_waiters(fs, return_when):

def as_completed(fs, timeout=None):

def wait(fs, timeout=None, return_when=ALL_COMPLETED):

_create_and_install_waiters 是对 Future 列表 fs 创建和安装 waiter,创建好响应的 waiter 之后,会对 fs 中的每一个 Future 增加此 waiter (Future 有个列表变量 _waiters,加入即可),并且返回此 waiter;

as_completed 是一个生成器,配合 for 使用可以循环得到已经完成的 Future,as_completed 使用了 _create_and_install_waiters;

wait 用于等待 Future 列表依次完成。

3). Future 类和 Executor 类。

Future 类的成员变量:

self._condition = threading.Condition()

self._state = PENDING

self._result = None

self._exception = None

self._traceback = None

self._waiters = []

self._done_callbacks = []

_condition 用于控制 Future 内部的条件,比如 result() 要得到值,如果没有完成就要_condition.wait,直到 set_result() 触发 _condition.notify_all(),当然,cancel() 也可以触发 _condition.notify_all()。

Future 支持 callback,记录在 _done_callbacks,Future 完成后会执行这些 callback。

Executor 类供继承,需要实现 submit 方法,thread.py 中的 ThreadPoolExecutor 和 process.py 中的 ProcessPoolExecutor 都实现了此类。

thread.py 主要包括三部分:

class _WorkItem(object):

def _worker(executor_reference, work_queue):

class ThreadPoolExecutor(_base.Executor):

_WorkItem 是 Future 的包装,变量有 self.future、self.fn (执行的函数)、self.args 和 self.kwargs,里面的 run() 函数用来执行 Future 并设置 Future 信息,_state 会被设置成 FINISHED,如果是无异常会设置 _result,否则设置 _exception 和 _traceback。

_worker 不断从 _work_queue 队列中取 Future 并执行。

ThreadPoolExecutor 实现 _base.Executor,主要变量是 _max_workers  和 _work_queue,_max_workers 是最大线程数,_work_queue 是 queue.Queue(),当调用 submit 的时候会把 Future 包装成 _WorkItem,放入 _work_queue,然后开启最多 _max_workers 的线程去执行 _worker (不断读取队列并执行 )。

process.py

(责任编辑:最模板)
顶一下
(0)
0%
踩一下
(0)
0%
------分隔线----------------------------
栏目列表
热点内容