Source code analysis of as_completed and wait

Python notes, 2019-02-22

Preface

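Multithreaded code built on ThreadPoolExecutor comes with two helper functions, as_completed() and wait(). Below I walk through their source to explain what each one does. Event locks (threading.Event) come up repeatedly later on, so you may want to read up on them first; see my earlier post on Python synchronization mechanisms.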

(Everything below is based on Python 3.7.)

as_completed

from concurrent.futures import ThreadPoolExecutor, as_completed

def greet():
    print("hello world")

if __name__ == "__main__":
    executor = ThreadPoolExecutor()

    task = executor.submit(greet)
    print(type(task))  # Output: <class 'concurrent.futures._base.Future'>

    results = as_completed([task])
    print(type(results))  # Output: <class 'generator'>

    for result in results:
        print(type(result))  # Output: <class 'concurrent.futures._base.Future'>

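When I first came across as_completed(), it puzzled me. As the code above shows, submit() returns a Future object; wrapping it with as_completed() gives you a generator, yet iterating over that generator hands you... Future objects again! Is that all this thing is good for? Obviously not. The official documentation reveals what it is really for: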

Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or were cancelled). Any futures given by fs that are duplicated will be returned once. Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Two sentences in it are especially important:

  • Any futures given by fs that are duplicated will be returned once.
  • Any futures that completed before as_completed() is called will be yielded first.
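The quoted documentation also describes timeout, which the two sentences above do not cover: the deadline is counted from the original as_completed() call, and once it has passed, __next__() raises concurrent.futures.TimeoutError. A small sketch of that behavior; nap() is a hypothetical helper and the sleep lengths are arbitrary. With a 0.5 s budget, the 0.1 s task should be yielded and the 1.0 s task should trigger the timeout.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

def nap(seconds):
    """Hypothetical helper: sleep for `seconds`, then return it."""
    time.sleep(seconds)
    return seconds

finished = []
timed_out = False
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(nap, s) for s in (0.1, 1.0)]
    try:
        # The 0.5 s budget starts at this call; it is not reset per __next__().
        for future in as_completed(futures, timeout=0.5):
            finished.append(future.result())
    except TimeoutError:  # concurrent.futures.TimeoutError
        timed_out = True

print(finished, timed_out)
```

Note that leaving the with block still waits for the 1.0 s task, because the executor's shutdown() waits for running work; the timeout only stops the iteration.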

Let's start with the first sentence: duplicated futures are returned only once. For example:

from concurrent.futures import ThreadPoolExecutor, as_completed

def greet(word):
    return word

if __name__ == "__main__":
    executor = ThreadPoolExecutor()

    tasks = [executor.submit(greet, word) for word in ["hello", "world"]]
    tasks_doubled = tasks * 2  # every future now appears twice
    for item in as_completed(tasks_doubled):
        print(item.result())

# Output (the two lines may come out in either order):
hello
world

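We doubled tasks, yet after passing through as_completed() neither hello nor world was printed twice, so as_completed() must deduplicate somewhere. Internally it does something very simple: converting the input to a set is all it takes.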

# as_completed source (excerpt)
def as_completed(fs, timeout=None):
    ...
    fs = set(fs)  # deduplicate
    ...
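Because Future does not define __eq__ or __hash__, set() deduplicates by object identity: only the very same Future object passed twice is collapsed. Two separate submissions of an identical call produce two distinct futures, and both are yielded. A quick check reusing greet(); the variable names are mine.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def greet(word):
    return word

with ThreadPoolExecutor() as executor:
    first = executor.submit(greet, "hello")
    second = executor.submit(greet, "hello")  # same call, but a separate Future object
    # `first` is listed twice and yielded once; `second` is a different object.
    results = [f.result() for f in as_completed([first, first, second])]

print(results)  # ['hello', 'hello']
```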

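Now the second sentence: as_completed() first yields, one by one, the futures that had already completed before it was called, and then yields the remaining ones as they finish. In other words, results no longer come back in submission order. That may sound odd, so let's compare the behavior with and without as_completed().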

# Without as_completed
import collections
import random
import time
from concurrent.futures import ThreadPoolExecutor

def print_num(order):
    """
    order is the order in which the task was submitted.
    num is random, so the threads will likely finish at different times.
    """
    num = random.randrange(10)
    time.sleep(num)

    ordict = collections.OrderedDict()
    ordict["order"] = order
    ordict["value"] = num
    return ordict  # report the submission order and roughly how long the thread ran

if __name__ == "__main__":
    executor = ThreadPoolExecutor()
    alltasks = [executor.submit(print_num, i) for i in range(10)]

    for task in alltasks:
        print(task.result())

# Output (one sample run; the values are random):
OrderedDict([('order', 0), ('value', 5)])
OrderedDict([('order', 1), ('value', 3)])
OrderedDict([('order', 2), ('value', 1)])
OrderedDict([('order', 3), ('value', 1)])
OrderedDict([('order', 4), ('value', 1)])
OrderedDict([('order', 5), ('value', 7)])
OrderedDict([('order', 6), ('value', 0)])
OrderedDict([('order', 7), ('value', 0)])
OrderedDict([('order', 8), ('value', 6)])
OrderedDict([('order', 9), ('value', 1)])

The results come out in the order the tasks were submitted. So what happens with as_completed()?

# With as_completed
import collections
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def print_num(order):
    num = random.randrange(10)
    time.sleep(num)

    ordict = collections.OrderedDict()
    ordict["order"] = order
    ordict["value"] = num
    return ordict

if __name__ == "__main__":
    executor = ThreadPoolExecutor()
    alltasks = [executor.submit(print_num, i) for i in range(10)]

    for task in as_completed(alltasks):
        print(task.result())

# Output (one sample run; the values are random):
OrderedDict([('order', 9), ('value', 1)])
OrderedDict([('order', 1), ('value', 3)])
OrderedDict([('order', 2), ('value', 6)])
OrderedDict([('order', 4), ('value', 6)])
OrderedDict([('order', 0), ('value', 7)])
OrderedDict([('order', 3), ('value', 7)])
OrderedDict([('order', 5), ('value', 7)])
OrderedDict([('order', 6), ('value', 9)])
OrderedDict([('order', 7), ('value', 9)])
OrderedDict([('order', 8), ('value', 9)])

Clearly, whichever task finishes first is printed first.
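The random sleeps above mix two effects together. The "already completed futures are yielded first" rule can be isolated deterministically: finish one future before calling as_completed(), then list it after a still-running one. work() and the sleep length are hypothetical choices for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(name, seconds):
    time.sleep(seconds)
    return name

with ThreadPoolExecutor(max_workers=2) as executor:
    early = executor.submit(work, "early", 0)
    early.result()  # block until `early` has definitely finished
    late = executor.submit(work, "late", 0.2)
    # `late` is listed first, but `early` was already done when as_completed() ran.
    order = [f.result() for f in as_completed([late, early])]

print(order)  # ['early', 'late']
```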

The core logic starts at the try statement in as_completed():

def as_completed(fs, timeout=None):
    ...
    try:
        # First yield every future that has already finished,
        # i.e. those in state CANCELLED_AND_NOTIFIED or FINISHED
        yield from _yield_finished_futures(finished, waiter,
                                           ref_collect=(fs,))
        # pending holds the futures that have not finished yet
        while pending:
            ...  # timeout handling
            waiter.event.wait(wait_timeout)  # block on the event
            # until some future finishes (or wait_timeout runs out)

            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()  # reset the event so the next wait() blocks again

            # reverse to keep finishing order
            finished.reverse()
            # yield the futures that have just finished
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs, pending))
    finally:
        ...

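Reading as_completed() on its own doesn't tell you where the behavior described in those comments comes from, because it relies on calls into several other functions. To keep this post from getting too long, I'll only cover the important ones.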

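The line waiter = _create_and_install_waiters(fs, _AS_COMPLETED) returns an _AsCompletedWaiter object. Its add_result() sets the event (add_exception() and add_cancelled() do the same), which lets the waiter.event.wait() call in as_completed() return so the loop can continue.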

# Reached by following _create_and_install_waiters() from as_completed()
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed()."""
    ...
    def add_result(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()  # set the event, waking up the blocked event.wait()
    ...
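To see the event handshake as a whole, here is a simplified, hypothetical re-implementation. It is not the standard library's code: it has no timeout, and it hooks in through the public Future.add_done_callback() instead of the private f._waiters list. That swap has one drawback: add_done_callback() offers no way to unregister a callback, whereas the real as_completed() removes its waiter from every future in its finally block. The names simple_as_completed() and nap() are mine.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def simple_as_completed(fs):
    """Hypothetical, simplified as_completed(): no timeout, no waiter cleanup."""
    fs = set(fs)                    # deduplicate by identity, like the real one
    lock = threading.Lock()
    event = threading.Event()
    finished = []                   # stands in for waiter.finished_futures

    def on_done(future):            # stands in for _AsCompletedWaiter.add_result()
        with lock:
            finished.append(future)
            event.set()             # wake up the loop below

    for f in fs:
        # Runs on_done immediately if f is already done, so those come first.
        f.add_done_callback(on_done)

    pending = set(fs)
    while pending:
        event.wait()                # block until at least one future is done
        with lock:
            batch = finished[:]     # take everything that has finished so far...
            finished.clear()
            event.clear()           # ...and re-arm the event for the next round
        for f in batch:
            pending.discard(f)
            yield f

def nap(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(nap, s) for s in (0.6, 0.2, 0.4)]
    order = [f.result() for f in simple_as_completed(futures)]
print(order)
```

The results should come out shortest sleep first. Clearing the event while holding the same lock the callback uses is what keeps a completion from being missed between taking the batch and waiting again.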

_yield_finished_futures() yields the futures that have finished and, along the way, removes each one from pending, which is what eventually ends the while loop.

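def _yield_finished_futures(fs, waiter, ref_collect):
    # fs: list of futures that have already finished
    # ref_collect: tuple of sets to remove each yielded future from;
    #    the first element is the set of all futures,
    #    the second (when present) is pending, the futures as_completed is still waiting on
    while fs:
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)  # drop the finished future from each set
        with f._condition:
            f._waiters.remove(waiter)  # unregister the waiter from this future
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()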
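Why the finished.reverse() before calling _yield_finished_futures()? The helper always looks at fs[-1] and pops from the end, so it walks the list backwards; reversing first cancels that out, and futures come out in the order they finished. Plain strings stand in for futures in this sketch:

```python
# finished_futures collects futures in completion order (appended at the end).
finished = ["first", "second", "third"]

finished.reverse()                   # as in as_completed(): "reverse to keep finishing order"
yielded = []
while finished:                      # same loop shape as _yield_finished_futures()
    yielded.append(finished.pop())   # pop() takes from the end of the list

print(yielded)  # ['first', 'second', 'third']
```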

wait

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    ...

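As its name suggests, wait() just waits. It returns an iterable (you can treat it as a tuple: it is an instance of a collections.namedtuple called DoneAndNotDoneFutures, with fields done and not_done) holding two sets: the first contains the futures that have finished, the second those that have not.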
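A short sketch of consuming that return value, both by tuple unpacking and by field name; square() is a hypothetical task function.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def square(x):
    return x * x

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(square, n) for n in range(4)]
    result = wait(futures)          # default: return_when=ALL_COMPLETED

print(type(result).__name__)        # DoneAndNotDoneFutures
done, not_done = result             # unpacks like a 2-tuple
print(len(done), len(not_done))     # 4 0
print(sorted(f.result() for f in result.done))  # [0, 1, 4, 9]
```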

The return_when argument decides when wait() returns. The docstring in the source describes it as follows:

  • FIRST_COMPLETED - Return when any future finishes or is cancelled.
  • FIRST_EXCEPTION - Return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
  • ALL_COMPLETED - Return when all futures finish or are cancelled.

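So return_when has three possible values. With FIRST_COMPLETED, wait() returns as soon as any future finishes or is cancelled. With FIRST_EXCEPTION, it returns as soon as any future raises an exception; if none does, it behaves like ALL_COMPLETED. With ALL_COMPLETED, it returns only once every future has finished or been cancelled.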
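A minimal sketch of FIRST_EXCEPTION: one task fails immediately while another is still sleeping, so wait() should return with the failed future in done and the sleeper in not_done. fail_fast() and slow() are hypothetical helpers.

```python
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait

def fail_fast():
    raise ValueError("boom")

def slow():
    time.sleep(1)
    return "slow"

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fail_fast), executor.submit(slow)]
    done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
    # Count the futures that finished by raising.
    failed = [f for f in done if f.exception() is not None]
    still_running = len(not_done)

print(len(failed), still_running)  # expected: 1 1
```

If neither task raised, the same call would block until both had finished, just like ALL_COMPLETED.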

Annotated source:

# wait source
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    with _AcquireFutures(fs):
        # First collect the futures that have finished or been cancelled into done
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done

        # return_when=FIRST_COMPLETED and done is not empty: return
        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)

        # return_when=FIRST_EXCEPTION, done is not empty and one of them raised: return
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        # len(done) == len(fs) means every future has finished: return
        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        # -------------------------------------------------------
        # The checks above filter the futures once up front; I take this as a
        # fast path that lets wait() return as early as possible.
        # If we get here without returning, hand over to _create_and_install_waiters
        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)  # block here until the event is set (or timeout elapses)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, set(fs) - done)
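One consequence of the last few lines: when timeout runs out, waiter.event.wait(timeout) simply returns and wait() hands back whatever has finished so far. Unlike as_completed(), it does not raise TimeoutError; you inspect not_done instead. A sketch with a hypothetical nap() helper and arbitrary timings:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def nap(seconds):
    time.sleep(seconds)
    return seconds

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(nap, s) for s in (0.1, 1.0)]
    # No exception on timeout: the unfinished future just lands in not_done.
    done, not_done = wait(futures, timeout=0.5)
    finished = sorted(f.result() for f in done)
    unfinished = len(not_done)

print(finished, unfinished)  # expected: [0.1] 1
```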

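_create_and_install_waiters(), already mentioned briefly in the as_completed() section, acts like a dispatcher: it picks a waiter class and registers the resulting waiter on every future in fs. When called from as_completed(), it creates an _AsCompletedWaiter; when called from wait() with return_when=FIRST_COMPLETED, it creates a _FirstCompletedWaiter; when called from wait() with return_when=FIRST_EXCEPTION or ALL_COMPLETED, it creates an _AllCompletedWaiter.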

# _FirstCompletedWaiter source
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED)."""

    def add_result(self, future):
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super().add_cancelled(future)
        self.event.set()

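event.set() sets the event and releases the waiting thread, which is why add_result() and add_cancelled() both call it. Note that add_exception() calls set() as well, so under FIRST_COMPLETED the first exception also makes wait() return immediately.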
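A quick check of that last point: under FIRST_COMPLETED, a task that fails is "completed" too, so wait() returns as soon as it raises, while the slow task is still running. fail_fast() and slow() are hypothetical helpers.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def fail_fast():
    raise RuntimeError("first to finish")

def slow():
    time.sleep(1)
    return "slow"

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(slow), executor.submit(fail_fast)]
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    first = done.pop()           # the only finished future: fail_fast's
    error = first.exception()

print(type(error).__name__, len(not_done))  # expected: RuntimeError 1
```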

# _AllCompletedWaiter source
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls  # counter
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_exception(self, future):
        super().add_exception(future)
        if self.stop_on_exception:  # FIRST_EXCEPTION: set the event on the first exception
            self.event.set()
        else:
            self._decrement_pending_calls()  # otherwise decrement num_pending_calls;
                                             # the event is set once it reaches 0
    ...  # add_result() and add_cancelled() just call _decrement_pending_calls()

FIRST_EXCEPTION and ALL_COMPLETED both use _AllCompletedWaiter; the only difference is that the former passes stop_on_exception=True and the latter stop_on_exception=False.
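The countdown idea can be sketched outside the library. CountdownWaiter below is a hypothetical stand-in for _AllCompletedWaiter, driven by the public Future.add_done_callback() rather than the private waiter hooks. Because add_done_callback() also fires for futures that are already done, it counts every future instead of only the pending ones, as the real code does. fail_fast(), nap() and run() are hypothetical helpers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class CountdownWaiter:
    """Hypothetical stand-in for _AllCompletedWaiter, driven by done callbacks."""

    def __init__(self, num_pending, stop_on_exception):
        self.num_pending = num_pending          # counter, like num_pending_calls
        self.stop_on_exception = stop_on_exception
        self.lock = threading.Lock()
        self.event = threading.Event()

    def _decrement(self):
        with self.lock:
            self.num_pending -= 1
            if not self.num_pending:
                self.event.set()                # last future done: wake the waiter

    def on_done(self, future):
        if not future.cancelled() and future.exception() is not None:
            if self.stop_on_exception:          # FIRST_EXCEPTION behavior
                self.event.set()
                return
        self._decrement()                       # ALL_COMPLETED behavior

def fail_fast():
    raise ValueError("boom")

def nap(seconds):
    time.sleep(seconds)
    return seconds

def run(stop_on_exception):
    """Return how many futures were still running when the waiter woke up."""
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(fail_fast), executor.submit(nap, 0.5)]
        waiter = CountdownWaiter(len(futures), stop_on_exception)
        for f in futures:
            f.add_done_callback(waiter.on_done)
        waiter.event.wait()
        return sum(not f.done() for f in futures)

stopped_early = run(stop_on_exception=True)
waited_all = run(stop_on_exception=False)
print(stopped_early, waited_all)  # expected: 1 0
```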



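This article was written by Guan and is licensed under Creative Commons Attribution 3.0. You are free to repost and quote it, provided you credit the author and cite the original source.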
