ioloop.py 35 KB


  1. #
  2. # Copyright 2009 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """An I/O event loop for non-blocking sockets.
  16. In Tornado 6.0, `.IOLoop` is a wrapper around the `asyncio` event
  17. loop, with a slightly different interface for historical reasons.
  18. Applications can use either the `.IOLoop` interface or the underlying
  19. `asyncio` event loop directly (unless compatibility with older
  20. versions of Tornado is desired, in which case `.IOLoop` must be used).
  21. Typical applications will use a single `IOLoop` object, accessed via
  22. `IOLoop.current` class method. The `IOLoop.start` method (or
  23. equivalently, `asyncio.AbstractEventLoop.run_forever`) should usually
  24. be called at the end of the ``main()`` function. Atypical applications
  25. may use more than one `IOLoop`, such as one `IOLoop` per thread, or
  26. per `unittest` case.
  27. """
  28. import asyncio
  29. import concurrent.futures
  30. import datetime
  31. import functools
  32. import logging
  33. import numbers
  34. import os
  35. import sys
  36. import time
  37. import math
  38. import random
  39. from tornado.concurrent import (
  40. Future,
  41. is_future,
  42. chain_future,
  43. future_set_exc_info,
  44. future_add_done_callback,
  45. )
  46. from tornado.log import app_log
  47. from tornado.util import Configurable, TimeoutError, import_object
  48. import typing
  49. from typing import Union, Any, Type, Optional, Callable, TypeVar, Tuple, Awaitable
  50. if typing.TYPE_CHECKING:
  51. from typing import Dict, List # noqa: F401
  52. from typing_extensions import Protocol
  53. else:
  54. Protocol = object
  55. class _Selectable(Protocol):
  56. def fileno(self) -> int:
  57. pass
  58. def close(self) -> None:
  59. pass
  60. _T = TypeVar("_T")
  61. _S = TypeVar("_S", bound=_Selectable)
  62. class IOLoop(Configurable):
  63. """An I/O event loop.
  64. As of Tornado 6.0, `IOLoop` is a wrapper around the `asyncio` event
  65. loop.
  66. Example usage for a simple TCP server:
  67. .. testcode::
  68. import errno
  69. import functools
  70. import socket
  71. import tornado.ioloop
  72. from tornado.iostream import IOStream
  73. async def handle_connection(connection, address):
  74. stream = IOStream(connection)
  75. message = await stream.read_until_close()
  76. print("message from client:", message.decode().strip())
  77. def connection_ready(sock, fd, events):
  78. while True:
  79. try:
  80. connection, address = sock.accept()
  81. except socket.error as e:
  82. if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
  83. raise
  84. return
  85. connection.setblocking(0)
  86. io_loop = tornado.ioloop.IOLoop.current()
  87. io_loop.spawn_callback(handle_connection, connection, address)
  88. if __name__ == '__main__':
  89. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  90. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  91. sock.setblocking(0)
  92. sock.bind(("", 8888))
  93. sock.listen(128)
  94. io_loop = tornado.ioloop.IOLoop.current()
  95. callback = functools.partial(connection_ready, sock)
  96. io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
  97. io_loop.start()
  98. .. testoutput::
  99. :hide:
  100. By default, a newly-constructed `IOLoop` becomes the thread's current
  101. `IOLoop`, unless there already is a current `IOLoop`. This behavior
  102. can be controlled with the ``make_current`` argument to the `IOLoop`
  103. constructor: if ``make_current=True``, the new `IOLoop` will always
  104. try to become current and it raises an error if there is already a
  105. current instance. If ``make_current=False``, the new `IOLoop` will
  106. not try to become current.
  107. In general, an `IOLoop` cannot survive a fork or be shared across
  108. processes in any way. When multiple processes are being used, each
  109. process should create its own `IOLoop`, which also implies that
  110. any objects which depend on the `IOLoop` (such as
  111. `.AsyncHTTPClient`) must also be created in the child processes.
  112. As a guideline, anything that starts processes (including the
  113. `tornado.process` and `multiprocessing` modules) should do so as
  114. early as possible, ideally the first thing the application does
  115. after loading its configuration in ``main()``.
  116. .. versionchanged:: 4.2
  117. Added the ``make_current`` keyword argument to the `IOLoop`
  118. constructor.
  119. .. versionchanged:: 5.0
  120. Uses the `asyncio` event loop by default. The
  121. ``IOLoop.configure`` method cannot be used on Python 3 except
  122. to redundantly specify the `asyncio` event loop.
  123. """
  124. # These constants were originally based on constants from the epoll module.
  125. NONE = 0
  126. READ = 0x001
  127. WRITE = 0x004
  128. ERROR = 0x018
  129. # In Python 3, _ioloop_for_asyncio maps from asyncio loops to IOLoops.
  130. _ioloop_for_asyncio = dict() # type: Dict[asyncio.AbstractEventLoop, IOLoop]
  131. @classmethod
  132. def configure(
  133. cls, impl: "Union[None, str, Type[Configurable]]", **kwargs: Any
  134. ) -> None:
  135. if asyncio is not None:
  136. from tornado.platform.asyncio import BaseAsyncIOLoop
  137. if isinstance(impl, str):
  138. impl = import_object(impl)
  139. if isinstance(impl, type) and not issubclass(impl, BaseAsyncIOLoop):
  140. raise RuntimeError(
  141. "only AsyncIOLoop is allowed when asyncio is available"
  142. )
  143. super(IOLoop, cls).configure(impl, **kwargs)
  144. @staticmethod
  145. def instance() -> "IOLoop":
  146. """Deprecated alias for `IOLoop.current()`.
  147. .. versionchanged:: 5.0
  148. Previously, this method returned a global singleton
  149. `IOLoop`, in contrast with the per-thread `IOLoop` returned
  150. by `current()`. In nearly all cases the two were the same
  151. (when they differed, it was generally used from non-Tornado
  152. threads to communicate back to the main thread's `IOLoop`).
  153. This distinction is not present in `asyncio`, so in order
  154. to facilitate integration with that package `instance()`
  155. was changed to be an alias to `current()`. Applications
  156. using the cross-thread communications aspect of
  157. `instance()` should instead set their own global variable
  158. to point to the `IOLoop` they want to use.
  159. .. deprecated:: 5.0
  160. """
  161. return IOLoop.current()
  162. def install(self) -> None:
  163. """Deprecated alias for `make_current()`.
  164. .. versionchanged:: 5.0
  165. Previously, this method would set this `IOLoop` as the
  166. global singleton used by `IOLoop.instance()`. Now that
  167. `instance()` is an alias for `current()`, `install()`
  168. is an alias for `make_current()`.
  169. .. deprecated:: 5.0
  170. """
  171. self.make_current()
  172. @staticmethod
  173. def clear_instance() -> None:
  174. """Deprecated alias for `clear_current()`.
  175. .. versionchanged:: 5.0
  176. Previously, this method would clear the `IOLoop` used as
  177. the global singleton by `IOLoop.instance()`. Now that
  178. `instance()` is an alias for `current()`,
  179. `clear_instance()` is an alias for `clear_current()`.
  180. .. deprecated:: 5.0
  181. """
  182. IOLoop.clear_current()
  183. @typing.overload
  184. @staticmethod
  185. def current() -> "IOLoop":
  186. pass
  187. @typing.overload # noqa: F811
  188. @staticmethod
  189. def current(instance: bool = True) -> Optional["IOLoop"]:
  190. pass
  191. @staticmethod # noqa: F811
  192. def current(instance: bool = True) -> Optional["IOLoop"]:
  193. """Returns the current thread's `IOLoop`.
  194. If an `IOLoop` is currently running or has been marked as
  195. current by `make_current`, returns that instance. If there is
  196. no current `IOLoop` and ``instance`` is true, creates one.
  197. .. versionchanged:: 4.1
  198. Added ``instance`` argument to control the fallback to
  199. `IOLoop.instance()`.
  200. .. versionchanged:: 5.0
  201. On Python 3, control of the current `IOLoop` is delegated
  202. to `asyncio`, with this and other methods as pass-through accessors.
  203. The ``instance`` argument now controls whether an `IOLoop`
  204. is created automatically when there is none, instead of
  205. whether we fall back to `IOLoop.instance()` (which is now
  206. an alias for this method). ``instance=False`` is deprecated,
  207. since even if we do not create an `IOLoop`, this method
  208. may initialize the asyncio loop.
  209. """
  210. try:
  211. loop = asyncio.get_event_loop()
  212. except (RuntimeError, AssertionError):
  213. if not instance:
  214. return None
  215. raise
  216. try:
  217. return IOLoop._ioloop_for_asyncio[loop]
  218. except KeyError:
  219. if instance:
  220. from tornado.platform.asyncio import AsyncIOMainLoop
  221. current = AsyncIOMainLoop(make_current=True) # type: Optional[IOLoop]
  222. else:
  223. current = None
  224. return current
  225. def make_current(self) -> None:
  226. """Makes this the `IOLoop` for the current thread.
  227. An `IOLoop` automatically becomes current for its thread
  228. when it is started, but it is sometimes useful to call
  229. `make_current` explicitly before starting the `IOLoop`,
  230. so that code run at startup time can find the right
  231. instance.
  232. .. versionchanged:: 4.1
  233. An `IOLoop` created while there is no current `IOLoop`
  234. will automatically become current.
  235. .. versionchanged:: 5.0
  236. This method also sets the current `asyncio` event loop.
  237. """
  238. # The asyncio event loops override this method.
  239. raise NotImplementedError()
  240. @staticmethod
  241. def clear_current() -> None:
  242. """Clears the `IOLoop` for the current thread.
  243. Intended primarily for use by test frameworks in between tests.
  244. .. versionchanged:: 5.0
  245. This method also clears the current `asyncio` event loop.
  246. """
  247. old = IOLoop.current(instance=False)
  248. if old is not None:
  249. old._clear_current_hook()
  250. if asyncio is None:
  251. IOLoop._current.instance = None
  252. def _clear_current_hook(self) -> None:
  253. """Instance method called when an IOLoop ceases to be current.
  254. May be overridden by subclasses as a counterpart to make_current.
  255. """
  256. pass
  257. @classmethod
  258. def configurable_base(cls) -> Type[Configurable]:
  259. return IOLoop
  260. @classmethod
  261. def configurable_default(cls) -> Type[Configurable]:
  262. from tornado.platform.asyncio import AsyncIOLoop
  263. return AsyncIOLoop
  264. def initialize(self, make_current: bool = None) -> None:
  265. if make_current is None:
  266. if IOLoop.current(instance=False) is None:
  267. self.make_current()
  268. elif make_current:
  269. current = IOLoop.current(instance=False)
  270. # AsyncIO loops can already be current by this point.
  271. if current is not None and current is not self:
  272. raise RuntimeError("current IOLoop already exists")
  273. self.make_current()
  274. def close(self, all_fds: bool = False) -> None:
  275. """Closes the `IOLoop`, freeing any resources used.
  276. If ``all_fds`` is true, all file descriptors registered on the
  277. IOLoop will be closed (not just the ones created by the
  278. `IOLoop` itself).
  279. Many applications will only use a single `IOLoop` that runs for the
  280. entire lifetime of the process. In that case closing the `IOLoop`
  281. is not necessary since everything will be cleaned up when the
  282. process exits. `IOLoop.close` is provided mainly for scenarios
  283. such as unit tests, which create and destroy a large number of
  284. ``IOLoops``.
  285. An `IOLoop` must be completely stopped before it can be closed. This
  286. means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
  287. be allowed to return before attempting to call `IOLoop.close()`.
  288. Therefore the call to `close` will usually appear just after
  289. the call to `start` rather than near the call to `stop`.
  290. .. versionchanged:: 3.1
  291. If the `IOLoop` implementation supports non-integer objects
  292. for "file descriptors", those objects will have their
  293. ``close`` method when ``all_fds`` is true.
  294. """
  295. raise NotImplementedError()
  296. @typing.overload
  297. def add_handler(
  298. self, fd: int, handler: Callable[[int, int], None], events: int
  299. ) -> None:
  300. pass
  301. @typing.overload # noqa: F811
  302. def add_handler(
  303. self, fd: _S, handler: Callable[[_S, int], None], events: int
  304. ) -> None:
  305. pass
  306. def add_handler( # noqa: F811
  307. self, fd: Union[int, _Selectable], handler: Callable[..., None], events: int
  308. ) -> None:
  309. """Registers the given handler to receive the given events for ``fd``.
  310. The ``fd`` argument may either be an integer file descriptor or
  311. a file-like object with a ``fileno()`` and ``close()`` method.
  312. The ``events`` argument is a bitwise or of the constants
  313. ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
  314. When an event occurs, ``handler(fd, events)`` will be run.
  315. .. versionchanged:: 4.0
  316. Added the ability to pass file-like objects in addition to
  317. raw file descriptors.
  318. """
  319. raise NotImplementedError()
  320. def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
  321. """Changes the events we listen for ``fd``.
  322. .. versionchanged:: 4.0
  323. Added the ability to pass file-like objects in addition to
  324. raw file descriptors.
  325. """
  326. raise NotImplementedError()
  327. def remove_handler(self, fd: Union[int, _Selectable]) -> None:
  328. """Stop listening for events on ``fd``.
  329. .. versionchanged:: 4.0
  330. Added the ability to pass file-like objects in addition to
  331. raw file descriptors.
  332. """
  333. raise NotImplementedError()
  334. def start(self) -> None:
  335. """Starts the I/O loop.
  336. The loop will run until one of the callbacks calls `stop()`, which
  337. will make the loop stop after the current event iteration completes.
  338. """
  339. raise NotImplementedError()
  340. def _setup_logging(self) -> None:
  341. """The IOLoop catches and logs exceptions, so it's
  342. important that log output be visible. However, python's
  343. default behavior for non-root loggers (prior to python
  344. 3.2) is to print an unhelpful "no handlers could be
  345. found" message rather than the actual log entry, so we
  346. must explicitly configure logging if we've made it this
  347. far without anything.
  348. This method should be called from start() in subclasses.
  349. """
  350. if not any(
  351. [
  352. logging.getLogger().handlers,
  353. logging.getLogger("tornado").handlers,
  354. logging.getLogger("tornado.application").handlers,
  355. ]
  356. ):
  357. logging.basicConfig()
  358. def stop(self) -> None:
  359. """Stop the I/O loop.
  360. If the event loop is not currently running, the next call to `start()`
  361. will return immediately.
  362. Note that even after `stop` has been called, the `IOLoop` is not
  363. completely stopped until `IOLoop.start` has also returned.
  364. Some work that was scheduled before the call to `stop` may still
  365. be run before the `IOLoop` shuts down.
  366. """
  367. raise NotImplementedError()
  368. def run_sync(self, func: Callable, timeout: float = None) -> Any:
  369. """Starts the `IOLoop`, runs the given function, and stops the loop.
  370. The function must return either an awaitable object or
  371. ``None``. If the function returns an awaitable object, the
  372. `IOLoop` will run until the awaitable is resolved (and
  373. `run_sync()` will return the awaitable's result). If it raises
  374. an exception, the `IOLoop` will stop and the exception will be
  375. re-raised to the caller.
  376. The keyword-only argument ``timeout`` may be used to set
  377. a maximum duration for the function. If the timeout expires,
  378. a `tornado.util.TimeoutError` is raised.
  379. This method is useful to allow asynchronous calls in a
  380. ``main()`` function::
  381. async def main():
  382. # do stuff...
  383. if __name__ == '__main__':
  384. IOLoop.current().run_sync(main)
  385. .. versionchanged:: 4.3
  386. Returning a non-``None``, non-awaitable value is now an error.
  387. .. versionchanged:: 5.0
  388. If a timeout occurs, the ``func`` coroutine will be cancelled.
  389. """
  390. future_cell = [None] # type: List[Optional[Future]]
  391. def run() -> None:
  392. try:
  393. result = func()
  394. if result is not None:
  395. from tornado.gen import convert_yielded
  396. result = convert_yielded(result)
  397. except Exception:
  398. fut = Future() # type: Future[Any]
  399. future_cell[0] = fut
  400. future_set_exc_info(fut, sys.exc_info())
  401. else:
  402. if is_future(result):
  403. future_cell[0] = result
  404. else:
  405. fut = Future()
  406. future_cell[0] = fut
  407. fut.set_result(result)
  408. assert future_cell[0] is not None
  409. self.add_future(future_cell[0], lambda future: self.stop())
  410. self.add_callback(run)
  411. if timeout is not None:
  412. def timeout_callback() -> None:
  413. # If we can cancel the future, do so and wait on it. If not,
  414. # Just stop the loop and return with the task still pending.
  415. # (If we neither cancel nor wait for the task, a warning
  416. # will be logged).
  417. assert future_cell[0] is not None
  418. if not future_cell[0].cancel():
  419. self.stop()
  420. timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
  421. self.start()
  422. if timeout is not None:
  423. self.remove_timeout(timeout_handle)
  424. assert future_cell[0] is not None
  425. if future_cell[0].cancelled() or not future_cell[0].done():
  426. raise TimeoutError("Operation timed out after %s seconds" % timeout)
  427. return future_cell[0].result()
  428. def time(self) -> float:
  429. """Returns the current time according to the `IOLoop`'s clock.
  430. The return value is a floating-point number relative to an
  431. unspecified time in the past.
  432. Historically, the IOLoop could be customized to use e.g.
  433. `time.monotonic` instead of `time.time`, but this is not
  434. currently supported and so this method is equivalent to
  435. `time.time`.
  436. """
  437. return time.time()
  438. def add_timeout(
  439. self,
  440. deadline: Union[float, datetime.timedelta],
  441. callback: Callable[..., None],
  442. *args: Any,
  443. **kwargs: Any
  444. ) -> object:
  445. """Runs the ``callback`` at the time ``deadline`` from the I/O loop.
  446. Returns an opaque handle that may be passed to
  447. `remove_timeout` to cancel.
  448. ``deadline`` may be a number denoting a time (on the same
  449. scale as `IOLoop.time`, normally `time.time`), or a
  450. `datetime.timedelta` object for a deadline relative to the
  451. current time. Since Tornado 4.0, `call_later` is a more
  452. convenient alternative for the relative case since it does not
  453. require a timedelta object.
  454. Note that it is not safe to call `add_timeout` from other threads.
  455. Instead, you must use `add_callback` to transfer control to the
  456. `IOLoop`'s thread, and then call `add_timeout` from there.
  457. Subclasses of IOLoop must implement either `add_timeout` or
  458. `call_at`; the default implementations of each will call
  459. the other. `call_at` is usually easier to implement, but
  460. subclasses that wish to maintain compatibility with Tornado
  461. versions prior to 4.0 must use `add_timeout` instead.
  462. .. versionchanged:: 4.0
  463. Now passes through ``*args`` and ``**kwargs`` to the callback.
  464. """
  465. if isinstance(deadline, numbers.Real):
  466. return self.call_at(deadline, callback, *args, **kwargs)
  467. elif isinstance(deadline, datetime.timedelta):
  468. return self.call_at(
  469. self.time() + deadline.total_seconds(), callback, *args, **kwargs
  470. )
  471. else:
  472. raise TypeError("Unsupported deadline %r" % deadline)
  473. def call_later(
  474. self, delay: float, callback: Callable[..., None], *args: Any, **kwargs: Any
  475. ) -> object:
  476. """Runs the ``callback`` after ``delay`` seconds have passed.
  477. Returns an opaque handle that may be passed to `remove_timeout`
  478. to cancel. Note that unlike the `asyncio` method of the same
  479. name, the returned object does not have a ``cancel()`` method.
  480. See `add_timeout` for comments on thread-safety and subclassing.
  481. .. versionadded:: 4.0
  482. """
  483. return self.call_at(self.time() + delay, callback, *args, **kwargs)
  484. def call_at(
  485. self, when: float, callback: Callable[..., None], *args: Any, **kwargs: Any
  486. ) -> object:
  487. """Runs the ``callback`` at the absolute time designated by ``when``.
  488. ``when`` must be a number using the same reference point as
  489. `IOLoop.time`.
  490. Returns an opaque handle that may be passed to `remove_timeout`
  491. to cancel. Note that unlike the `asyncio` method of the same
  492. name, the returned object does not have a ``cancel()`` method.
  493. See `add_timeout` for comments on thread-safety and subclassing.
  494. .. versionadded:: 4.0
  495. """
  496. return self.add_timeout(when, callback, *args, **kwargs)
  497. def remove_timeout(self, timeout: object) -> None:
  498. """Cancels a pending timeout.
  499. The argument is a handle as returned by `add_timeout`. It is
  500. safe to call `remove_timeout` even if the callback has already
  501. been run.
  502. """
  503. raise NotImplementedError()
  504. def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
  505. """Calls the given callback on the next I/O loop iteration.
  506. It is safe to call this method from any thread at any time,
  507. except from a signal handler. Note that this is the **only**
  508. method in `IOLoop` that makes this thread-safety guarantee; all
  509. other interaction with the `IOLoop` must be done from that
  510. `IOLoop`'s thread. `add_callback()` may be used to transfer
  511. control from other threads to the `IOLoop`'s thread.
  512. To add a callback from a signal handler, see
  513. `add_callback_from_signal`.
  514. """
  515. raise NotImplementedError()
  516. def add_callback_from_signal(
  517. self, callback: Callable, *args: Any, **kwargs: Any
  518. ) -> None:
  519. """Calls the given callback on the next I/O loop iteration.
  520. Safe for use from a Python signal handler; should not be used
  521. otherwise.
  522. """
  523. raise NotImplementedError()
  524. def spawn_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
  525. """Calls the given callback on the next IOLoop iteration.
  526. As of Tornado 6.0, this method is equivalent to `add_callback`.
  527. .. versionadded:: 4.0
  528. """
  529. self.add_callback(callback, *args, **kwargs)
  530. def add_future(
  531. self,
  532. future: "Union[Future[_T], concurrent.futures.Future[_T]]",
  533. callback: Callable[["Future[_T]"], None],
  534. ) -> None:
  535. """Schedules a callback on the ``IOLoop`` when the given
  536. `.Future` is finished.
  537. The callback is invoked with one argument, the
  538. `.Future`.
  539. This method only accepts `.Future` objects and not other
  540. awaitables (unlike most of Tornado where the two are
  541. interchangeable).
  542. """
  543. if isinstance(future, Future):
  544. # Note that we specifically do not want the inline behavior of
  545. # tornado.concurrent.future_add_done_callback. We always want
  546. # this callback scheduled on the next IOLoop iteration (which
  547. # asyncio.Future always does).
  548. #
  549. # Wrap the callback in self._run_callback so we control
  550. # the error logging (i.e. it goes to tornado.log.app_log
  551. # instead of asyncio's log).
  552. future.add_done_callback(
  553. lambda f: self._run_callback(functools.partial(callback, future))
  554. )
  555. else:
  556. assert is_future(future)
  557. # For concurrent futures, we use self.add_callback, so
  558. # it's fine if future_add_done_callback inlines that call.
  559. future_add_done_callback(
  560. future, lambda f: self.add_callback(callback, future)
  561. )
  562. def run_in_executor(
  563. self,
  564. executor: Optional[concurrent.futures.Executor],
  565. func: Callable[..., _T],
  566. *args: Any
  567. ) -> Awaitable[_T]:
  568. """Runs a function in a ``concurrent.futures.Executor``. If
  569. ``executor`` is ``None``, the IO loop's default executor will be used.
  570. Use `functools.partial` to pass keyword arguments to ``func``.
  571. .. versionadded:: 5.0
  572. """
  573. if executor is None:
  574. if not hasattr(self, "_executor"):
  575. from tornado.process import cpu_count
  576. self._executor = concurrent.futures.ThreadPoolExecutor(
  577. max_workers=(cpu_count() * 5)
  578. ) # type: concurrent.futures.Executor
  579. executor = self._executor
  580. c_future = executor.submit(func, *args)
  581. # Concurrent Futures are not usable with await. Wrap this in a
  582. # Tornado Future instead, using self.add_future for thread-safety.
  583. t_future = Future() # type: Future[_T]
  584. self.add_future(c_future, lambda f: chain_future(f, t_future))
  585. return t_future
  586. def set_default_executor(self, executor: concurrent.futures.Executor) -> None:
  587. """Sets the default executor to use with :meth:`run_in_executor`.
  588. .. versionadded:: 5.0
  589. """
  590. self._executor = executor
  591. def _run_callback(self, callback: Callable[[], Any]) -> None:
  592. """Runs a callback with error handling.
  593. .. versionchanged:: 6.0
  594. CancelledErrors are no longer logged.
  595. """
  596. try:
  597. ret = callback()
  598. if ret is not None:
  599. from tornado import gen
  600. # Functions that return Futures typically swallow all
  601. # exceptions and store them in the Future. If a Future
  602. # makes it out to the IOLoop, ensure its exception (if any)
  603. # gets logged too.
  604. try:
  605. ret = gen.convert_yielded(ret)
  606. except gen.BadYieldError:
  607. # It's not unusual for add_callback to be used with
  608. # methods returning a non-None and non-yieldable
  609. # result, which should just be ignored.
  610. pass
  611. else:
  612. self.add_future(ret, self._discard_future_result)
  613. except asyncio.CancelledError:
  614. pass
  615. except Exception:
  616. app_log.error("Exception in callback %r", callback, exc_info=True)
  617. def _discard_future_result(self, future: Future) -> None:
  618. """Avoid unhandled-exception warnings from spawned coroutines."""
  619. future.result()
  620. def split_fd(
  621. self, fd: Union[int, _Selectable]
  622. ) -> Tuple[int, Union[int, _Selectable]]:
  623. # """Returns an (fd, obj) pair from an ``fd`` parameter.
  624. # We accept both raw file descriptors and file-like objects as
  625. # input to `add_handler` and related methods. When a file-like
  626. # object is passed, we must retain the object itself so we can
  627. # close it correctly when the `IOLoop` shuts down, but the
  628. # poller interfaces favor file descriptors (they will accept
  629. # file-like objects and call ``fileno()`` for you, but they
  630. # always return the descriptor itself).
  631. # This method is provided for use by `IOLoop` subclasses and should
  632. # not generally be used by application code.
  633. # .. versionadded:: 4.0
  634. # """
  635. if isinstance(fd, int):
  636. return fd, fd
  637. return fd.fileno(), fd
  638. def close_fd(self, fd: Union[int, _Selectable]) -> None:
  639. # """Utility method to close an ``fd``.
  640. # If ``fd`` is a file-like object, we close it directly; otherwise
  641. # we use `os.close`.
  642. # This method is provided for use by `IOLoop` subclasses (in
  643. # implementations of ``IOLoop.close(all_fds=True)`` and should
  644. # not generally be used by application code.
  645. # .. versionadded:: 4.0
  646. # """
  647. try:
  648. if isinstance(fd, int):
  649. os.close(fd)
  650. else:
  651. fd.close()
  652. except OSError:
  653. pass
  654. class _Timeout(object):
  655. """An IOLoop timeout, a UNIX timestamp and a callback"""
  656. # Reduce memory overhead when there are lots of pending callbacks
  657. __slots__ = ["deadline", "callback", "tdeadline"]
  658. def __init__(
  659. self, deadline: float, callback: Callable[[], None], io_loop: IOLoop
  660. ) -> None:
  661. if not isinstance(deadline, numbers.Real):
  662. raise TypeError("Unsupported deadline %r" % deadline)
  663. self.deadline = deadline
  664. self.callback = callback
  665. self.tdeadline = (
  666. deadline,
  667. next(io_loop._timeout_counter),
  668. ) # type: Tuple[float, int]
  669. # Comparison methods to sort by deadline, with object id as a tiebreaker
  670. # to guarantee a consistent ordering. The heapq module uses __le__
  671. # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
  672. # use __lt__).
  673. def __lt__(self, other: "_Timeout") -> bool:
  674. return self.tdeadline < other.tdeadline
  675. def __le__(self, other: "_Timeout") -> bool:
  676. return self.tdeadline <= other.tdeadline
  677. class PeriodicCallback(object):
  678. """Schedules the given callback to be called periodically.
  679. The callback is called every ``callback_time`` milliseconds.
  680. Note that the timeout is given in milliseconds, while most other
  681. time-related functions in Tornado use seconds.
  682. If ``jitter`` is specified, each callback time will be randomly selected
  683. within a window of ``jitter * callback_time`` milliseconds.
  684. Jitter can be used to reduce alignment of events with similar periods.
  685. A jitter of 0.1 means allowing a 10% variation in callback time.
  686. The window is centered on ``callback_time`` so the total number of calls
  687. within a given interval should not be significantly affected by adding
  688. jitter.
  689. If the callback runs for longer than ``callback_time`` milliseconds,
  690. subsequent invocations will be skipped to get back on schedule.
  691. `start` must be called after the `PeriodicCallback` is created.
  692. .. versionchanged:: 5.0
  693. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  694. .. versionchanged:: 5.1
  695. The ``jitter`` argument is added.
  696. """
  697. def __init__(
  698. self, callback: Callable[[], None], callback_time: float, jitter: float = 0
  699. ) -> None:
  700. self.callback = callback
  701. if callback_time <= 0:
  702. raise ValueError("Periodic callback must have a positive callback_time")
  703. self.callback_time = callback_time
  704. self.jitter = jitter
  705. self._running = False
  706. self._timeout = None # type: object
  707. def start(self) -> None:
  708. """Starts the timer."""
  709. # Looking up the IOLoop here allows to first instantiate the
  710. # PeriodicCallback in another thread, then start it using
  711. # IOLoop.add_callback().
  712. self.io_loop = IOLoop.current()
  713. self._running = True
  714. self._next_timeout = self.io_loop.time()
  715. self._schedule_next()
  716. def stop(self) -> None:
  717. """Stops the timer."""
  718. self._running = False
  719. if self._timeout is not None:
  720. self.io_loop.remove_timeout(self._timeout)
  721. self._timeout = None
  722. def is_running(self) -> bool:
  723. """Returns ``True`` if this `.PeriodicCallback` has been started.
  724. .. versionadded:: 4.1
  725. """
  726. return self._running
  727. def _run(self) -> None:
  728. if not self._running:
  729. return
  730. try:
  731. return self.callback()
  732. except Exception:
  733. app_log.error("Exception in callback %r", self.callback, exc_info=True)
  734. finally:
  735. self._schedule_next()
  736. def _schedule_next(self) -> None:
  737. if self._running:
  738. self._update_next(self.io_loop.time())
  739. self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
  740. def _update_next(self, current_time: float) -> None:
  741. callback_time_sec = self.callback_time / 1000.0
  742. if self.jitter:
  743. # apply jitter fraction
  744. callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5))
  745. if self._next_timeout <= current_time:
  746. # The period should be measured from the start of one call
  747. # to the start of the next. If one call takes too long,
  748. # skip cycles to get back to a multiple of the original
  749. # schedule.
  750. self._next_timeout += (
  751. math.floor((current_time - self._next_timeout) / callback_time_sec) + 1
  752. ) * callback_time_sec
  753. else:
  754. # If the clock moved backwards, ensure we advance the next
  755. # timeout instead of recomputing the same value again.
  756. # This may result in long gaps between callbacks if the
  757. # clock jumps backwards by a lot, but the far more common
  758. # scenario is a small NTP adjustment that should just be
  759. # ignored.
  760. #
  761. # Note that on some systems if time.time() runs slower
  762. # than time.monotonic() (most common on windows), we
  763. # effectively experience a small backwards time jump on
  764. # every iteration because PeriodicCallback uses
  765. # time.time() while asyncio schedules callbacks using
  766. # time.monotonic().
  767. # https://github.com/tornadoweb/tornado/issues/2333
  768. self._next_timeout += callback_time_sec