雷茂的日志– Python AsyncIO的等待对象:协程,未来和任务


15 min


# 介绍

Pythonasyncio是高效单线程并发应用程序的库。在上一篇博客文章“Python AsyncIO 事件循环” 中,我们asyncio通过查看 Python 源代码了解了 Python 中的事件循环。这似乎对理解 Python 的asyncio工作原理很有效。

在这篇博客文章中,我想进一步走一步,并讨论了三个关键的机制asyncioawaitables,其中包括CoroutineFuture,和Task,通过在 Python 源代码重新寻找。

# 协程

从 Python 3.5 开始,coroutine使用定义函数,async defCoroutine通过调用coroutine函数创建对象。的抽象类Coroutine如下。它没有方法重载,因为派生的类和方法重载是由 Python 解释器为coroutine使用定义的函数生成的async defCoroutine上课的关键方法是send。它试图模仿蹦床的行为。

class Coroutine(Awaitable):
    __slots__ = ()
    def send(self, value):
        """Send a value into the coroutine.
        Return next yielded value or raise StopIteration.
        raise StopIteration
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the coroutine.
        Return next yielded value or raise StopIteration.
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val
    def close(self):
        """Raise GeneratorExit inside coroutine.
        except (GeneratorExit, StopIteration):
            raise RuntimeError("coroutine ignored GeneratorExit")
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented 

“幸运的是”,Pythonasyncio coroutine曾经@asyncio.coroutine在 Python 3.4 中的 Python 生成器上使用装饰器实现。希望coroutinePython 3.5 + 中的逻辑类似于coroutinePython 3.4 中coroutine在调用时产生 sub 的逻辑。

A typical coroutine could be implemented using a Python generator just like the follows.

import asyncio
import datetime
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        if (loop.time() + 1.0) >= end_time:
        yield from asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done loop.run_until_complete(display_date(loop))

The @asyncio.coroutine decorator implementation is as follows.

def coroutine(func):
    """Decorator to mark coroutines.
    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
    if inspect.iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        return func
    if inspect.isgeneratorfunction(func):
        coro = func
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
                # If 'res' is an awaitable, run it.
                    await_meth = res.__await__
                except AttributeError:
                    if isinstance(res,
                        res = yield from await_meth()
            return res
    coro = types.coroutine(coro)
    if not _DEBUG:
        wrapper = coro
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            if w._source_traceback:
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial may lack __qualname__).
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w
    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper 

Without looking into the details, this @asyncio.coroutine decorator almost does not change the generator at all, since most likely wrapper coro.

When we tried to run coroutine with loop.run_until_complete, we see from the comment that if the argument is a coroutine then it would be converted to a Task in the first place, and loop.run_until_complete is actually scheduling Tasks. So we would look into Task shortly.

# Future

Future has closed relationship with Task, so let’s look at Future first.

Future use has an event loop. By default, it is the event loop in the main thread.

class Future:
    """This class is *almost* compatible with concurrent.futures.Future.
    - This class is not thread-safe.
    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.
    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().
    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.
    (In Python 3.4 or later we may be able to unify the implementations.)
    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None
    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between
    #   `await Future()` or`yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False
    __log_traceback = False
    def __init__(self, *, loop=None):
        """Initialize the future.
        The optional event_loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future uses
        the default event loop.
        if loop is None:
            self._loop = events.get_event_loop()
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            self._source_traceback = format_helpers.extract_stack(
    _repr_info = base_futures._future_repr_info 

The key method of Future is future.set_result. Let’s check what will happen if we call future.set_result.

 def set_result(self, result):
        """Mark the future done and set its result.
        If the future is already done when this method is called, raises
        if self._state != _PENDING:
            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED

 def __schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.
        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        callbacks = self._callbacks[:]
        if not callbacks:
        self._callbacks[:] = []
        for callback, ctx in callbacks:
            self._loop.call_soon(callback, self, context=ctx) 

Once future.set_result is called, it would trigger self.__schedule_callbacks asking the even loop to call all the callbacks related to the Future as soon as possible. These Future related callbacks are added or removed by future.add_done_callback or future.remove_done_callback. If no Future related callbacks, no more callbacks are scheduled in the event loop.

So we have known what will happen after the Future got result. What happens when the Future is scheduled in the event loop?

From the last blog post “Python AsyncIO Event Loop”, we have seen the Future was scheduled into the event loop via loop.ensure_future. “If the argument is a Future, it is returned directly.” So when the Future is scheduled in the event loop, there is almost no callback scheduled, until the future.set_result is called. (I said almost no callback because there is a default callback _run_until_complete_cb added as we have seen in the last blog post.)

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.
    If the argument is a Future, it is returned directly.
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

# Task

Because _PyFuture = Future, Task is just a derived class of Future. The task of a Task is to wrap a coroutine in a Future.

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation. 
    """A coroutine wrapped in a Future."""
    # An important invariant maintained while a Task not done:
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup(). 
    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True
    def __init__(self, coro, *, loop=None, name=None):
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")
        if name is None:
            self._name = f'Task-{_task_name_counter()}'
            self._name = str(name)
        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()
        self._loop.call_soon(self.__step, context=self._context)

In the constructor, we see that the Task schedules a callback self.__step in the event loop. The task.__step is a long method, but we should just pay attention to the try block and the else block since these two are the ones mostly likely to be executed.

 def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None
        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
        except BaseException as exc:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                            self.__step, new_exc, context=self._context)
                        result._asyncio_future_blocking = False
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                self._must_cancel = False
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                        self.__step, new_exc, context=self._context)
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                    self.__step, new_exc, context=self._context)
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                    self.__step, new_exc, context=self._context)
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs. 

Here we see the coroutine.send method again. Each time we call coroutine.send in the try block, we get a result. In the else blcok, we always have another self._loop.call_soon call. We do this in a trampoline fashion until Coroutine runs out of results to send.

# Trampoline Function

import asyncio
import time
def trampoline(loop: asyncio.BaseEventLoop, name: str = "") -> None:
    current_time = time.time()
    loop.call_later(0.5, trampoline, loop, name)
    return current_time
loop = asyncio.get_event_loop()
loop.call_soon(trampoline, loop, "")
loop.call_later(5, loop.stop)

The flavor of the wrapping of Task to Coroutine is somewhat similar to trampoline. Every time we call coroutine.send, we got some returned values and scheduled another callback.

# Conclusion


到调度所述密钥的密钥asyncioawaitables, ,CoroutineFutureTask,是该 awaitables 都包装成Future在引擎盖下的一些方式asyncio的接口。

上次编辑于: 5/20/2021, 7:26:49 AM