| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845 |
- """``tornado.gen`` implements generator-based coroutines.
- .. note::
- The "decorator and generator" approach in this module is a
- precursor to native coroutines (using ``async def`` and ``await``)
- which were introduced in Python 3.5. Applications that do not
- require compatibility with older versions of Python should use
- native coroutines instead. Some parts of this module are still
- useful with native coroutines, notably `multi`, `sleep`,
- `WaitIterator`, and `with_timeout`. Some of these functions have
- counterparts in the `asyncio` module which may be used as well,
- although the two may not necessarily be 100% compatible.
- Coroutines provide an easier way to work in an asynchronous
- environment than chaining callbacks. Code using coroutines is
- technically asynchronous, but it is written as a single generator
- instead of a collection of separate functions.
- For example, here's a coroutine-based handler:
- .. testcode::
- class GenAsyncHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- http_client = AsyncHTTPClient()
- response = yield http_client.fetch("http://example.com")
- do_something_with_response(response)
- self.render("template.html")
- .. testoutput::
- :hide:
- Asynchronous functions in Tornado return an ``Awaitable`` or `.Future`;
- yielding this object returns its result.
- You can also yield a list or dict of other yieldable objects, which
- will be started at the same time and run in parallel; a list or dict
- of results will be returned when they are all finished:
- .. testcode::
- @gen.coroutine
- def get(self):
- http_client = AsyncHTTPClient()
- response1, response2 = yield [http_client.fetch(url1),
- http_client.fetch(url2)]
- response_dict = yield dict(response3=http_client.fetch(url3),
- response4=http_client.fetch(url4))
- response3 = response_dict['response3']
- response4 = response_dict['response4']
- .. testoutput::
- :hide:
- If ``tornado.platform.twisted`` is imported, it is also possible to
- yield Twisted's ``Deferred`` objects. See the `convert_yielded`
- function to extend this mechanism.
- .. versionchanged:: 3.2
- Dict support added.
- .. versionchanged:: 4.1
- Support added for yielding ``asyncio`` Futures and Twisted Deferreds
- via ``singledispatch``.
- """
- import asyncio
- import builtins
- import collections
- from collections.abc import Generator
- import concurrent.futures
- import datetime
- import functools
- from functools import singledispatch
- from inspect import isawaitable
- import sys
- import types
- from tornado.concurrent import (
- Future,
- is_future,
- chain_future,
- future_set_exc_info,
- future_add_done_callback,
- future_set_result_unless_cancelled,
- )
- from tornado.ioloop import IOLoop
- from tornado.log import app_log
- from tornado.util import TimeoutError
- import typing
- from typing import Union, Any, Callable, List, Type, Tuple, Awaitable, Dict
- if typing.TYPE_CHECKING:
- from typing import Sequence, Deque, Optional, Set, Iterable # noqa: F401
- _T = typing.TypeVar("_T")
- _Yieldable = Union[
- None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future
- ]
- class KeyReuseError(Exception):
- pass
- class UnknownKeyError(Exception):
- pass
- class LeakedCallbackError(Exception):
- pass
- class BadYieldError(Exception):
- pass
- class ReturnValueIgnoredError(Exception):
- pass
- def _value_from_stopiteration(e: Union[StopIteration, "Return"]) -> Any:
- try:
- # StopIteration has a value attribute beginning in py33.
- # So does our Return class.
- return e.value
- except AttributeError:
- pass
- try:
- # Cython backports coroutine functionality by putting the value in
- # e.args[0].
- return e.args[0]
- except (AttributeError, IndexError):
- return None
- def _create_future() -> Future:
- future = Future() # type: Future
- # Fixup asyncio debug info by removing extraneous stack entries
- source_traceback = getattr(future, "_source_traceback", ())
- while source_traceback:
- # Each traceback entry is equivalent to a
- # (filename, self.lineno, self.name, self.line) tuple
- filename = source_traceback[-1][0]
- if filename == __file__:
- del source_traceback[-1]
- else:
- break
- return future
- def coroutine(
- func: Callable[..., "Generator[Any, Any, _T]"]
- ) -> Callable[..., "Future[_T]"]:
- """Decorator for asynchronous generators.
- For compatibility with older versions of Python, coroutines may
- also "return" by raising the special exception `Return(value)
- <Return>`.
- Functions with this decorator return a `.Future`.
- .. warning::
- When exceptions occur inside a coroutine, the exception
- information will be stored in the `.Future` object. You must
- examine the result of the `.Future` object, or the exception
- may go unnoticed by your code. This means yielding the function
- if called from another coroutine, using something like
- `.IOLoop.run_sync` for top-level calls, or passing the `.Future`
- to `.IOLoop.add_future`.
- .. versionchanged:: 6.0
- The ``callback`` argument was removed. Use the returned
- awaitable object instead.
- """
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- # type: (*Any, **Any) -> Future[_T]
- # This function is type-annotated with a comment to work around
- # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in
- future = _create_future()
- try:
- result = func(*args, **kwargs)
- except (Return, StopIteration) as e:
- result = _value_from_stopiteration(e)
- except Exception:
- future_set_exc_info(future, sys.exc_info())
- try:
- return future
- finally:
- # Avoid circular references
- future = None # type: ignore
- else:
- if isinstance(result, Generator):
- # Inline the first iteration of Runner.run. This lets us
- # avoid the cost of creating a Runner when the coroutine
- # never actually yields, which in turn allows us to
- # use "optional" coroutines in critical path code without
- # performance penalty for the synchronous case.
- try:
- yielded = next(result)
- except (StopIteration, Return) as e:
- future_set_result_unless_cancelled(
- future, _value_from_stopiteration(e)
- )
- except Exception:
- future_set_exc_info(future, sys.exc_info())
- else:
- # Provide strong references to Runner objects as long
- # as their result future objects also have strong
- # references (typically from the parent coroutine's
- # Runner). This keeps the coroutine's Runner alive.
- # We do this by exploiting the public API
- # add_done_callback() instead of putting a private
- # attribute on the Future.
- # (Github issues #1769, #2229).
- runner = Runner(result, future, yielded)
- future.add_done_callback(lambda _: runner)
- yielded = None
- try:
- return future
- finally:
- # Subtle memory optimization: if next() raised an exception,
- # the future's exc_info contains a traceback which
- # includes this stack frame. This creates a cycle,
- # which will be collected at the next full GC but has
- # been shown to greatly increase memory usage of
- # benchmarks (relative to the refcount-based scheme
- # used in the absence of cycles). We can avoid the
- # cycle by clearing the local variable after we return it.
- future = None # type: ignore
- future_set_result_unless_cancelled(future, result)
- return future
- wrapper.__wrapped__ = func # type: ignore
- wrapper.__tornado_coroutine__ = True # type: ignore
- return wrapper
- def is_coroutine_function(func: Any) -> bool:
- """Return whether *func* is a coroutine function, i.e. a function
- wrapped with `~.gen.coroutine`.
- .. versionadded:: 4.5
- """
- return getattr(func, "__tornado_coroutine__", False)
- class Return(Exception):
- """Special exception to return a value from a `coroutine`.
- If this exception is raised, its value argument is used as the
- result of the coroutine::
- @gen.coroutine
- def fetch_json(url):
- response = yield AsyncHTTPClient().fetch(url)
- raise gen.Return(json_decode(response.body))
- In Python 3.3, this exception is no longer necessary: the ``return``
- statement can be used directly to return a value (previously
- ``yield`` and ``return`` with a value could not be combined in the
- same function).
- By analogy with the return statement, the value argument is optional,
- but it is never necessary to ``raise gen.Return()``. The ``return``
- statement can be used with no arguments instead.
- """
- def __init__(self, value: Any = None) -> None:
- super(Return, self).__init__()
- self.value = value
- # Cython recognizes subclasses of StopIteration with a .args tuple.
- self.args = (value,)
- class WaitIterator(object):
- """Provides an iterator to yield the results of awaitables as they finish.
- Yielding a set of awaitables like this:
- ``results = yield [awaitable1, awaitable2]``
- pauses the coroutine until both ``awaitable1`` and ``awaitable2``
- return, and then restarts the coroutine with the results of both
- awaitables. If either awaitable raises an exception, the
- expression will raise that exception and all the results will be
- lost.
- If you need to get the result of each awaitable as soon as possible,
- or if you need the result of some awaitables even if others produce
- errors, you can use ``WaitIterator``::
- wait_iterator = gen.WaitIterator(awaitable1, awaitable2)
- while not wait_iterator.done():
- try:
- result = yield wait_iterator.next()
- except Exception as e:
- print("Error {} from {}".format(e, wait_iterator.current_future))
- else:
- print("Result {} received from {} at {}".format(
- result, wait_iterator.current_future,
- wait_iterator.current_index))
- Because results are returned as soon as they are available the
- output from the iterator *will not be in the same order as the
- input arguments*. If you need to know which future produced the
- current result, you can use the attributes
- ``WaitIterator.current_future``, or ``WaitIterator.current_index``
- to get the index of the awaitable from the input list. (if keyword
- arguments were used in the construction of the `WaitIterator`,
- ``current_index`` will use the corresponding keyword).
- On Python 3.5, `WaitIterator` implements the async iterator
- protocol, so it can be used with the ``async for`` statement (note
- that in this version the entire iteration is aborted if any value
- raises an exception, while the previous example can continue past
- individual errors)::
- async for result in gen.WaitIterator(future1, future2):
- print("Result {} received from {} at {}".format(
- result, wait_iterator.current_future,
- wait_iterator.current_index))
- .. versionadded:: 4.1
- .. versionchanged:: 4.3
- Added ``async for`` support in Python 3.5.
- """
- _unfinished = {} # type: Dict[Future, Union[int, str]]
- def __init__(self, *args: Future, **kwargs: Future) -> None:
- if args and kwargs:
- raise ValueError("You must provide args or kwargs, not both")
- if kwargs:
- self._unfinished = dict((f, k) for (k, f) in kwargs.items())
- futures = list(kwargs.values()) # type: Sequence[Future]
- else:
- self._unfinished = dict((f, i) for (i, f) in enumerate(args))
- futures = args
- self._finished = collections.deque() # type: Deque[Future]
- self.current_index = None # type: Optional[Union[str, int]]
- self.current_future = None # type: Optional[Future]
- self._running_future = None # type: Optional[Future]
- for future in futures:
- future_add_done_callback(future, self._done_callback)
- def done(self) -> bool:
- """Returns True if this iterator has no more results."""
- if self._finished or self._unfinished:
- return False
- # Clear the 'current' values when iteration is done.
- self.current_index = self.current_future = None
- return True
- def next(self) -> Future:
- """Returns a `.Future` that will yield the next available result.
- Note that this `.Future` will not be the same object as any of
- the inputs.
- """
- self._running_future = Future()
- if self._finished:
- self._return_result(self._finished.popleft())
- return self._running_future
- def _done_callback(self, done: Future) -> None:
- if self._running_future and not self._running_future.done():
- self._return_result(done)
- else:
- self._finished.append(done)
- def _return_result(self, done: Future) -> None:
- """Called set the returned future's state that of the future
- we yielded, and set the current future for the iterator.
- """
- if self._running_future is None:
- raise Exception("no future is running")
- chain_future(done, self._running_future)
- self.current_future = done
- self.current_index = self._unfinished.pop(done)
- def __aiter__(self) -> typing.AsyncIterator:
- return self
- def __anext__(self) -> Future:
- if self.done():
- # Lookup by name to silence pyflakes on older versions.
- raise getattr(builtins, "StopAsyncIteration")()
- return self.next()
- def multi(
- children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
- quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
- ) -> "Union[Future[List], Future[Dict]]":
- """Runs multiple asynchronous operations in parallel.
- ``children`` may either be a list or a dict whose values are
- yieldable objects. ``multi()`` returns a new yieldable
- object that resolves to a parallel structure containing their
- results. If ``children`` is a list, the result is a list of
- results in the same order; if it is a dict, the result is a dict
- with the same keys.
- That is, ``results = yield multi(list_of_futures)`` is equivalent
- to::
- results = []
- for future in list_of_futures:
- results.append(yield future)
- If any children raise exceptions, ``multi()`` will raise the first
- one. All others will be logged, unless they are of types
- contained in the ``quiet_exceptions`` argument.
- In a ``yield``-based coroutine, it is not normally necessary to
- call this function directly, since the coroutine runner will
- do it automatically when a list or dict is yielded. However,
- it is necessary in ``await``-based coroutines, or to pass
- the ``quiet_exceptions`` argument.
- This function is available under the names ``multi()`` and ``Multi()``
- for historical reasons.
- Cancelling a `.Future` returned by ``multi()`` does not cancel its
- children. `asyncio.gather` is similar to ``multi()``, but it does
- cancel its children.
- .. versionchanged:: 4.2
- If multiple yieldables fail, any exceptions after the first
- (which is raised) will be logged. Added the ``quiet_exceptions``
- argument to suppress this logging for selected exception types.
- .. versionchanged:: 4.3
- Replaced the class ``Multi`` and the function ``multi_future``
- with a unified function ``multi``. Added support for yieldables
- other than ``YieldPoint`` and `.Future`.
- """
- return multi_future(children, quiet_exceptions=quiet_exceptions)
- Multi = multi
- def multi_future(
- children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
- quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
- ) -> "Union[Future[List], Future[Dict]]":
- """Wait for multiple asynchronous futures in parallel.
- Since Tornado 6.0, this function is exactly the same as `multi`.
- .. versionadded:: 4.0
- .. versionchanged:: 4.2
- If multiple ``Futures`` fail, any exceptions after the first (which is
- raised) will be logged. Added the ``quiet_exceptions``
- argument to suppress this logging for selected exception types.
- .. deprecated:: 4.3
- Use `multi` instead.
- """
- if isinstance(children, dict):
- keys = list(children.keys()) # type: Optional[List]
- children_seq = children.values() # type: Iterable
- else:
- keys = None
- children_seq = children
- children_futs = list(map(convert_yielded, children_seq))
- assert all(is_future(i) or isinstance(i, _NullFuture) for i in children_futs)
- unfinished_children = set(children_futs)
- future = _create_future()
- if not children_futs:
- future_set_result_unless_cancelled(future, {} if keys is not None else [])
- def callback(fut: Future) -> None:
- unfinished_children.remove(fut)
- if not unfinished_children:
- result_list = []
- for f in children_futs:
- try:
- result_list.append(f.result())
- except Exception as e:
- if future.done():
- if not isinstance(e, quiet_exceptions):
- app_log.error(
- "Multiple exceptions in yield list", exc_info=True
- )
- else:
- future_set_exc_info(future, sys.exc_info())
- if not future.done():
- if keys is not None:
- future_set_result_unless_cancelled(
- future, dict(zip(keys, result_list))
- )
- else:
- future_set_result_unless_cancelled(future, result_list)
- listening = set() # type: Set[Future]
- for f in children_futs:
- if f not in listening:
- listening.add(f)
- future_add_done_callback(f, callback)
- return future
- def maybe_future(x: Any) -> Future:
- """Converts ``x`` into a `.Future`.
- If ``x`` is already a `.Future`, it is simply returned; otherwise
- it is wrapped in a new `.Future`. This is suitable for use as
- ``result = yield gen.maybe_future(f())`` when you don't know whether
- ``f()`` returns a `.Future` or not.
- .. deprecated:: 4.3
- This function only handles ``Futures``, not other yieldable objects.
- Instead of `maybe_future`, check for the non-future result types
- you expect (often just ``None``), and ``yield`` anything unknown.
- """
- if is_future(x):
- return x
- else:
- fut = _create_future()
- fut.set_result(x)
- return fut
- def with_timeout(
- timeout: Union[float, datetime.timedelta],
- future: _Yieldable,
- quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
- ) -> Future:
- """Wraps a `.Future` (or other yieldable object) in a timeout.
- Raises `tornado.util.TimeoutError` if the input future does not
- complete before ``timeout``, which may be specified in any form
- allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
- an absolute time relative to `.IOLoop.time`)
- If the wrapped `.Future` fails after it has timed out, the exception
- will be logged unless it is either of a type contained in
- ``quiet_exceptions`` (which may be an exception type or a sequence of
- types), or an ``asyncio.CancelledError``.
- The wrapped `.Future` is not canceled when the timeout expires,
- permitting it to be reused. `asyncio.wait_for` is similar to this
- function but it does cancel the wrapped `.Future` on timeout.
- .. versionadded:: 4.0
- .. versionchanged:: 4.1
- Added the ``quiet_exceptions`` argument and the logging of unhandled
- exceptions.
- .. versionchanged:: 4.4
- Added support for yieldable objects other than `.Future`.
- .. versionchanged:: 6.0.3
- ``asyncio.CancelledError`` is now always considered "quiet".
- """
- # It's tempting to optimize this by cancelling the input future on timeout
- # instead of creating a new one, but A) we can't know if we are the only
- # one waiting on the input future, so cancelling it might disrupt other
- # callers and B) concurrent futures can only be cancelled while they are
- # in the queue, so cancellation cannot reliably bound our waiting time.
- future_converted = convert_yielded(future)
- result = _create_future()
- chain_future(future_converted, result)
- io_loop = IOLoop.current()
- def error_callback(future: Future) -> None:
- try:
- future.result()
- except asyncio.CancelledError:
- pass
- except Exception as e:
- if not isinstance(e, quiet_exceptions):
- app_log.error(
- "Exception in Future %r after timeout", future, exc_info=True
- )
- def timeout_callback() -> None:
- if not result.done():
- result.set_exception(TimeoutError("Timeout"))
- # In case the wrapped future goes on to fail, log it.
- future_add_done_callback(future_converted, error_callback)
- timeout_handle = io_loop.add_timeout(timeout, timeout_callback)
- if isinstance(future_converted, Future):
- # We know this future will resolve on the IOLoop, so we don't
- # need the extra thread-safety of IOLoop.add_future (and we also
- # don't care about StackContext here.
- future_add_done_callback(
- future_converted, lambda future: io_loop.remove_timeout(timeout_handle)
- )
- else:
- # concurrent.futures.Futures may resolve on any thread, so we
- # need to route them back to the IOLoop.
- io_loop.add_future(
- future_converted, lambda future: io_loop.remove_timeout(timeout_handle)
- )
- return result
- def sleep(duration: float) -> "Future[None]":
- """Return a `.Future` that resolves after the given number of seconds.
- When used with ``yield`` in a coroutine, this is a non-blocking
- analogue to `time.sleep` (which should not be used in coroutines
- because it is blocking)::
- yield gen.sleep(0.5)
- Note that calling this function on its own does nothing; you must
- wait on the `.Future` it returns (usually by yielding it).
- .. versionadded:: 4.1
- """
- f = _create_future()
- IOLoop.current().call_later(
- duration, lambda: future_set_result_unless_cancelled(f, None)
- )
- return f
- class _NullFuture(object):
- """_NullFuture resembles a Future that finished with a result of None.
- It's not actually a `Future` to avoid depending on a particular event loop.
- Handled as a special case in the coroutine runner.
- We lie and tell the type checker that a _NullFuture is a Future so
- we don't have to leak _NullFuture into lots of public APIs. But
- this means that the type checker can't warn us when we're passing
- a _NullFuture into a code path that doesn't understand what to do
- with it.
- """
- def result(self) -> None:
- return None
- def done(self) -> bool:
- return True
- # _null_future is used as a dummy value in the coroutine runner. It differs
- # from moment in that moment always adds a delay of one IOLoop iteration
- # while _null_future is processed as soon as possible.
- _null_future = typing.cast(Future, _NullFuture())
- moment = typing.cast(Future, _NullFuture())
- moment.__doc__ = """A special object which may be yielded to allow the IOLoop to run for
- one iteration.
- This is not needed in normal use but it can be helpful in long-running
- coroutines that are likely to yield Futures that are ready instantly.
- Usage: ``yield gen.moment``
- In native coroutines, the equivalent of ``yield gen.moment`` is
- ``await asyncio.sleep(0)``.
- .. versionadded:: 4.0
- .. deprecated:: 4.5
- ``yield None`` (or ``yield`` with no argument) is now equivalent to
- ``yield gen.moment``.
- """
- class Runner(object):
- """Internal implementation of `tornado.gen.coroutine`.
- Maintains information about pending callbacks and their results.
- The results of the generator are stored in ``result_future`` (a
- `.Future`)
- """
- def __init__(
- self,
- gen: "Generator[_Yieldable, Any, _T]",
- result_future: "Future[_T]",
- first_yielded: _Yieldable,
- ) -> None:
- self.gen = gen
- self.result_future = result_future
- self.future = _null_future # type: Union[None, Future]
- self.running = False
- self.finished = False
- self.io_loop = IOLoop.current()
- if self.handle_yield(first_yielded):
- gen = result_future = first_yielded = None # type: ignore
- self.run()
- def run(self) -> None:
- """Starts or resumes the generator, running until it reaches a
- yield point that is not ready.
- """
- if self.running or self.finished:
- return
- try:
- self.running = True
- while True:
- future = self.future
- if future is None:
- raise Exception("No pending future")
- if not future.done():
- return
- self.future = None
- try:
- exc_info = None
- try:
- value = future.result()
- except Exception:
- exc_info = sys.exc_info()
- future = None
- if exc_info is not None:
- try:
- yielded = self.gen.throw(*exc_info) # type: ignore
- finally:
- # Break up a reference to itself
- # for faster GC on CPython.
- exc_info = None
- else:
- yielded = self.gen.send(value)
- except (StopIteration, Return) as e:
- self.finished = True
- self.future = _null_future
- future_set_result_unless_cancelled(
- self.result_future, _value_from_stopiteration(e)
- )
- self.result_future = None # type: ignore
- return
- except Exception:
- self.finished = True
- self.future = _null_future
- future_set_exc_info(self.result_future, sys.exc_info())
- self.result_future = None # type: ignore
- return
- if not self.handle_yield(yielded):
- return
- yielded = None
- finally:
- self.running = False
- def handle_yield(self, yielded: _Yieldable) -> bool:
- try:
- self.future = convert_yielded(yielded)
- except BadYieldError:
- self.future = Future()
- future_set_exc_info(self.future, sys.exc_info())
- if self.future is moment:
- self.io_loop.add_callback(self.run)
- return False
- elif self.future is None:
- raise Exception("no pending future")
- elif not self.future.done():
- def inner(f: Any) -> None:
- # Break a reference cycle to speed GC.
- f = None # noqa: F841
- self.run()
- self.io_loop.add_future(self.future, inner)
- return False
- return True
- def handle_exception(
- self, typ: Type[Exception], value: Exception, tb: types.TracebackType
- ) -> bool:
- if not self.running and not self.finished:
- self.future = Future()
- future_set_exc_info(self.future, (typ, value, tb))
- self.run()
- return True
- else:
- return False
- # Convert Awaitables into Futures.
- try:
- _wrap_awaitable = asyncio.ensure_future
- except AttributeError:
- # asyncio.ensure_future was introduced in Python 3.4.4, but
- # Debian jessie still ships with 3.4.2 so try the old name.
- _wrap_awaitable = getattr(asyncio, "async")
- def convert_yielded(yielded: _Yieldable) -> Future:
- """Convert a yielded object into a `.Future`.
- The default implementation accepts lists, dictionaries, and
- Futures. This has the side effect of starting any coroutines that
- did not start themselves, similar to `asyncio.ensure_future`.
- If the `~functools.singledispatch` library is available, this function
- may be extended to support additional types. For example::
- @convert_yielded.register(asyncio.Future)
- def _(asyncio_future):
- return tornado.platform.asyncio.to_tornado_future(asyncio_future)
- .. versionadded:: 4.1
- """
- if yielded is None or yielded is moment:
- return moment
- elif yielded is _null_future:
- return _null_future
- elif isinstance(yielded, (list, dict)):
- return multi(yielded) # type: ignore
- elif is_future(yielded):
- return typing.cast(Future, yielded)
- elif isawaitable(yielded):
- return _wrap_awaitable(yielded) # type: ignore
- else:
- raise BadYieldError("yielded unknown object %r" % (yielded,))
- convert_yielded = singledispatch(convert_yielded)
|