asyncio.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. """Bridges between the `asyncio` module and Tornado IOLoop.
  2. .. versionadded:: 3.2
  3. This module integrates Tornado with the ``asyncio`` module introduced
  4. in Python 3.4. This makes it possible to combine the two libraries on
  5. the same event loop.
  6. .. deprecated:: 5.0
  7. While the code in this module is still used, it is now enabled
  8. automatically when `asyncio` is available, so applications should
  9. no longer need to refer to this module directly.
  10. .. note::
  11. Tornado requires the `~asyncio.AbstractEventLoop.add_reader` family of
  12. methods, so it is not compatible with the `~asyncio.ProactorEventLoop` on
  13. Windows. Use the `~asyncio.SelectorEventLoop` instead.
  14. """
  15. import concurrent.futures
  16. import functools
  17. import sys
  18. from threading import get_ident
  19. from tornado.gen import convert_yielded
  20. from tornado.ioloop import IOLoop, _Selectable
  21. import asyncio
  22. import typing
  23. from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
  24. if typing.TYPE_CHECKING:
  25. from typing import Set, Dict, Tuple # noqa: F401
  26. _T = TypeVar("_T")
  27. class BaseAsyncIOLoop(IOLoop):
  28. def initialize( # type: ignore
  29. self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any
  30. ) -> None:
  31. self.asyncio_loop = asyncio_loop
  32. # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
  33. self.handlers = {} # type: Dict[int, Tuple[Union[int, _Selectable], Callable]]
  34. # Set of fds listening for reads/writes
  35. self.readers = set() # type: Set[int]
  36. self.writers = set() # type: Set[int]
  37. self.closing = False
  38. # If an asyncio loop was closed through an asyncio interface
  39. # instead of IOLoop.close(), we'd never hear about it and may
  40. # have left a dangling reference in our map. In case an
  41. # application (or, more likely, a test suite) creates and
  42. # destroys a lot of event loops in this way, check here to
  43. # ensure that we don't have a lot of dead loops building up in
  44. # the map.
  45. #
  46. # TODO(bdarnell): consider making self.asyncio_loop a weakref
  47. # for AsyncIOMainLoop and make _ioloop_for_asyncio a
  48. # WeakKeyDictionary.
  49. for loop in list(IOLoop._ioloop_for_asyncio):
  50. if loop.is_closed():
  51. del IOLoop._ioloop_for_asyncio[loop]
  52. IOLoop._ioloop_for_asyncio[asyncio_loop] = self
  53. self._thread_identity = 0
  54. super(BaseAsyncIOLoop, self).initialize(**kwargs)
  55. def assign_thread_identity() -> None:
  56. self._thread_identity = get_ident()
  57. self.add_callback(assign_thread_identity)
  58. def close(self, all_fds: bool = False) -> None:
  59. self.closing = True
  60. for fd in list(self.handlers):
  61. fileobj, handler_func = self.handlers[fd]
  62. self.remove_handler(fd)
  63. if all_fds:
  64. self.close_fd(fileobj)
  65. # Remove the mapping before closing the asyncio loop. If this
  66. # happened in the other order, we could race against another
  67. # initialize() call which would see the closed asyncio loop,
  68. # assume it was closed from the asyncio side, and do this
  69. # cleanup for us, leading to a KeyError.
  70. del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
  71. self.asyncio_loop.close()
  72. def add_handler(
  73. self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
  74. ) -> None:
  75. fd, fileobj = self.split_fd(fd)
  76. if fd in self.handlers:
  77. raise ValueError("fd %s added twice" % fd)
  78. self.handlers[fd] = (fileobj, handler)
  79. if events & IOLoop.READ:
  80. self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
  81. self.readers.add(fd)
  82. if events & IOLoop.WRITE:
  83. self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
  84. self.writers.add(fd)
  85. def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
  86. fd, fileobj = self.split_fd(fd)
  87. if events & IOLoop.READ:
  88. if fd not in self.readers:
  89. self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
  90. self.readers.add(fd)
  91. else:
  92. if fd in self.readers:
  93. self.asyncio_loop.remove_reader(fd)
  94. self.readers.remove(fd)
  95. if events & IOLoop.WRITE:
  96. if fd not in self.writers:
  97. self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
  98. self.writers.add(fd)
  99. else:
  100. if fd in self.writers:
  101. self.asyncio_loop.remove_writer(fd)
  102. self.writers.remove(fd)
  103. def remove_handler(self, fd: Union[int, _Selectable]) -> None:
  104. fd, fileobj = self.split_fd(fd)
  105. if fd not in self.handlers:
  106. return
  107. if fd in self.readers:
  108. self.asyncio_loop.remove_reader(fd)
  109. self.readers.remove(fd)
  110. if fd in self.writers:
  111. self.asyncio_loop.remove_writer(fd)
  112. self.writers.remove(fd)
  113. del self.handlers[fd]
  114. def _handle_events(self, fd: int, events: int) -> None:
  115. fileobj, handler_func = self.handlers[fd]
  116. handler_func(fileobj, events)
  117. def start(self) -> None:
  118. try:
  119. old_loop = asyncio.get_event_loop()
  120. except (RuntimeError, AssertionError):
  121. old_loop = None # type: ignore
  122. try:
  123. self._setup_logging()
  124. asyncio.set_event_loop(self.asyncio_loop)
  125. self.asyncio_loop.run_forever()
  126. finally:
  127. asyncio.set_event_loop(old_loop)
  128. def stop(self) -> None:
  129. self.asyncio_loop.stop()
  130. def call_at(
  131. self, when: float, callback: Callable[..., None], *args: Any, **kwargs: Any
  132. ) -> object:
  133. # asyncio.call_at supports *args but not **kwargs, so bind them here.
  134. # We do not synchronize self.time and asyncio_loop.time, so
  135. # convert from absolute to relative.
  136. return self.asyncio_loop.call_later(
  137. max(0, when - self.time()),
  138. self._run_callback,
  139. functools.partial(callback, *args, **kwargs),
  140. )
  141. def remove_timeout(self, timeout: object) -> None:
  142. timeout.cancel() # type: ignore
  143. def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
  144. if get_ident() == self._thread_identity:
  145. call_soon = self.asyncio_loop.call_soon
  146. else:
  147. call_soon = self.asyncio_loop.call_soon_threadsafe
  148. try:
  149. call_soon(self._run_callback, functools.partial(callback, *args, **kwargs))
  150. except RuntimeError:
  151. # "Event loop is closed". Swallow the exception for
  152. # consistency with PollIOLoop (and logical consistency
  153. # with the fact that we can't guarantee that an
  154. # add_callback that completes without error will
  155. # eventually execute).
  156. pass
  157. def add_callback_from_signal(
  158. self, callback: Callable, *args: Any, **kwargs: Any
  159. ) -> None:
  160. try:
  161. self.asyncio_loop.call_soon_threadsafe(
  162. self._run_callback, functools.partial(callback, *args, **kwargs)
  163. )
  164. except RuntimeError:
  165. pass
  166. def run_in_executor(
  167. self,
  168. executor: Optional[concurrent.futures.Executor],
  169. func: Callable[..., _T],
  170. *args: Any
  171. ) -> Awaitable[_T]:
  172. return self.asyncio_loop.run_in_executor(executor, func, *args)
  173. def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
  174. return self.asyncio_loop.set_default_executor(executor)
  175. class AsyncIOMainLoop(BaseAsyncIOLoop):
  176. """``AsyncIOMainLoop`` creates an `.IOLoop` that corresponds to the
  177. current ``asyncio`` event loop (i.e. the one returned by
  178. ``asyncio.get_event_loop()``).
  179. .. deprecated:: 5.0
  180. Now used automatically when appropriate; it is no longer necessary
  181. to refer to this class directly.
  182. .. versionchanged:: 5.0
  183. Closing an `AsyncIOMainLoop` now closes the underlying asyncio loop.
  184. """
  185. def initialize(self, **kwargs: Any) -> None: # type: ignore
  186. super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs)
  187. def make_current(self) -> None:
  188. # AsyncIOMainLoop already refers to the current asyncio loop so
  189. # nothing to do here.
  190. pass
  191. class AsyncIOLoop(BaseAsyncIOLoop):
  192. """``AsyncIOLoop`` is an `.IOLoop` that runs on an ``asyncio`` event loop.
  193. This class follows the usual Tornado semantics for creating new
  194. ``IOLoops``; these loops are not necessarily related to the
  195. ``asyncio`` default event loop.
  196. Each ``AsyncIOLoop`` creates a new ``asyncio.EventLoop``; this object
  197. can be accessed with the ``asyncio_loop`` attribute.
  198. .. versionchanged:: 5.0
  199. When an ``AsyncIOLoop`` becomes the current `.IOLoop`, it also sets
  200. the current `asyncio` event loop.
  201. .. deprecated:: 5.0
  202. Now used automatically when appropriate; it is no longer necessary
  203. to refer to this class directly.
  204. """
  205. def initialize(self, **kwargs: Any) -> None: # type: ignore
  206. self.is_current = False
  207. loop = asyncio.new_event_loop()
  208. try:
  209. super(AsyncIOLoop, self).initialize(loop, **kwargs)
  210. except Exception:
  211. # If initialize() does not succeed (taking ownership of the loop),
  212. # we have to close it.
  213. loop.close()
  214. raise
  215. def close(self, all_fds: bool = False) -> None:
  216. if self.is_current:
  217. self.clear_current()
  218. super(AsyncIOLoop, self).close(all_fds=all_fds)
  219. def make_current(self) -> None:
  220. if not self.is_current:
  221. try:
  222. self.old_asyncio = asyncio.get_event_loop()
  223. except (RuntimeError, AssertionError):
  224. self.old_asyncio = None # type: ignore
  225. self.is_current = True
  226. asyncio.set_event_loop(self.asyncio_loop)
  227. def _clear_current_hook(self) -> None:
  228. if self.is_current:
  229. asyncio.set_event_loop(self.old_asyncio)
  230. self.is_current = False
  231. def to_tornado_future(asyncio_future: asyncio.Future) -> asyncio.Future:
  232. """Convert an `asyncio.Future` to a `tornado.concurrent.Future`.
  233. .. versionadded:: 4.1
  234. .. deprecated:: 5.0
  235. Tornado ``Futures`` have been merged with `asyncio.Future`,
  236. so this method is now a no-op.
  237. """
  238. return asyncio_future
  239. def to_asyncio_future(tornado_future: asyncio.Future) -> asyncio.Future:
  240. """Convert a Tornado yieldable object to an `asyncio.Future`.
  241. .. versionadded:: 4.1
  242. .. versionchanged:: 4.3
  243. Now accepts any yieldable object, not just
  244. `tornado.concurrent.Future`.
  245. .. deprecated:: 5.0
  246. Tornado ``Futures`` have been merged with `asyncio.Future`,
  247. so this method is now equivalent to `tornado.gen.convert_yielded`.
  248. """
  249. return convert_yielded(tornado_future)
  250. if sys.platform == "win32" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
  251. # "Any thread" and "selector" should be orthogonal, but there's not a clean
  252. # interface for composing policies so pick the right base.
  253. _BasePolicy = asyncio.WindowsSelectorEventLoopPolicy # type: ignore
  254. else:
  255. _BasePolicy = asyncio.DefaultEventLoopPolicy
  256. class AnyThreadEventLoopPolicy(_BasePolicy): # type: ignore
  257. """Event loop policy that allows loop creation on any thread.
  258. The default `asyncio` event loop policy only automatically creates
  259. event loops in the main threads. Other threads must create event
  260. loops explicitly or `asyncio.get_event_loop` (and therefore
  261. `.IOLoop.current`) will fail. Installing this policy allows event
  262. loops to be created automatically on any thread, matching the
  263. behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).
  264. Usage::
  265. asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
  266. .. versionadded:: 5.0
  267. """
  268. def get_event_loop(self) -> asyncio.AbstractEventLoop:
  269. try:
  270. return super().get_event_loop()
  271. except (RuntimeError, AssertionError):
  272. # This was an AssertionError in python 3.4.2 (which ships with debian jessie)
  273. # and changed to a RuntimeError in 3.4.3.
  274. # "There is no current event loop in thread %r"
  275. loop = self.new_event_loop()
  276. self.set_event_loop(loop)
  277. return loop