ioloop_test.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719
  1. from concurrent.futures import ThreadPoolExecutor
  2. from concurrent import futures
  3. import contextlib
  4. import datetime
  5. import functools
  6. import socket
  7. import subprocess
  8. import sys
  9. import threading
  10. import time
  11. import types
  12. from unittest import mock
  13. import unittest
  14. from tornado.escape import native_str
  15. from tornado import gen
  16. from tornado.ioloop import IOLoop, TimeoutError, PeriodicCallback
  17. from tornado.log import app_log
  18. from tornado.testing import AsyncTestCase, bind_unused_port, ExpectLog, gen_test
  19. from tornado.test.util import skipIfNonUnix, skipOnTravis
  20. import typing
  21. if typing.TYPE_CHECKING:
  22. from typing import List # noqa: F401
  23. class TestIOLoop(AsyncTestCase):
  24. def test_add_callback_return_sequence(self):
  25. # A callback returning {} or [] shouldn't spin the CPU, see Issue #1803.
  26. self.calls = 0
  27. loop = self.io_loop
  28. test = self
  29. old_add_callback = loop.add_callback
  30. def add_callback(self, callback, *args, **kwargs):
  31. test.calls += 1
  32. old_add_callback(callback, *args, **kwargs)
  33. loop.add_callback = types.MethodType(add_callback, loop)
  34. loop.add_callback(lambda: {})
  35. loop.add_callback(lambda: [])
  36. loop.add_timeout(datetime.timedelta(milliseconds=50), loop.stop)
  37. loop.start()
  38. self.assertLess(self.calls, 10)
  39. @skipOnTravis
  40. def test_add_callback_wakeup(self):
  41. # Make sure that add_callback from inside a running IOLoop
  42. # wakes up the IOLoop immediately instead of waiting for a timeout.
  43. def callback():
  44. self.called = True
  45. self.stop()
  46. def schedule_callback():
  47. self.called = False
  48. self.io_loop.add_callback(callback)
  49. # Store away the time so we can check if we woke up immediately
  50. self.start_time = time.time()
  51. self.io_loop.add_timeout(self.io_loop.time(), schedule_callback)
  52. self.wait()
  53. self.assertAlmostEqual(time.time(), self.start_time, places=2)
  54. self.assertTrue(self.called)
  55. @skipOnTravis
  56. def test_add_callback_wakeup_other_thread(self):
  57. def target():
  58. # sleep a bit to let the ioloop go into its poll loop
  59. time.sleep(0.01)
  60. self.stop_time = time.time()
  61. self.io_loop.add_callback(self.stop)
  62. thread = threading.Thread(target=target)
  63. self.io_loop.add_callback(thread.start)
  64. self.wait()
  65. delta = time.time() - self.stop_time
  66. self.assertLess(delta, 0.1)
  67. thread.join()
  68. def test_add_timeout_timedelta(self):
  69. self.io_loop.add_timeout(datetime.timedelta(microseconds=1), self.stop)
  70. self.wait()
  71. def test_multiple_add(self):
  72. sock, port = bind_unused_port()
  73. try:
  74. self.io_loop.add_handler(
  75. sock.fileno(), lambda fd, events: None, IOLoop.READ
  76. )
  77. # Attempting to add the same handler twice fails
  78. # (with a platform-dependent exception)
  79. self.assertRaises(
  80. Exception,
  81. self.io_loop.add_handler,
  82. sock.fileno(),
  83. lambda fd, events: None,
  84. IOLoop.READ,
  85. )
  86. finally:
  87. self.io_loop.remove_handler(sock.fileno())
  88. sock.close()
  89. def test_remove_without_add(self):
  90. # remove_handler should not throw an exception if called on an fd
  91. # was never added.
  92. sock, port = bind_unused_port()
  93. try:
  94. self.io_loop.remove_handler(sock.fileno())
  95. finally:
  96. sock.close()
  97. def test_add_callback_from_signal(self):
  98. # cheat a little bit and just run this normally, since we can't
  99. # easily simulate the races that happen with real signal handlers
  100. self.io_loop.add_callback_from_signal(self.stop)
  101. self.wait()
  102. def test_add_callback_from_signal_other_thread(self):
  103. # Very crude test, just to make sure that we cover this case.
  104. # This also happens to be the first test where we run an IOLoop in
  105. # a non-main thread.
  106. other_ioloop = IOLoop()
  107. thread = threading.Thread(target=other_ioloop.start)
  108. thread.start()
  109. other_ioloop.add_callback_from_signal(other_ioloop.stop)
  110. thread.join()
  111. other_ioloop.close()
  112. def test_add_callback_while_closing(self):
  113. # add_callback should not fail if it races with another thread
  114. # closing the IOLoop. The callbacks are dropped silently
  115. # without executing.
  116. closing = threading.Event()
  117. def target():
  118. other_ioloop.add_callback(other_ioloop.stop)
  119. other_ioloop.start()
  120. closing.set()
  121. other_ioloop.close(all_fds=True)
  122. other_ioloop = IOLoop()
  123. thread = threading.Thread(target=target)
  124. thread.start()
  125. closing.wait()
  126. for i in range(1000):
  127. other_ioloop.add_callback(lambda: None)
  128. @skipIfNonUnix # just because socketpair is so convenient
  129. def test_read_while_writeable(self):
  130. # Ensure that write events don't come in while we're waiting for
  131. # a read and haven't asked for writeability. (the reverse is
  132. # difficult to test for)
  133. client, server = socket.socketpair()
  134. try:
  135. def handler(fd, events):
  136. self.assertEqual(events, IOLoop.READ)
  137. self.stop()
  138. self.io_loop.add_handler(client.fileno(), handler, IOLoop.READ)
  139. self.io_loop.add_timeout(
  140. self.io_loop.time() + 0.01, functools.partial(server.send, b"asdf")
  141. )
  142. self.wait()
  143. self.io_loop.remove_handler(client.fileno())
  144. finally:
  145. client.close()
  146. server.close()
  147. def test_remove_timeout_after_fire(self):
  148. # It is not an error to call remove_timeout after it has run.
  149. handle = self.io_loop.add_timeout(self.io_loop.time(), self.stop)
  150. self.wait()
  151. self.io_loop.remove_timeout(handle)
  152. def test_remove_timeout_cleanup(self):
  153. # Add and remove enough callbacks to trigger cleanup.
  154. # Not a very thorough test, but it ensures that the cleanup code
  155. # gets executed and doesn't blow up. This test is only really useful
  156. # on PollIOLoop subclasses, but it should run silently on any
  157. # implementation.
  158. for i in range(2000):
  159. timeout = self.io_loop.add_timeout(self.io_loop.time() + 3600, lambda: None)
  160. self.io_loop.remove_timeout(timeout)
  161. # HACK: wait two IOLoop iterations for the GC to happen.
  162. self.io_loop.add_callback(lambda: self.io_loop.add_callback(self.stop))
  163. self.wait()
  164. def test_remove_timeout_from_timeout(self):
  165. calls = [False, False]
  166. # Schedule several callbacks and wait for them all to come due at once.
  167. # t2 should be cancelled by t1, even though it is already scheduled to
  168. # be run before the ioloop even looks at it.
  169. now = self.io_loop.time()
  170. def t1():
  171. calls[0] = True
  172. self.io_loop.remove_timeout(t2_handle)
  173. self.io_loop.add_timeout(now + 0.01, t1)
  174. def t2():
  175. calls[1] = True
  176. t2_handle = self.io_loop.add_timeout(now + 0.02, t2)
  177. self.io_loop.add_timeout(now + 0.03, self.stop)
  178. time.sleep(0.03)
  179. self.wait()
  180. self.assertEqual(calls, [True, False])
  181. def test_timeout_with_arguments(self):
  182. # This tests that all the timeout methods pass through *args correctly.
  183. results = [] # type: List[int]
  184. self.io_loop.add_timeout(self.io_loop.time(), results.append, 1)
  185. self.io_loop.add_timeout(datetime.timedelta(seconds=0), results.append, 2)
  186. self.io_loop.call_at(self.io_loop.time(), results.append, 3)
  187. self.io_loop.call_later(0, results.append, 4)
  188. self.io_loop.call_later(0, self.stop)
  189. self.wait()
  190. # The asyncio event loop does not guarantee the order of these
  191. # callbacks.
  192. self.assertEqual(sorted(results), [1, 2, 3, 4])
  193. def test_add_timeout_return(self):
  194. # All the timeout methods return non-None handles that can be
  195. # passed to remove_timeout.
  196. handle = self.io_loop.add_timeout(self.io_loop.time(), lambda: None)
  197. self.assertFalse(handle is None)
  198. self.io_loop.remove_timeout(handle)
  199. def test_call_at_return(self):
  200. handle = self.io_loop.call_at(self.io_loop.time(), lambda: None)
  201. self.assertFalse(handle is None)
  202. self.io_loop.remove_timeout(handle)
  203. def test_call_later_return(self):
  204. handle = self.io_loop.call_later(0, lambda: None)
  205. self.assertFalse(handle is None)
  206. self.io_loop.remove_timeout(handle)
  207. def test_close_file_object(self):
  208. """When a file object is used instead of a numeric file descriptor,
  209. the object should be closed (by IOLoop.close(all_fds=True),
  210. not just the fd.
  211. """
  212. # Use a socket since they are supported by IOLoop on all platforms.
  213. # Unfortunately, sockets don't support the .closed attribute for
  214. # inspecting their close status, so we must use a wrapper.
  215. class SocketWrapper(object):
  216. def __init__(self, sockobj):
  217. self.sockobj = sockobj
  218. self.closed = False
  219. def fileno(self):
  220. return self.sockobj.fileno()
  221. def close(self):
  222. self.closed = True
  223. self.sockobj.close()
  224. sockobj, port = bind_unused_port()
  225. socket_wrapper = SocketWrapper(sockobj)
  226. io_loop = IOLoop()
  227. io_loop.add_handler(socket_wrapper, lambda fd, events: None, IOLoop.READ)
  228. io_loop.close(all_fds=True)
  229. self.assertTrue(socket_wrapper.closed)
  230. def test_handler_callback_file_object(self):
  231. """The handler callback receives the same fd object it passed in."""
  232. server_sock, port = bind_unused_port()
  233. fds = []
  234. def handle_connection(fd, events):
  235. fds.append(fd)
  236. conn, addr = server_sock.accept()
  237. conn.close()
  238. self.stop()
  239. self.io_loop.add_handler(server_sock, handle_connection, IOLoop.READ)
  240. with contextlib.closing(socket.socket()) as client_sock:
  241. client_sock.connect(("127.0.0.1", port))
  242. self.wait()
  243. self.io_loop.remove_handler(server_sock)
  244. self.io_loop.add_handler(server_sock.fileno(), handle_connection, IOLoop.READ)
  245. with contextlib.closing(socket.socket()) as client_sock:
  246. client_sock.connect(("127.0.0.1", port))
  247. self.wait()
  248. self.assertIs(fds[0], server_sock)
  249. self.assertEqual(fds[1], server_sock.fileno())
  250. self.io_loop.remove_handler(server_sock.fileno())
  251. server_sock.close()
  252. def test_mixed_fd_fileobj(self):
  253. server_sock, port = bind_unused_port()
  254. def f(fd, events):
  255. pass
  256. self.io_loop.add_handler(server_sock, f, IOLoop.READ)
  257. with self.assertRaises(Exception):
  258. # The exact error is unspecified - some implementations use
  259. # IOError, others use ValueError.
  260. self.io_loop.add_handler(server_sock.fileno(), f, IOLoop.READ)
  261. self.io_loop.remove_handler(server_sock.fileno())
  262. server_sock.close()
  263. def test_reentrant(self):
  264. """Calling start() twice should raise an error, not deadlock."""
  265. returned_from_start = [False]
  266. got_exception = [False]
  267. def callback():
  268. try:
  269. self.io_loop.start()
  270. returned_from_start[0] = True
  271. except Exception:
  272. got_exception[0] = True
  273. self.stop()
  274. self.io_loop.add_callback(callback)
  275. self.wait()
  276. self.assertTrue(got_exception[0])
  277. self.assertFalse(returned_from_start[0])
  278. def test_exception_logging(self):
  279. """Uncaught exceptions get logged by the IOLoop."""
  280. self.io_loop.add_callback(lambda: 1 / 0)
  281. self.io_loop.add_callback(self.stop)
  282. with ExpectLog(app_log, "Exception in callback"):
  283. self.wait()
  284. def test_exception_logging_future(self):
  285. """The IOLoop examines exceptions from Futures and logs them."""
  286. @gen.coroutine
  287. def callback():
  288. self.io_loop.add_callback(self.stop)
  289. 1 / 0
  290. self.io_loop.add_callback(callback)
  291. with ExpectLog(app_log, "Exception in callback"):
  292. self.wait()
  293. def test_exception_logging_native_coro(self):
  294. """The IOLoop examines exceptions from awaitables and logs them."""
  295. async def callback():
  296. # Stop the IOLoop two iterations after raising an exception
  297. # to give the exception time to be logged.
  298. self.io_loop.add_callback(self.io_loop.add_callback, self.stop)
  299. 1 / 0
  300. self.io_loop.add_callback(callback)
  301. with ExpectLog(app_log, "Exception in callback"):
  302. self.wait()
  303. def test_spawn_callback(self):
  304. # Both add_callback and spawn_callback run directly on the IOLoop,
  305. # so their errors are logged without stopping the test.
  306. self.io_loop.add_callback(lambda: 1 / 0)
  307. self.io_loop.add_callback(self.stop)
  308. with ExpectLog(app_log, "Exception in callback"):
  309. self.wait()
  310. # A spawned callback is run directly on the IOLoop, so it will be
  311. # logged without stopping the test.
  312. self.io_loop.spawn_callback(lambda: 1 / 0)
  313. self.io_loop.add_callback(self.stop)
  314. with ExpectLog(app_log, "Exception in callback"):
  315. self.wait()
  316. @skipIfNonUnix
  317. def test_remove_handler_from_handler(self):
  318. # Create two sockets with simultaneous read events.
  319. client, server = socket.socketpair()
  320. try:
  321. client.send(b"abc")
  322. server.send(b"abc")
  323. # After reading from one fd, remove the other from the IOLoop.
  324. chunks = []
  325. def handle_read(fd, events):
  326. chunks.append(fd.recv(1024))
  327. if fd is client:
  328. self.io_loop.remove_handler(server)
  329. else:
  330. self.io_loop.remove_handler(client)
  331. self.io_loop.add_handler(client, handle_read, self.io_loop.READ)
  332. self.io_loop.add_handler(server, handle_read, self.io_loop.READ)
  333. self.io_loop.call_later(0.1, self.stop)
  334. self.wait()
  335. # Only one fd was read; the other was cleanly removed.
  336. self.assertEqual(chunks, [b"abc"])
  337. finally:
  338. client.close()
  339. server.close()
  340. @gen_test
  341. def test_init_close_race(self):
  342. # Regression test for #2367
  343. def f():
  344. for i in range(10):
  345. loop = IOLoop()
  346. loop.close()
  347. yield gen.multi([self.io_loop.run_in_executor(None, f) for i in range(2)])
  348. # Deliberately not a subclass of AsyncTestCase so the IOLoop isn't
  349. # automatically set as current.
  350. class TestIOLoopCurrent(unittest.TestCase):
  351. def setUp(self):
  352. self.io_loop = None
  353. IOLoop.clear_current()
  354. def tearDown(self):
  355. if self.io_loop is not None:
  356. self.io_loop.close()
  357. def test_default_current(self):
  358. self.io_loop = IOLoop()
  359. # The first IOLoop with default arguments is made current.
  360. self.assertIs(self.io_loop, IOLoop.current())
  361. # A second IOLoop can be created but is not made current.
  362. io_loop2 = IOLoop()
  363. self.assertIs(self.io_loop, IOLoop.current())
  364. io_loop2.close()
  365. def test_non_current(self):
  366. self.io_loop = IOLoop(make_current=False)
  367. # The new IOLoop is not initially made current.
  368. self.assertIsNone(IOLoop.current(instance=False))
  369. # Starting the IOLoop makes it current, and stopping the loop
  370. # makes it non-current. This process is repeatable.
  371. for i in range(3):
  372. def f():
  373. self.current_io_loop = IOLoop.current()
  374. self.io_loop.stop()
  375. self.io_loop.add_callback(f)
  376. self.io_loop.start()
  377. self.assertIs(self.current_io_loop, self.io_loop)
  378. # Now that the loop is stopped, it is no longer current.
  379. self.assertIsNone(IOLoop.current(instance=False))
  380. def test_force_current(self):
  381. self.io_loop = IOLoop(make_current=True)
  382. self.assertIs(self.io_loop, IOLoop.current())
  383. with self.assertRaises(RuntimeError):
  384. # A second make_current=True construction cannot succeed.
  385. IOLoop(make_current=True)
  386. # current() was not affected by the failed construction.
  387. self.assertIs(self.io_loop, IOLoop.current())
  388. class TestIOLoopCurrentAsync(AsyncTestCase):
  389. @gen_test
  390. def test_clear_without_current(self):
  391. # If there is no current IOLoop, clear_current is a no-op (but
  392. # should not fail). Use a thread so we see the threading.Local
  393. # in a pristine state.
  394. with ThreadPoolExecutor(1) as e:
  395. yield e.submit(IOLoop.clear_current)
  396. class TestIOLoopFutures(AsyncTestCase):
  397. def test_add_future_threads(self):
  398. with futures.ThreadPoolExecutor(1) as pool:
  399. def dummy():
  400. pass
  401. self.io_loop.add_future(
  402. pool.submit(dummy), lambda future: self.stop(future)
  403. )
  404. future = self.wait()
  405. self.assertTrue(future.done())
  406. self.assertTrue(future.result() is None)
  407. @gen_test
  408. def test_run_in_executor_gen(self):
  409. event1 = threading.Event()
  410. event2 = threading.Event()
  411. def sync_func(self_event, other_event):
  412. self_event.set()
  413. other_event.wait()
  414. # Note that return value doesn't actually do anything,
  415. # it is just passed through to our final assertion to
  416. # make sure it is passed through properly.
  417. return self_event
  418. # Run two synchronous functions, which would deadlock if not
  419. # run in parallel.
  420. res = yield [
  421. IOLoop.current().run_in_executor(None, sync_func, event1, event2),
  422. IOLoop.current().run_in_executor(None, sync_func, event2, event1),
  423. ]
  424. self.assertEqual([event1, event2], res)
  425. @gen_test
  426. def test_run_in_executor_native(self):
  427. event1 = threading.Event()
  428. event2 = threading.Event()
  429. def sync_func(self_event, other_event):
  430. self_event.set()
  431. other_event.wait()
  432. return self_event
  433. # Go through an async wrapper to ensure that the result of
  434. # run_in_executor works with await and not just gen.coroutine
  435. # (simply passing the underlying concurrrent future would do that).
  436. async def async_wrapper(self_event, other_event):
  437. return await IOLoop.current().run_in_executor(
  438. None, sync_func, self_event, other_event
  439. )
  440. res = yield [async_wrapper(event1, event2), async_wrapper(event2, event1)]
  441. self.assertEqual([event1, event2], res)
  442. @gen_test
  443. def test_set_default_executor(self):
  444. count = [0]
  445. class MyExecutor(futures.ThreadPoolExecutor):
  446. def submit(self, func, *args):
  447. count[0] += 1
  448. return super(MyExecutor, self).submit(func, *args)
  449. event = threading.Event()
  450. def sync_func():
  451. event.set()
  452. executor = MyExecutor(1)
  453. loop = IOLoop.current()
  454. loop.set_default_executor(executor)
  455. yield loop.run_in_executor(None, sync_func)
  456. self.assertEqual(1, count[0])
  457. self.assertTrue(event.is_set())
  458. class TestIOLoopRunSync(unittest.TestCase):
  459. def setUp(self):
  460. self.io_loop = IOLoop()
  461. def tearDown(self):
  462. self.io_loop.close()
  463. def test_sync_result(self):
  464. with self.assertRaises(gen.BadYieldError):
  465. self.io_loop.run_sync(lambda: 42)
  466. def test_sync_exception(self):
  467. with self.assertRaises(ZeroDivisionError):
  468. self.io_loop.run_sync(lambda: 1 / 0)
  469. def test_async_result(self):
  470. @gen.coroutine
  471. def f():
  472. yield gen.moment
  473. raise gen.Return(42)
  474. self.assertEqual(self.io_loop.run_sync(f), 42)
  475. def test_async_exception(self):
  476. @gen.coroutine
  477. def f():
  478. yield gen.moment
  479. 1 / 0
  480. with self.assertRaises(ZeroDivisionError):
  481. self.io_loop.run_sync(f)
  482. def test_current(self):
  483. def f():
  484. self.assertIs(IOLoop.current(), self.io_loop)
  485. self.io_loop.run_sync(f)
  486. def test_timeout(self):
  487. @gen.coroutine
  488. def f():
  489. yield gen.sleep(1)
  490. self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=0.01)
  491. def test_native_coroutine(self):
  492. @gen.coroutine
  493. def f1():
  494. yield gen.moment
  495. async def f2():
  496. await f1()
  497. self.io_loop.run_sync(f2)
  498. class TestPeriodicCallbackMath(unittest.TestCase):
  499. def simulate_calls(self, pc, durations):
  500. """Simulate a series of calls to the PeriodicCallback.
  501. Pass a list of call durations in seconds (negative values
  502. work to simulate clock adjustments during the call, or more or
  503. less equivalently, between calls). This method returns the
  504. times at which each call would be made.
  505. """
  506. calls = []
  507. now = 1000
  508. pc._next_timeout = now
  509. for d in durations:
  510. pc._update_next(now)
  511. calls.append(pc._next_timeout)
  512. now = pc._next_timeout + d
  513. return calls
  514. def dummy(self):
  515. pass
  516. def test_basic(self):
  517. pc = PeriodicCallback(self.dummy, 10000)
  518. self.assertEqual(
  519. self.simulate_calls(pc, [0] * 5), [1010, 1020, 1030, 1040, 1050]
  520. )
  521. def test_overrun(self):
  522. # If a call runs for too long, we skip entire cycles to get
  523. # back on schedule.
  524. call_durations = [9, 9, 10, 11, 20, 20, 35, 35, 0, 0, 0]
  525. expected = [
  526. 1010,
  527. 1020,
  528. 1030, # first 3 calls on schedule
  529. 1050,
  530. 1070, # next 2 delayed one cycle
  531. 1100,
  532. 1130, # next 2 delayed 2 cycles
  533. 1170,
  534. 1210, # next 2 delayed 3 cycles
  535. 1220,
  536. 1230, # then back on schedule.
  537. ]
  538. pc = PeriodicCallback(self.dummy, 10000)
  539. self.assertEqual(self.simulate_calls(pc, call_durations), expected)
  540. def test_clock_backwards(self):
  541. pc = PeriodicCallback(self.dummy, 10000)
  542. # Backwards jumps are ignored, potentially resulting in a
  543. # slightly slow schedule (although we assume that when
  544. # time.time() and time.monotonic() are different, time.time()
  545. # is getting adjusted by NTP and is therefore more accurate)
  546. self.assertEqual(
  547. self.simulate_calls(pc, [-2, -1, -3, -2, 0]), [1010, 1020, 1030, 1040, 1050]
  548. )
  549. # For big jumps, we should perhaps alter the schedule, but we
  550. # don't currently. This trace shows that we run callbacks
  551. # every 10s of time.time(), but the first and second calls are
  552. # 110s of real time apart because the backwards jump is
  553. # ignored.
  554. self.assertEqual(self.simulate_calls(pc, [-100, 0, 0]), [1010, 1020, 1030])
  555. def test_jitter(self):
  556. random_times = [0.5, 1, 0, 0.75]
  557. expected = [1010, 1022.5, 1030, 1041.25]
  558. call_durations = [0] * len(random_times)
  559. pc = PeriodicCallback(self.dummy, 10000, jitter=0.5)
  560. def mock_random():
  561. return random_times.pop(0)
  562. with mock.patch("random.random", mock_random):
  563. self.assertEqual(self.simulate_calls(pc, call_durations), expected)
  564. class TestIOLoopConfiguration(unittest.TestCase):
  565. def run_python(self, *statements):
  566. stmt_list = [
  567. "from tornado.ioloop import IOLoop",
  568. "classname = lambda x: x.__class__.__name__",
  569. ] + list(statements)
  570. args = [sys.executable, "-c", "; ".join(stmt_list)]
  571. return native_str(subprocess.check_output(args)).strip()
  572. def test_default(self):
  573. # When asyncio is available, it is used by default.
  574. cls = self.run_python("print(classname(IOLoop.current()))")
  575. self.assertEqual(cls, "AsyncIOMainLoop")
  576. cls = self.run_python("print(classname(IOLoop()))")
  577. self.assertEqual(cls, "AsyncIOLoop")
  578. def test_asyncio(self):
  579. cls = self.run_python(
  580. 'IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")',
  581. "print(classname(IOLoop.current()))",
  582. )
  583. self.assertEqual(cls, "AsyncIOMainLoop")
  584. def test_asyncio_main(self):
  585. cls = self.run_python(
  586. "from tornado.platform.asyncio import AsyncIOMainLoop",
  587. "AsyncIOMainLoop().install()",
  588. "print(classname(IOLoop.current()))",
  589. )
  590. self.assertEqual(cls, "AsyncIOMainLoop")
  591. if __name__ == "__main__":
  592. unittest.main()