gen.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. """``tornado.gen`` implements generator-based coroutines.
  2. .. note::
  3. The "decorator and generator" approach in this module is a
  4. precursor to native coroutines (using ``async def`` and ``await``)
  5. which were introduced in Python 3.5. Applications that do not
  6. require compatibility with older versions of Python should use
  7. native coroutines instead. Some parts of this module are still
  8. useful with native coroutines, notably `multi`, `sleep`,
  9. `WaitIterator`, and `with_timeout`. Some of these functions have
  10. counterparts in the `asyncio` module which may be used as well,
  11. although the two may not necessarily be 100% compatible.
  12. Coroutines provide an easier way to work in an asynchronous
  13. environment than chaining callbacks. Code using coroutines is
  14. technically asynchronous, but it is written as a single generator
  15. instead of a collection of separate functions.
  16. For example, here's a coroutine-based handler:
  17. .. testcode::
  18. class GenAsyncHandler(RequestHandler):
  19. @gen.coroutine
  20. def get(self):
  21. http_client = AsyncHTTPClient()
  22. response = yield http_client.fetch("http://example.com")
  23. do_something_with_response(response)
  24. self.render("template.html")
  25. .. testoutput::
  26. :hide:
  27. Asynchronous functions in Tornado return an ``Awaitable`` or `.Future`;
  28. yielding this object returns its result.
  29. You can also yield a list or dict of other yieldable objects, which
  30. will be started at the same time and run in parallel; a list or dict
  31. of results will be returned when they are all finished:
  32. .. testcode::
  33. @gen.coroutine
  34. def get(self):
  35. http_client = AsyncHTTPClient()
  36. response1, response2 = yield [http_client.fetch(url1),
  37. http_client.fetch(url2)]
  38. response_dict = yield dict(response3=http_client.fetch(url3),
  39. response4=http_client.fetch(url4))
  40. response3 = response_dict['response3']
  41. response4 = response_dict['response4']
  42. .. testoutput::
  43. :hide:
  44. If ``tornado.platform.twisted`` is imported, it is also possible to
  45. yield Twisted's ``Deferred`` objects. See the `convert_yielded`
  46. function to extend this mechanism.
  47. .. versionchanged:: 3.2
  48. Dict support added.
  49. .. versionchanged:: 4.1
  50. Support added for yielding ``asyncio`` Futures and Twisted Deferreds
  51. via ``singledispatch``.
  52. """
  53. import asyncio
  54. import builtins
  55. import collections
  56. from collections.abc import Generator
  57. import concurrent.futures
  58. import datetime
  59. import functools
  60. from functools import singledispatch
  61. from inspect import isawaitable
  62. import sys
  63. import types
  64. from tornado.concurrent import (
  65. Future,
  66. is_future,
  67. chain_future,
  68. future_set_exc_info,
  69. future_add_done_callback,
  70. future_set_result_unless_cancelled,
  71. )
  72. from tornado.ioloop import IOLoop
  73. from tornado.log import app_log
  74. from tornado.util import TimeoutError
  75. import typing
  76. from typing import Union, Any, Callable, List, Type, Tuple, Awaitable, Dict
  77. if typing.TYPE_CHECKING:
  78. from typing import Sequence, Deque, Optional, Set, Iterable # noqa: F401
  79. _T = typing.TypeVar("_T")
  80. _Yieldable = Union[
  81. None, Awaitable, List[Awaitable], Dict[Any, Awaitable], concurrent.futures.Future
  82. ]
  83. class KeyReuseError(Exception):
  84. pass
  85. class UnknownKeyError(Exception):
  86. pass
  87. class LeakedCallbackError(Exception):
  88. pass
  89. class BadYieldError(Exception):
  90. pass
  91. class ReturnValueIgnoredError(Exception):
  92. pass
  93. def _value_from_stopiteration(e: Union[StopIteration, "Return"]) -> Any:
  94. try:
  95. # StopIteration has a value attribute beginning in py33.
  96. # So does our Return class.
  97. return e.value
  98. except AttributeError:
  99. pass
  100. try:
  101. # Cython backports coroutine functionality by putting the value in
  102. # e.args[0].
  103. return e.args[0]
  104. except (AttributeError, IndexError):
  105. return None
  106. def _create_future() -> Future:
  107. future = Future() # type: Future
  108. # Fixup asyncio debug info by removing extraneous stack entries
  109. source_traceback = getattr(future, "_source_traceback", ())
  110. while source_traceback:
  111. # Each traceback entry is equivalent to a
  112. # (filename, self.lineno, self.name, self.line) tuple
  113. filename = source_traceback[-1][0]
  114. if filename == __file__:
  115. del source_traceback[-1]
  116. else:
  117. break
  118. return future
  119. def coroutine(
  120. func: Callable[..., "Generator[Any, Any, _T]"]
  121. ) -> Callable[..., "Future[_T]"]:
  122. """Decorator for asynchronous generators.
  123. For compatibility with older versions of Python, coroutines may
  124. also "return" by raising the special exception `Return(value)
  125. <Return>`.
  126. Functions with this decorator return a `.Future`.
  127. .. warning::
  128. When exceptions occur inside a coroutine, the exception
  129. information will be stored in the `.Future` object. You must
  130. examine the result of the `.Future` object, or the exception
  131. may go unnoticed by your code. This means yielding the function
  132. if called from another coroutine, using something like
  133. `.IOLoop.run_sync` for top-level calls, or passing the `.Future`
  134. to `.IOLoop.add_future`.
  135. .. versionchanged:: 6.0
  136. The ``callback`` argument was removed. Use the returned
  137. awaitable object instead.
  138. """
  139. @functools.wraps(func)
  140. def wrapper(*args, **kwargs):
  141. # type: (*Any, **Any) -> Future[_T]
  142. # This function is type-annotated with a comment to work around
  143. # https://bitbucket.org/pypy/pypy/issues/2868/segfault-with-args-type-annotation-in
  144. future = _create_future()
  145. try:
  146. result = func(*args, **kwargs)
  147. except (Return, StopIteration) as e:
  148. result = _value_from_stopiteration(e)
  149. except Exception:
  150. future_set_exc_info(future, sys.exc_info())
  151. try:
  152. return future
  153. finally:
  154. # Avoid circular references
  155. future = None # type: ignore
  156. else:
  157. if isinstance(result, Generator):
  158. # Inline the first iteration of Runner.run. This lets us
  159. # avoid the cost of creating a Runner when the coroutine
  160. # never actually yields, which in turn allows us to
  161. # use "optional" coroutines in critical path code without
  162. # performance penalty for the synchronous case.
  163. try:
  164. yielded = next(result)
  165. except (StopIteration, Return) as e:
  166. future_set_result_unless_cancelled(
  167. future, _value_from_stopiteration(e)
  168. )
  169. except Exception:
  170. future_set_exc_info(future, sys.exc_info())
  171. else:
  172. # Provide strong references to Runner objects as long
  173. # as their result future objects also have strong
  174. # references (typically from the parent coroutine's
  175. # Runner). This keeps the coroutine's Runner alive.
  176. # We do this by exploiting the public API
  177. # add_done_callback() instead of putting a private
  178. # attribute on the Future.
  179. # (Github issues #1769, #2229).
  180. runner = Runner(result, future, yielded)
  181. future.add_done_callback(lambda _: runner)
  182. yielded = None
  183. try:
  184. return future
  185. finally:
  186. # Subtle memory optimization: if next() raised an exception,
  187. # the future's exc_info contains a traceback which
  188. # includes this stack frame. This creates a cycle,
  189. # which will be collected at the next full GC but has
  190. # been shown to greatly increase memory usage of
  191. # benchmarks (relative to the refcount-based scheme
  192. # used in the absence of cycles). We can avoid the
  193. # cycle by clearing the local variable after we return it.
  194. future = None # type: ignore
  195. future_set_result_unless_cancelled(future, result)
  196. return future
  197. wrapper.__wrapped__ = func # type: ignore
  198. wrapper.__tornado_coroutine__ = True # type: ignore
  199. return wrapper
  200. def is_coroutine_function(func: Any) -> bool:
  201. """Return whether *func* is a coroutine function, i.e. a function
  202. wrapped with `~.gen.coroutine`.
  203. .. versionadded:: 4.5
  204. """
  205. return getattr(func, "__tornado_coroutine__", False)
  206. class Return(Exception):
  207. """Special exception to return a value from a `coroutine`.
  208. If this exception is raised, its value argument is used as the
  209. result of the coroutine::
  210. @gen.coroutine
  211. def fetch_json(url):
  212. response = yield AsyncHTTPClient().fetch(url)
  213. raise gen.Return(json_decode(response.body))
  214. In Python 3.3, this exception is no longer necessary: the ``return``
  215. statement can be used directly to return a value (previously
  216. ``yield`` and ``return`` with a value could not be combined in the
  217. same function).
  218. By analogy with the return statement, the value argument is optional,
  219. but it is never necessary to ``raise gen.Return()``. The ``return``
  220. statement can be used with no arguments instead.
  221. """
  222. def __init__(self, value: Any = None) -> None:
  223. super(Return, self).__init__()
  224. self.value = value
  225. # Cython recognizes subclasses of StopIteration with a .args tuple.
  226. self.args = (value,)
  227. class WaitIterator(object):
  228. """Provides an iterator to yield the results of awaitables as they finish.
  229. Yielding a set of awaitables like this:
  230. ``results = yield [awaitable1, awaitable2]``
  231. pauses the coroutine until both ``awaitable1`` and ``awaitable2``
  232. return, and then restarts the coroutine with the results of both
  233. awaitables. If either awaitable raises an exception, the
  234. expression will raise that exception and all the results will be
  235. lost.
  236. If you need to get the result of each awaitable as soon as possible,
  237. or if you need the result of some awaitables even if others produce
  238. errors, you can use ``WaitIterator``::
  239. wait_iterator = gen.WaitIterator(awaitable1, awaitable2)
  240. while not wait_iterator.done():
  241. try:
  242. result = yield wait_iterator.next()
  243. except Exception as e:
  244. print("Error {} from {}".format(e, wait_iterator.current_future))
  245. else:
  246. print("Result {} received from {} at {}".format(
  247. result, wait_iterator.current_future,
  248. wait_iterator.current_index))
  249. Because results are returned as soon as they are available the
  250. output from the iterator *will not be in the same order as the
  251. input arguments*. If you need to know which future produced the
  252. current result, you can use the attributes
  253. ``WaitIterator.current_future``, or ``WaitIterator.current_index``
  254. to get the index of the awaitable from the input list. (if keyword
  255. arguments were used in the construction of the `WaitIterator`,
  256. ``current_index`` will use the corresponding keyword).
  257. On Python 3.5, `WaitIterator` implements the async iterator
  258. protocol, so it can be used with the ``async for`` statement (note
  259. that in this version the entire iteration is aborted if any value
  260. raises an exception, while the previous example can continue past
  261. individual errors)::
  262. async for result in gen.WaitIterator(future1, future2):
  263. print("Result {} received from {} at {}".format(
  264. result, wait_iterator.current_future,
  265. wait_iterator.current_index))
  266. .. versionadded:: 4.1
  267. .. versionchanged:: 4.3
  268. Added ``async for`` support in Python 3.5.
  269. """
  270. _unfinished = {} # type: Dict[Future, Union[int, str]]
  271. def __init__(self, *args: Future, **kwargs: Future) -> None:
  272. if args and kwargs:
  273. raise ValueError("You must provide args or kwargs, not both")
  274. if kwargs:
  275. self._unfinished = dict((f, k) for (k, f) in kwargs.items())
  276. futures = list(kwargs.values()) # type: Sequence[Future]
  277. else:
  278. self._unfinished = dict((f, i) for (i, f) in enumerate(args))
  279. futures = args
  280. self._finished = collections.deque() # type: Deque[Future]
  281. self.current_index = None # type: Optional[Union[str, int]]
  282. self.current_future = None # type: Optional[Future]
  283. self._running_future = None # type: Optional[Future]
  284. for future in futures:
  285. future_add_done_callback(future, self._done_callback)
  286. def done(self) -> bool:
  287. """Returns True if this iterator has no more results."""
  288. if self._finished or self._unfinished:
  289. return False
  290. # Clear the 'current' values when iteration is done.
  291. self.current_index = self.current_future = None
  292. return True
  293. def next(self) -> Future:
  294. """Returns a `.Future` that will yield the next available result.
  295. Note that this `.Future` will not be the same object as any of
  296. the inputs.
  297. """
  298. self._running_future = Future()
  299. if self._finished:
  300. self._return_result(self._finished.popleft())
  301. return self._running_future
  302. def _done_callback(self, done: Future) -> None:
  303. if self._running_future and not self._running_future.done():
  304. self._return_result(done)
  305. else:
  306. self._finished.append(done)
  307. def _return_result(self, done: Future) -> None:
  308. """Called set the returned future's state that of the future
  309. we yielded, and set the current future for the iterator.
  310. """
  311. if self._running_future is None:
  312. raise Exception("no future is running")
  313. chain_future(done, self._running_future)
  314. self.current_future = done
  315. self.current_index = self._unfinished.pop(done)
  316. def __aiter__(self) -> typing.AsyncIterator:
  317. return self
  318. def __anext__(self) -> Future:
  319. if self.done():
  320. # Lookup by name to silence pyflakes on older versions.
  321. raise getattr(builtins, "StopAsyncIteration")()
  322. return self.next()
  323. def multi(
  324. children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
  325. quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
  326. ) -> "Union[Future[List], Future[Dict]]":
  327. """Runs multiple asynchronous operations in parallel.
  328. ``children`` may either be a list or a dict whose values are
  329. yieldable objects. ``multi()`` returns a new yieldable
  330. object that resolves to a parallel structure containing their
  331. results. If ``children`` is a list, the result is a list of
  332. results in the same order; if it is a dict, the result is a dict
  333. with the same keys.
  334. That is, ``results = yield multi(list_of_futures)`` is equivalent
  335. to::
  336. results = []
  337. for future in list_of_futures:
  338. results.append(yield future)
  339. If any children raise exceptions, ``multi()`` will raise the first
  340. one. All others will be logged, unless they are of types
  341. contained in the ``quiet_exceptions`` argument.
  342. In a ``yield``-based coroutine, it is not normally necessary to
  343. call this function directly, since the coroutine runner will
  344. do it automatically when a list or dict is yielded. However,
  345. it is necessary in ``await``-based coroutines, or to pass
  346. the ``quiet_exceptions`` argument.
  347. This function is available under the names ``multi()`` and ``Multi()``
  348. for historical reasons.
  349. Cancelling a `.Future` returned by ``multi()`` does not cancel its
  350. children. `asyncio.gather` is similar to ``multi()``, but it does
  351. cancel its children.
  352. .. versionchanged:: 4.2
  353. If multiple yieldables fail, any exceptions after the first
  354. (which is raised) will be logged. Added the ``quiet_exceptions``
  355. argument to suppress this logging for selected exception types.
  356. .. versionchanged:: 4.3
  357. Replaced the class ``Multi`` and the function ``multi_future``
  358. with a unified function ``multi``. Added support for yieldables
  359. other than ``YieldPoint`` and `.Future`.
  360. """
  361. return multi_future(children, quiet_exceptions=quiet_exceptions)
  362. Multi = multi
  363. def multi_future(
  364. children: Union[List[_Yieldable], Dict[Any, _Yieldable]],
  365. quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
  366. ) -> "Union[Future[List], Future[Dict]]":
  367. """Wait for multiple asynchronous futures in parallel.
  368. Since Tornado 6.0, this function is exactly the same as `multi`.
  369. .. versionadded:: 4.0
  370. .. versionchanged:: 4.2
  371. If multiple ``Futures`` fail, any exceptions after the first (which is
  372. raised) will be logged. Added the ``quiet_exceptions``
  373. argument to suppress this logging for selected exception types.
  374. .. deprecated:: 4.3
  375. Use `multi` instead.
  376. """
  377. if isinstance(children, dict):
  378. keys = list(children.keys()) # type: Optional[List]
  379. children_seq = children.values() # type: Iterable
  380. else:
  381. keys = None
  382. children_seq = children
  383. children_futs = list(map(convert_yielded, children_seq))
  384. assert all(is_future(i) or isinstance(i, _NullFuture) for i in children_futs)
  385. unfinished_children = set(children_futs)
  386. future = _create_future()
  387. if not children_futs:
  388. future_set_result_unless_cancelled(future, {} if keys is not None else [])
  389. def callback(fut: Future) -> None:
  390. unfinished_children.remove(fut)
  391. if not unfinished_children:
  392. result_list = []
  393. for f in children_futs:
  394. try:
  395. result_list.append(f.result())
  396. except Exception as e:
  397. if future.done():
  398. if not isinstance(e, quiet_exceptions):
  399. app_log.error(
  400. "Multiple exceptions in yield list", exc_info=True
  401. )
  402. else:
  403. future_set_exc_info(future, sys.exc_info())
  404. if not future.done():
  405. if keys is not None:
  406. future_set_result_unless_cancelled(
  407. future, dict(zip(keys, result_list))
  408. )
  409. else:
  410. future_set_result_unless_cancelled(future, result_list)
  411. listening = set() # type: Set[Future]
  412. for f in children_futs:
  413. if f not in listening:
  414. listening.add(f)
  415. future_add_done_callback(f, callback)
  416. return future
  417. def maybe_future(x: Any) -> Future:
  418. """Converts ``x`` into a `.Future`.
  419. If ``x`` is already a `.Future`, it is simply returned; otherwise
  420. it is wrapped in a new `.Future`. This is suitable for use as
  421. ``result = yield gen.maybe_future(f())`` when you don't know whether
  422. ``f()`` returns a `.Future` or not.
  423. .. deprecated:: 4.3
  424. This function only handles ``Futures``, not other yieldable objects.
  425. Instead of `maybe_future`, check for the non-future result types
  426. you expect (often just ``None``), and ``yield`` anything unknown.
  427. """
  428. if is_future(x):
  429. return x
  430. else:
  431. fut = _create_future()
  432. fut.set_result(x)
  433. return fut
  434. def with_timeout(
  435. timeout: Union[float, datetime.timedelta],
  436. future: _Yieldable,
  437. quiet_exceptions: "Union[Type[Exception], Tuple[Type[Exception], ...]]" = (),
  438. ) -> Future:
  439. """Wraps a `.Future` (or other yieldable object) in a timeout.
  440. Raises `tornado.util.TimeoutError` if the input future does not
  441. complete before ``timeout``, which may be specified in any form
  442. allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
  443. an absolute time relative to `.IOLoop.time`)
  444. If the wrapped `.Future` fails after it has timed out, the exception
  445. will be logged unless it is either of a type contained in
  446. ``quiet_exceptions`` (which may be an exception type or a sequence of
  447. types), or an ``asyncio.CancelledError``.
  448. The wrapped `.Future` is not canceled when the timeout expires,
  449. permitting it to be reused. `asyncio.wait_for` is similar to this
  450. function but it does cancel the wrapped `.Future` on timeout.
  451. .. versionadded:: 4.0
  452. .. versionchanged:: 4.1
  453. Added the ``quiet_exceptions`` argument and the logging of unhandled
  454. exceptions.
  455. .. versionchanged:: 4.4
  456. Added support for yieldable objects other than `.Future`.
  457. .. versionchanged:: 6.0.3
  458. ``asyncio.CancelledError`` is now always considered "quiet".
  459. """
  460. # It's tempting to optimize this by cancelling the input future on timeout
  461. # instead of creating a new one, but A) we can't know if we are the only
  462. # one waiting on the input future, so cancelling it might disrupt other
  463. # callers and B) concurrent futures can only be cancelled while they are
  464. # in the queue, so cancellation cannot reliably bound our waiting time.
  465. future_converted = convert_yielded(future)
  466. result = _create_future()
  467. chain_future(future_converted, result)
  468. io_loop = IOLoop.current()
  469. def error_callback(future: Future) -> None:
  470. try:
  471. future.result()
  472. except asyncio.CancelledError:
  473. pass
  474. except Exception as e:
  475. if not isinstance(e, quiet_exceptions):
  476. app_log.error(
  477. "Exception in Future %r after timeout", future, exc_info=True
  478. )
  479. def timeout_callback() -> None:
  480. if not result.done():
  481. result.set_exception(TimeoutError("Timeout"))
  482. # In case the wrapped future goes on to fail, log it.
  483. future_add_done_callback(future_converted, error_callback)
  484. timeout_handle = io_loop.add_timeout(timeout, timeout_callback)
  485. if isinstance(future_converted, Future):
  486. # We know this future will resolve on the IOLoop, so we don't
  487. # need the extra thread-safety of IOLoop.add_future (and we also
  488. # don't care about StackContext here.
  489. future_add_done_callback(
  490. future_converted, lambda future: io_loop.remove_timeout(timeout_handle)
  491. )
  492. else:
  493. # concurrent.futures.Futures may resolve on any thread, so we
  494. # need to route them back to the IOLoop.
  495. io_loop.add_future(
  496. future_converted, lambda future: io_loop.remove_timeout(timeout_handle)
  497. )
  498. return result
  499. def sleep(duration: float) -> "Future[None]":
  500. """Return a `.Future` that resolves after the given number of seconds.
  501. When used with ``yield`` in a coroutine, this is a non-blocking
  502. analogue to `time.sleep` (which should not be used in coroutines
  503. because it is blocking)::
  504. yield gen.sleep(0.5)
  505. Note that calling this function on its own does nothing; you must
  506. wait on the `.Future` it returns (usually by yielding it).
  507. .. versionadded:: 4.1
  508. """
  509. f = _create_future()
  510. IOLoop.current().call_later(
  511. duration, lambda: future_set_result_unless_cancelled(f, None)
  512. )
  513. return f
  514. class _NullFuture(object):
  515. """_NullFuture resembles a Future that finished with a result of None.
  516. It's not actually a `Future` to avoid depending on a particular event loop.
  517. Handled as a special case in the coroutine runner.
  518. We lie and tell the type checker that a _NullFuture is a Future so
  519. we don't have to leak _NullFuture into lots of public APIs. But
  520. this means that the type checker can't warn us when we're passing
  521. a _NullFuture into a code path that doesn't understand what to do
  522. with it.
  523. """
  524. def result(self) -> None:
  525. return None
  526. def done(self) -> bool:
  527. return True
  528. # _null_future is used as a dummy value in the coroutine runner. It differs
  529. # from moment in that moment always adds a delay of one IOLoop iteration
  530. # while _null_future is processed as soon as possible.
  531. _null_future = typing.cast(Future, _NullFuture())
  532. moment = typing.cast(Future, _NullFuture())
  533. moment.__doc__ = """A special object which may be yielded to allow the IOLoop to run for
  534. one iteration.
  535. This is not needed in normal use but it can be helpful in long-running
  536. coroutines that are likely to yield Futures that are ready instantly.
  537. Usage: ``yield gen.moment``
  538. In native coroutines, the equivalent of ``yield gen.moment`` is
  539. ``await asyncio.sleep(0)``.
  540. .. versionadded:: 4.0
  541. .. deprecated:: 4.5
  542. ``yield None`` (or ``yield`` with no argument) is now equivalent to
  543. ``yield gen.moment``.
  544. """
  545. class Runner(object):
  546. """Internal implementation of `tornado.gen.coroutine`.
  547. Maintains information about pending callbacks and their results.
  548. The results of the generator are stored in ``result_future`` (a
  549. `.Future`)
  550. """
  551. def __init__(
  552. self,
  553. gen: "Generator[_Yieldable, Any, _T]",
  554. result_future: "Future[_T]",
  555. first_yielded: _Yieldable,
  556. ) -> None:
  557. self.gen = gen
  558. self.result_future = result_future
  559. self.future = _null_future # type: Union[None, Future]
  560. self.running = False
  561. self.finished = False
  562. self.io_loop = IOLoop.current()
  563. if self.handle_yield(first_yielded):
  564. gen = result_future = first_yielded = None # type: ignore
  565. self.run()
  566. def run(self) -> None:
  567. """Starts or resumes the generator, running until it reaches a
  568. yield point that is not ready.
  569. """
  570. if self.running or self.finished:
  571. return
  572. try:
  573. self.running = True
  574. while True:
  575. future = self.future
  576. if future is None:
  577. raise Exception("No pending future")
  578. if not future.done():
  579. return
  580. self.future = None
  581. try:
  582. exc_info = None
  583. try:
  584. value = future.result()
  585. except Exception:
  586. exc_info = sys.exc_info()
  587. future = None
  588. if exc_info is not None:
  589. try:
  590. yielded = self.gen.throw(*exc_info) # type: ignore
  591. finally:
  592. # Break up a reference to itself
  593. # for faster GC on CPython.
  594. exc_info = None
  595. else:
  596. yielded = self.gen.send(value)
  597. except (StopIteration, Return) as e:
  598. self.finished = True
  599. self.future = _null_future
  600. future_set_result_unless_cancelled(
  601. self.result_future, _value_from_stopiteration(e)
  602. )
  603. self.result_future = None # type: ignore
  604. return
  605. except Exception:
  606. self.finished = True
  607. self.future = _null_future
  608. future_set_exc_info(self.result_future, sys.exc_info())
  609. self.result_future = None # type: ignore
  610. return
  611. if not self.handle_yield(yielded):
  612. return
  613. yielded = None
  614. finally:
  615. self.running = False
  616. def handle_yield(self, yielded: _Yieldable) -> bool:
  617. try:
  618. self.future = convert_yielded(yielded)
  619. except BadYieldError:
  620. self.future = Future()
  621. future_set_exc_info(self.future, sys.exc_info())
  622. if self.future is moment:
  623. self.io_loop.add_callback(self.run)
  624. return False
  625. elif self.future is None:
  626. raise Exception("no pending future")
  627. elif not self.future.done():
  628. def inner(f: Any) -> None:
  629. # Break a reference cycle to speed GC.
  630. f = None # noqa: F841
  631. self.run()
  632. self.io_loop.add_future(self.future, inner)
  633. return False
  634. return True
  635. def handle_exception(
  636. self, typ: Type[Exception], value: Exception, tb: types.TracebackType
  637. ) -> bool:
  638. if not self.running and not self.finished:
  639. self.future = Future()
  640. future_set_exc_info(self.future, (typ, value, tb))
  641. self.run()
  642. return True
  643. else:
  644. return False
  645. # Convert Awaitables into Futures.
  646. try:
  647. _wrap_awaitable = asyncio.ensure_future
  648. except AttributeError:
  649. # asyncio.ensure_future was introduced in Python 3.4.4, but
  650. # Debian jessie still ships with 3.4.2 so try the old name.
  651. _wrap_awaitable = getattr(asyncio, "async")
  652. def convert_yielded(yielded: _Yieldable) -> Future:
  653. """Convert a yielded object into a `.Future`.
  654. The default implementation accepts lists, dictionaries, and
  655. Futures. This has the side effect of starting any coroutines that
  656. did not start themselves, similar to `asyncio.ensure_future`.
  657. If the `~functools.singledispatch` library is available, this function
  658. may be extended to support additional types. For example::
  659. @convert_yielded.register(asyncio.Future)
  660. def _(asyncio_future):
  661. return tornado.platform.asyncio.to_tornado_future(asyncio_future)
  662. .. versionadded:: 4.1
  663. """
  664. if yielded is None or yielded is moment:
  665. return moment
  666. elif yielded is _null_future:
  667. return _null_future
  668. elif isinstance(yielded, (list, dict)):
  669. return multi(yielded) # type: ignore
  670. elif is_future(yielded):
  671. return typing.cast(Future, yielded)
  672. elif isawaitable(yielded):
  673. return _wrap_awaitable(yielded) # type: ignore
  674. else:
  675. raise BadYieldError("yielded unknown object %r" % (yielded,))
  676. convert_yielded = singledispatch(convert_yielded)