gen_test.py 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
  1. import asyncio
  2. from concurrent import futures
  3. import gc
  4. import datetime
  5. import platform
  6. import sys
  7. import time
  8. import weakref
  9. import unittest
  10. from tornado.concurrent import Future
  11. from tornado.log import app_log
  12. from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
  13. from tornado.test.util import skipOnTravis, skipNotCPython
  14. from tornado.web import Application, RequestHandler, HTTPError
  15. from tornado import gen
  16. try:
  17. import contextvars
  18. except ImportError:
  19. contextvars = None # type: ignore
  20. import typing
  21. if typing.TYPE_CHECKING:
  22. from typing import List, Optional # noqa: F401
  23. class GenBasicTest(AsyncTestCase):
  24. @gen.coroutine
  25. def delay(self, iterations, arg):
  26. """Returns arg after a number of IOLoop iterations."""
  27. for i in range(iterations):
  28. yield gen.moment
  29. raise gen.Return(arg)
  30. @gen.coroutine
  31. def async_future(self, result):
  32. yield gen.moment
  33. return result
  34. @gen.coroutine
  35. def async_exception(self, e):
  36. yield gen.moment
  37. raise e
  38. @gen.coroutine
  39. def add_one_async(self, x):
  40. yield gen.moment
  41. raise gen.Return(x + 1)
  42. def test_no_yield(self):
  43. @gen.coroutine
  44. def f():
  45. pass
  46. self.io_loop.run_sync(f)
  47. def test_exception_phase1(self):
  48. @gen.coroutine
  49. def f():
  50. 1 / 0
  51. self.assertRaises(ZeroDivisionError, self.io_loop.run_sync, f)
  52. def test_exception_phase2(self):
  53. @gen.coroutine
  54. def f():
  55. yield gen.moment
  56. 1 / 0
  57. self.assertRaises(ZeroDivisionError, self.io_loop.run_sync, f)
  58. def test_bogus_yield(self):
  59. @gen.coroutine
  60. def f():
  61. yield 42
  62. self.assertRaises(gen.BadYieldError, self.io_loop.run_sync, f)
  63. def test_bogus_yield_tuple(self):
  64. @gen.coroutine
  65. def f():
  66. yield (1, 2)
  67. self.assertRaises(gen.BadYieldError, self.io_loop.run_sync, f)
  68. def test_reuse(self):
  69. @gen.coroutine
  70. def f():
  71. yield gen.moment
  72. self.io_loop.run_sync(f)
  73. self.io_loop.run_sync(f)
  74. def test_none(self):
  75. @gen.coroutine
  76. def f():
  77. yield None
  78. self.io_loop.run_sync(f)
  79. def test_multi(self):
  80. @gen.coroutine
  81. def f():
  82. results = yield [self.add_one_async(1), self.add_one_async(2)]
  83. self.assertEqual(results, [2, 3])
  84. self.io_loop.run_sync(f)
  85. def test_multi_dict(self):
  86. @gen.coroutine
  87. def f():
  88. results = yield dict(foo=self.add_one_async(1), bar=self.add_one_async(2))
  89. self.assertEqual(results, dict(foo=2, bar=3))
  90. self.io_loop.run_sync(f)
  91. def test_multi_delayed(self):
  92. @gen.coroutine
  93. def f():
  94. # callbacks run at different times
  95. responses = yield gen.multi_future(
  96. [self.delay(3, "v1"), self.delay(1, "v2")]
  97. )
  98. self.assertEqual(responses, ["v1", "v2"])
  99. self.io_loop.run_sync(f)
  100. def test_multi_dict_delayed(self):
  101. @gen.coroutine
  102. def f():
  103. # callbacks run at different times
  104. responses = yield gen.multi_future(
  105. dict(foo=self.delay(3, "v1"), bar=self.delay(1, "v2"))
  106. )
  107. self.assertEqual(responses, dict(foo="v1", bar="v2"))
  108. self.io_loop.run_sync(f)
  109. @skipOnTravis
  110. @gen_test
  111. def test_multi_performance(self):
  112. # Yielding a list used to have quadratic performance; make
  113. # sure a large list stays reasonable. On my laptop a list of
  114. # 2000 used to take 1.8s, now it takes 0.12.
  115. start = time.time()
  116. yield [gen.moment for i in range(2000)]
  117. end = time.time()
  118. self.assertLess(end - start, 1.0)
  119. @gen_test
  120. def test_multi_empty(self):
  121. # Empty lists or dicts should return the same type.
  122. x = yield []
  123. self.assertTrue(isinstance(x, list))
  124. y = yield {}
  125. self.assertTrue(isinstance(y, dict))
  126. @gen_test
  127. def test_future(self):
  128. result = yield self.async_future(1)
  129. self.assertEqual(result, 1)
  130. @gen_test
  131. def test_multi_future(self):
  132. results = yield [self.async_future(1), self.async_future(2)]
  133. self.assertEqual(results, [1, 2])
  134. @gen_test
  135. def test_multi_future_duplicate(self):
  136. # Note that this doesn't work with native corotines, only with
  137. # decorated coroutines.
  138. f = self.async_future(2)
  139. results = yield [self.async_future(1), f, self.async_future(3), f]
  140. self.assertEqual(results, [1, 2, 3, 2])
  141. @gen_test
  142. def test_multi_dict_future(self):
  143. results = yield dict(foo=self.async_future(1), bar=self.async_future(2))
  144. self.assertEqual(results, dict(foo=1, bar=2))
  145. @gen_test
  146. def test_multi_exceptions(self):
  147. with ExpectLog(app_log, "Multiple exceptions in yield list"):
  148. with self.assertRaises(RuntimeError) as cm:
  149. yield gen.Multi(
  150. [
  151. self.async_exception(RuntimeError("error 1")),
  152. self.async_exception(RuntimeError("error 2")),
  153. ]
  154. )
  155. self.assertEqual(str(cm.exception), "error 1")
  156. # With only one exception, no error is logged.
  157. with self.assertRaises(RuntimeError):
  158. yield gen.Multi(
  159. [self.async_exception(RuntimeError("error 1")), self.async_future(2)]
  160. )
  161. # Exception logging may be explicitly quieted.
  162. with self.assertRaises(RuntimeError):
  163. yield gen.Multi(
  164. [
  165. self.async_exception(RuntimeError("error 1")),
  166. self.async_exception(RuntimeError("error 2")),
  167. ],
  168. quiet_exceptions=RuntimeError,
  169. )
  170. @gen_test
  171. def test_multi_future_exceptions(self):
  172. with ExpectLog(app_log, "Multiple exceptions in yield list"):
  173. with self.assertRaises(RuntimeError) as cm:
  174. yield [
  175. self.async_exception(RuntimeError("error 1")),
  176. self.async_exception(RuntimeError("error 2")),
  177. ]
  178. self.assertEqual(str(cm.exception), "error 1")
  179. # With only one exception, no error is logged.
  180. with self.assertRaises(RuntimeError):
  181. yield [self.async_exception(RuntimeError("error 1")), self.async_future(2)]
  182. # Exception logging may be explicitly quieted.
  183. with self.assertRaises(RuntimeError):
  184. yield gen.multi_future(
  185. [
  186. self.async_exception(RuntimeError("error 1")),
  187. self.async_exception(RuntimeError("error 2")),
  188. ],
  189. quiet_exceptions=RuntimeError,
  190. )
  191. def test_sync_raise_return(self):
  192. @gen.coroutine
  193. def f():
  194. raise gen.Return()
  195. self.io_loop.run_sync(f)
  196. def test_async_raise_return(self):
  197. @gen.coroutine
  198. def f():
  199. yield gen.moment
  200. raise gen.Return()
  201. self.io_loop.run_sync(f)
  202. def test_sync_raise_return_value(self):
  203. @gen.coroutine
  204. def f():
  205. raise gen.Return(42)
  206. self.assertEqual(42, self.io_loop.run_sync(f))
  207. def test_sync_raise_return_value_tuple(self):
  208. @gen.coroutine
  209. def f():
  210. raise gen.Return((1, 2))
  211. self.assertEqual((1, 2), self.io_loop.run_sync(f))
  212. def test_async_raise_return_value(self):
  213. @gen.coroutine
  214. def f():
  215. yield gen.moment
  216. raise gen.Return(42)
  217. self.assertEqual(42, self.io_loop.run_sync(f))
  218. def test_async_raise_return_value_tuple(self):
  219. @gen.coroutine
  220. def f():
  221. yield gen.moment
  222. raise gen.Return((1, 2))
  223. self.assertEqual((1, 2), self.io_loop.run_sync(f))
  224. class GenCoroutineTest(AsyncTestCase):
  225. def setUp(self):
  226. # Stray StopIteration exceptions can lead to tests exiting prematurely,
  227. # so we need explicit checks here to make sure the tests run all
  228. # the way through.
  229. self.finished = False
  230. super(GenCoroutineTest, self).setUp()
  231. def tearDown(self):
  232. super(GenCoroutineTest, self).tearDown()
  233. assert self.finished
  234. def test_attributes(self):
  235. self.finished = True
  236. def f():
  237. yield gen.moment
  238. coro = gen.coroutine(f)
  239. self.assertEqual(coro.__name__, f.__name__)
  240. self.assertEqual(coro.__module__, f.__module__)
  241. self.assertIs(coro.__wrapped__, f) # type: ignore
  242. def test_is_coroutine_function(self):
  243. self.finished = True
  244. def f():
  245. yield gen.moment
  246. coro = gen.coroutine(f)
  247. self.assertFalse(gen.is_coroutine_function(f))
  248. self.assertTrue(gen.is_coroutine_function(coro))
  249. self.assertFalse(gen.is_coroutine_function(coro()))
  250. @gen_test
  251. def test_sync_gen_return(self):
  252. @gen.coroutine
  253. def f():
  254. raise gen.Return(42)
  255. result = yield f()
  256. self.assertEqual(result, 42)
  257. self.finished = True
  258. @gen_test
  259. def test_async_gen_return(self):
  260. @gen.coroutine
  261. def f():
  262. yield gen.moment
  263. raise gen.Return(42)
  264. result = yield f()
  265. self.assertEqual(result, 42)
  266. self.finished = True
  267. @gen_test
  268. def test_sync_return(self):
  269. @gen.coroutine
  270. def f():
  271. return 42
  272. result = yield f()
  273. self.assertEqual(result, 42)
  274. self.finished = True
  275. @gen_test
  276. def test_async_return(self):
  277. @gen.coroutine
  278. def f():
  279. yield gen.moment
  280. return 42
  281. result = yield f()
  282. self.assertEqual(result, 42)
  283. self.finished = True
  284. @gen_test
  285. def test_async_early_return(self):
  286. # A yield statement exists but is not executed, which means
  287. # this function "returns" via an exception. This exception
  288. # doesn't happen before the exception handling is set up.
  289. @gen.coroutine
  290. def f():
  291. if True:
  292. return 42
  293. yield gen.Task(self.io_loop.add_callback)
  294. result = yield f()
  295. self.assertEqual(result, 42)
  296. self.finished = True
  297. @gen_test
  298. def test_async_await(self):
  299. @gen.coroutine
  300. def f1():
  301. yield gen.moment
  302. raise gen.Return(42)
  303. # This test verifies that an async function can await a
  304. # yield-based gen.coroutine, and that a gen.coroutine
  305. # (the test method itself) can yield an async function.
  306. async def f2():
  307. result = await f1()
  308. return result
  309. result = yield f2()
  310. self.assertEqual(result, 42)
  311. self.finished = True
  312. @gen_test
  313. def test_asyncio_sleep_zero(self):
  314. # asyncio.sleep(0) turns into a special case (equivalent to
  315. # `yield None`)
  316. async def f():
  317. import asyncio
  318. await asyncio.sleep(0)
  319. return 42
  320. result = yield f()
  321. self.assertEqual(result, 42)
  322. self.finished = True
  323. @gen_test
  324. def test_async_await_mixed_multi_native_future(self):
  325. @gen.coroutine
  326. def f1():
  327. yield gen.moment
  328. async def f2():
  329. await f1()
  330. return 42
  331. @gen.coroutine
  332. def f3():
  333. yield gen.moment
  334. raise gen.Return(43)
  335. results = yield [f2(), f3()]
  336. self.assertEqual(results, [42, 43])
  337. self.finished = True
  338. @gen_test
  339. def test_async_with_timeout(self):
  340. async def f1():
  341. return 42
  342. result = yield gen.with_timeout(datetime.timedelta(hours=1), f1())
  343. self.assertEqual(result, 42)
  344. self.finished = True
  345. @gen_test
  346. def test_sync_return_no_value(self):
  347. @gen.coroutine
  348. def f():
  349. return
  350. result = yield f()
  351. self.assertEqual(result, None)
  352. self.finished = True
  353. @gen_test
  354. def test_async_return_no_value(self):
  355. # Without a return value we don't need python 3.3.
  356. @gen.coroutine
  357. def f():
  358. yield gen.moment
  359. return
  360. result = yield f()
  361. self.assertEqual(result, None)
  362. self.finished = True
  363. @gen_test
  364. def test_sync_raise(self):
  365. @gen.coroutine
  366. def f():
  367. 1 / 0
  368. # The exception is raised when the future is yielded
  369. # (or equivalently when its result method is called),
  370. # not when the function itself is called).
  371. future = f()
  372. with self.assertRaises(ZeroDivisionError):
  373. yield future
  374. self.finished = True
  375. @gen_test
  376. def test_async_raise(self):
  377. @gen.coroutine
  378. def f():
  379. yield gen.moment
  380. 1 / 0
  381. future = f()
  382. with self.assertRaises(ZeroDivisionError):
  383. yield future
  384. self.finished = True
  385. @gen_test
  386. def test_replace_yieldpoint_exception(self):
  387. # Test exception handling: a coroutine can catch one exception
  388. # raised by a yield point and raise a different one.
  389. @gen.coroutine
  390. def f1():
  391. 1 / 0
  392. @gen.coroutine
  393. def f2():
  394. try:
  395. yield f1()
  396. except ZeroDivisionError:
  397. raise KeyError()
  398. future = f2()
  399. with self.assertRaises(KeyError):
  400. yield future
  401. self.finished = True
  402. @gen_test
  403. def test_swallow_yieldpoint_exception(self):
  404. # Test exception handling: a coroutine can catch an exception
  405. # raised by a yield point and not raise a different one.
  406. @gen.coroutine
  407. def f1():
  408. 1 / 0
  409. @gen.coroutine
  410. def f2():
  411. try:
  412. yield f1()
  413. except ZeroDivisionError:
  414. raise gen.Return(42)
  415. result = yield f2()
  416. self.assertEqual(result, 42)
  417. self.finished = True
  418. @gen_test
  419. def test_moment(self):
  420. calls = []
  421. @gen.coroutine
  422. def f(name, yieldable):
  423. for i in range(5):
  424. calls.append(name)
  425. yield yieldable
  426. # First, confirm the behavior without moment: each coroutine
  427. # monopolizes the event loop until it finishes.
  428. immediate = Future() # type: Future[None]
  429. immediate.set_result(None)
  430. yield [f("a", immediate), f("b", immediate)]
  431. self.assertEqual("".join(calls), "aaaaabbbbb")
  432. # With moment, they take turns.
  433. calls = []
  434. yield [f("a", gen.moment), f("b", gen.moment)]
  435. self.assertEqual("".join(calls), "ababababab")
  436. self.finished = True
  437. calls = []
  438. yield [f("a", gen.moment), f("b", immediate)]
  439. self.assertEqual("".join(calls), "abbbbbaaaa")
  440. @gen_test
  441. def test_sleep(self):
  442. yield gen.sleep(0.01)
  443. self.finished = True
  444. @gen_test
  445. def test_py3_leak_exception_context(self):
  446. class LeakedException(Exception):
  447. pass
  448. @gen.coroutine
  449. def inner(iteration):
  450. raise LeakedException(iteration)
  451. try:
  452. yield inner(1)
  453. except LeakedException as e:
  454. self.assertEqual(str(e), "1")
  455. self.assertIsNone(e.__context__)
  456. try:
  457. yield inner(2)
  458. except LeakedException as e:
  459. self.assertEqual(str(e), "2")
  460. self.assertIsNone(e.__context__)
  461. self.finished = True
  462. @skipNotCPython
  463. @unittest.skipIf(
  464. (3,) < sys.version_info < (3, 6), "asyncio.Future has reference cycles"
  465. )
  466. def test_coroutine_refcounting(self):
  467. # On CPython, tasks and their arguments should be released immediately
  468. # without waiting for garbage collection.
  469. @gen.coroutine
  470. def inner():
  471. class Foo(object):
  472. pass
  473. local_var = Foo()
  474. self.local_ref = weakref.ref(local_var)
  475. def dummy():
  476. pass
  477. yield gen.coroutine(dummy)()
  478. raise ValueError("Some error")
  479. @gen.coroutine
  480. def inner2():
  481. try:
  482. yield inner()
  483. except ValueError:
  484. pass
  485. self.io_loop.run_sync(inner2, timeout=3)
  486. self.assertIs(self.local_ref(), None)
  487. self.finished = True
  488. def test_asyncio_future_debug_info(self):
  489. self.finished = True
  490. # Enable debug mode
  491. asyncio_loop = asyncio.get_event_loop()
  492. self.addCleanup(asyncio_loop.set_debug, asyncio_loop.get_debug())
  493. asyncio_loop.set_debug(True)
  494. def f():
  495. yield gen.moment
  496. coro = gen.coroutine(f)()
  497. self.assertIsInstance(coro, asyncio.Future)
  498. # We expect the coroutine repr() to show the place where
  499. # it was instantiated
  500. expected = "created at %s:%d" % (__file__, f.__code__.co_firstlineno + 3)
  501. actual = repr(coro)
  502. self.assertIn(expected, actual)
  503. @gen_test
  504. def test_asyncio_gather(self):
  505. # This demonstrates that tornado coroutines can be understood
  506. # by asyncio (This failed prior to Tornado 5.0).
  507. @gen.coroutine
  508. def f():
  509. yield gen.moment
  510. raise gen.Return(1)
  511. ret = yield asyncio.gather(f(), f())
  512. self.assertEqual(ret, [1, 1])
  513. self.finished = True
  514. class GenCoroutineSequenceHandler(RequestHandler):
  515. @gen.coroutine
  516. def get(self):
  517. yield gen.moment
  518. self.write("1")
  519. yield gen.moment
  520. self.write("2")
  521. yield gen.moment
  522. self.finish("3")
  523. class GenCoroutineUnfinishedSequenceHandler(RequestHandler):
  524. @gen.coroutine
  525. def get(self):
  526. yield gen.moment
  527. self.write("1")
  528. yield gen.moment
  529. self.write("2")
  530. yield gen.moment
  531. # just write, don't finish
  532. self.write("3")
  533. # "Undecorated" here refers to the absence of @asynchronous.
  534. class UndecoratedCoroutinesHandler(RequestHandler):
  535. @gen.coroutine
  536. def prepare(self):
  537. self.chunks = [] # type: List[str]
  538. yield gen.moment
  539. self.chunks.append("1")
  540. @gen.coroutine
  541. def get(self):
  542. self.chunks.append("2")
  543. yield gen.moment
  544. self.chunks.append("3")
  545. yield gen.moment
  546. self.write("".join(self.chunks))
  547. class AsyncPrepareErrorHandler(RequestHandler):
  548. @gen.coroutine
  549. def prepare(self):
  550. yield gen.moment
  551. raise HTTPError(403)
  552. def get(self):
  553. self.finish("ok")
  554. class NativeCoroutineHandler(RequestHandler):
  555. async def get(self):
  556. await asyncio.sleep(0)
  557. self.write("ok")
  558. class GenWebTest(AsyncHTTPTestCase):
  559. def get_app(self):
  560. return Application(
  561. [
  562. ("/coroutine_sequence", GenCoroutineSequenceHandler),
  563. (
  564. "/coroutine_unfinished_sequence",
  565. GenCoroutineUnfinishedSequenceHandler,
  566. ),
  567. ("/undecorated_coroutine", UndecoratedCoroutinesHandler),
  568. ("/async_prepare_error", AsyncPrepareErrorHandler),
  569. ("/native_coroutine", NativeCoroutineHandler),
  570. ]
  571. )
  572. def test_coroutine_sequence_handler(self):
  573. response = self.fetch("/coroutine_sequence")
  574. self.assertEqual(response.body, b"123")
  575. def test_coroutine_unfinished_sequence_handler(self):
  576. response = self.fetch("/coroutine_unfinished_sequence")
  577. self.assertEqual(response.body, b"123")
  578. def test_undecorated_coroutines(self):
  579. response = self.fetch("/undecorated_coroutine")
  580. self.assertEqual(response.body, b"123")
  581. def test_async_prepare_error_handler(self):
  582. response = self.fetch("/async_prepare_error")
  583. self.assertEqual(response.code, 403)
  584. def test_native_coroutine_handler(self):
  585. response = self.fetch("/native_coroutine")
  586. self.assertEqual(response.code, 200)
  587. self.assertEqual(response.body, b"ok")
  588. class WithTimeoutTest(AsyncTestCase):
  589. @gen_test
  590. def test_timeout(self):
  591. with self.assertRaises(gen.TimeoutError):
  592. yield gen.with_timeout(datetime.timedelta(seconds=0.1), Future())
  593. @gen_test
  594. def test_completes_before_timeout(self):
  595. future = Future() # type: Future[str]
  596. self.io_loop.add_timeout(
  597. datetime.timedelta(seconds=0.1), lambda: future.set_result("asdf")
  598. )
  599. result = yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
  600. self.assertEqual(result, "asdf")
  601. @gen_test
  602. def test_fails_before_timeout(self):
  603. future = Future() # type: Future[str]
  604. self.io_loop.add_timeout(
  605. datetime.timedelta(seconds=0.1),
  606. lambda: future.set_exception(ZeroDivisionError()),
  607. )
  608. with self.assertRaises(ZeroDivisionError):
  609. yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
  610. @gen_test
  611. def test_already_resolved(self):
  612. future = Future() # type: Future[str]
  613. future.set_result("asdf")
  614. result = yield gen.with_timeout(datetime.timedelta(seconds=3600), future)
  615. self.assertEqual(result, "asdf")
  616. @gen_test
  617. def test_timeout_concurrent_future(self):
  618. # A concurrent future that does not resolve before the timeout.
  619. with futures.ThreadPoolExecutor(1) as executor:
  620. with self.assertRaises(gen.TimeoutError):
  621. yield gen.with_timeout(
  622. self.io_loop.time(), executor.submit(time.sleep, 0.1)
  623. )
  624. @gen_test
  625. def test_completed_concurrent_future(self):
  626. # A concurrent future that is resolved before we even submit it
  627. # to with_timeout.
  628. with futures.ThreadPoolExecutor(1) as executor:
  629. def dummy():
  630. pass
  631. f = executor.submit(dummy)
  632. f.result() # wait for completion
  633. yield gen.with_timeout(datetime.timedelta(seconds=3600), f)
  634. @gen_test
  635. def test_normal_concurrent_future(self):
  636. # A conccurrent future that resolves while waiting for the timeout.
  637. with futures.ThreadPoolExecutor(1) as executor:
  638. yield gen.with_timeout(
  639. datetime.timedelta(seconds=3600),
  640. executor.submit(lambda: time.sleep(0.01)),
  641. )
  642. class WaitIteratorTest(AsyncTestCase):
  643. @gen_test
  644. def test_empty_iterator(self):
  645. g = gen.WaitIterator()
  646. self.assertTrue(g.done(), "empty generator iterated")
  647. with self.assertRaises(ValueError):
  648. g = gen.WaitIterator(Future(), bar=Future())
  649. self.assertEqual(g.current_index, None, "bad nil current index")
  650. self.assertEqual(g.current_future, None, "bad nil current future")
  651. @gen_test
  652. def test_already_done(self):
  653. f1 = Future() # type: Future[int]
  654. f2 = Future() # type: Future[int]
  655. f3 = Future() # type: Future[int]
  656. f1.set_result(24)
  657. f2.set_result(42)
  658. f3.set_result(84)
  659. g = gen.WaitIterator(f1, f2, f3)
  660. i = 0
  661. while not g.done():
  662. r = yield g.next()
  663. # Order is not guaranteed, but the current implementation
  664. # preserves ordering of already-done Futures.
  665. if i == 0:
  666. self.assertEqual(g.current_index, 0)
  667. self.assertIs(g.current_future, f1)
  668. self.assertEqual(r, 24)
  669. elif i == 1:
  670. self.assertEqual(g.current_index, 1)
  671. self.assertIs(g.current_future, f2)
  672. self.assertEqual(r, 42)
  673. elif i == 2:
  674. self.assertEqual(g.current_index, 2)
  675. self.assertIs(g.current_future, f3)
  676. self.assertEqual(r, 84)
  677. i += 1
  678. self.assertEqual(g.current_index, None, "bad nil current index")
  679. self.assertEqual(g.current_future, None, "bad nil current future")
  680. dg = gen.WaitIterator(f1=f1, f2=f2)
  681. while not dg.done():
  682. dr = yield dg.next()
  683. if dg.current_index == "f1":
  684. self.assertTrue(
  685. dg.current_future == f1 and dr == 24,
  686. "WaitIterator dict status incorrect",
  687. )
  688. elif dg.current_index == "f2":
  689. self.assertTrue(
  690. dg.current_future == f2 and dr == 42,
  691. "WaitIterator dict status incorrect",
  692. )
  693. else:
  694. self.fail("got bad WaitIterator index {}".format(dg.current_index))
  695. i += 1
  696. self.assertEqual(dg.current_index, None, "bad nil current index")
  697. self.assertEqual(dg.current_future, None, "bad nil current future")
  698. def finish_coroutines(self, iteration, futures):
  699. if iteration == 3:
  700. futures[2].set_result(24)
  701. elif iteration == 5:
  702. futures[0].set_exception(ZeroDivisionError())
  703. elif iteration == 8:
  704. futures[1].set_result(42)
  705. futures[3].set_result(84)
  706. if iteration < 8:
  707. self.io_loop.add_callback(self.finish_coroutines, iteration + 1, futures)
  708. @gen_test
  709. def test_iterator(self):
  710. futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
  711. self.finish_coroutines(0, futures)
  712. g = gen.WaitIterator(*futures)
  713. i = 0
  714. while not g.done():
  715. try:
  716. r = yield g.next()
  717. except ZeroDivisionError:
  718. self.assertIs(g.current_future, futures[0], "exception future invalid")
  719. else:
  720. if i == 0:
  721. self.assertEqual(r, 24, "iterator value incorrect")
  722. self.assertEqual(g.current_index, 2, "wrong index")
  723. elif i == 2:
  724. self.assertEqual(r, 42, "iterator value incorrect")
  725. self.assertEqual(g.current_index, 1, "wrong index")
  726. elif i == 3:
  727. self.assertEqual(r, 84, "iterator value incorrect")
  728. self.assertEqual(g.current_index, 3, "wrong index")
  729. i += 1
  730. @gen_test
  731. def test_iterator_async_await(self):
  732. # Recreate the previous test with py35 syntax. It's a little clunky
  733. # because of the way the previous test handles an exception on
  734. # a single iteration.
  735. futures = [Future(), Future(), Future(), Future()] # type: List[Future[int]]
  736. self.finish_coroutines(0, futures)
  737. self.finished = False
  738. async def f():
  739. i = 0
  740. g = gen.WaitIterator(*futures)
  741. try:
  742. async for r in g:
  743. if i == 0:
  744. self.assertEqual(r, 24, "iterator value incorrect")
  745. self.assertEqual(g.current_index, 2, "wrong index")
  746. else:
  747. raise Exception("expected exception on iteration 1")
  748. i += 1
  749. except ZeroDivisionError:
  750. i += 1
  751. async for r in g:
  752. if i == 2:
  753. self.assertEqual(r, 42, "iterator value incorrect")
  754. self.assertEqual(g.current_index, 1, "wrong index")
  755. elif i == 3:
  756. self.assertEqual(r, 84, "iterator value incorrect")
  757. self.assertEqual(g.current_index, 3, "wrong index")
  758. else:
  759. raise Exception("didn't expect iteration %d" % i)
  760. i += 1
  761. self.finished = True
  762. yield f()
  763. self.assertTrue(self.finished)
  764. @gen_test
  765. def test_no_ref(self):
  766. # In this usage, there is no direct hard reference to the
  767. # WaitIterator itself, only the Future it returns. Since
  768. # WaitIterator uses weak references internally to improve GC
  769. # performance, this used to cause problems.
  770. yield gen.with_timeout(
  771. datetime.timedelta(seconds=0.1), gen.WaitIterator(gen.sleep(0)).next()
  772. )
  773. class RunnerGCTest(AsyncTestCase):
  774. def is_pypy3(self):
  775. return platform.python_implementation() == "PyPy" and sys.version_info > (3,)
  776. @gen_test
  777. def test_gc(self):
  778. # Github issue 1769: Runner objects can get GCed unexpectedly
  779. # while their future is alive.
  780. weakref_scope = [None] # type: List[Optional[weakref.ReferenceType]]
  781. def callback():
  782. gc.collect(2)
  783. weakref_scope[0]().set_result(123) # type: ignore
  784. @gen.coroutine
  785. def tester():
  786. fut = Future() # type: Future[int]
  787. weakref_scope[0] = weakref.ref(fut)
  788. self.io_loop.add_callback(callback)
  789. yield fut
  790. yield gen.with_timeout(datetime.timedelta(seconds=0.2), tester())
  791. def test_gc_infinite_coro(self):
  792. # Github issue 2229: suspended coroutines should be GCed when
  793. # their loop is closed, even if they're involved in a reference
  794. # cycle.
  795. loop = self.get_new_ioloop()
  796. result = [] # type: List[Optional[bool]]
  797. wfut = []
  798. @gen.coroutine
  799. def infinite_coro():
  800. try:
  801. while True:
  802. yield gen.sleep(1e-3)
  803. result.append(True)
  804. finally:
  805. # coroutine finalizer
  806. result.append(None)
  807. @gen.coroutine
  808. def do_something():
  809. fut = infinite_coro()
  810. fut._refcycle = fut # type: ignore
  811. wfut.append(weakref.ref(fut))
  812. yield gen.sleep(0.2)
  813. loop.run_sync(do_something)
  814. loop.close()
  815. gc.collect()
  816. # Future was collected
  817. self.assertIs(wfut[0](), None)
  818. # At least one wakeup
  819. self.assertGreaterEqual(len(result), 2)
  820. if not self.is_pypy3():
  821. # coroutine finalizer was called (not on PyPy3 apparently)
  822. self.assertIs(result[-1], None)
  823. def test_gc_infinite_async_await(self):
  824. # Same as test_gc_infinite_coro, but with a `async def` function
  825. import asyncio
  826. async def infinite_coro(result):
  827. try:
  828. while True:
  829. await gen.sleep(1e-3)
  830. result.append(True)
  831. finally:
  832. # coroutine finalizer
  833. result.append(None)
  834. loop = self.get_new_ioloop()
  835. result = [] # type: List[Optional[bool]]
  836. wfut = []
  837. @gen.coroutine
  838. def do_something():
  839. fut = asyncio.get_event_loop().create_task(infinite_coro(result))
  840. fut._refcycle = fut # type: ignore
  841. wfut.append(weakref.ref(fut))
  842. yield gen.sleep(0.2)
  843. loop.run_sync(do_something)
  844. with ExpectLog("asyncio", "Task was destroyed but it is pending"):
  845. loop.close()
  846. gc.collect()
  847. # Future was collected
  848. self.assertIs(wfut[0](), None)
  849. # At least one wakeup and one finally
  850. self.assertGreaterEqual(len(result), 2)
  851. if not self.is_pypy3():
  852. # coroutine finalizer was called (not on PyPy3 apparently)
  853. self.assertIs(result[-1], None)
  854. def test_multi_moment(self):
  855. # Test gen.multi with moment
  856. # now that it's not a real Future
  857. @gen.coroutine
  858. def wait_a_moment():
  859. result = yield gen.multi([gen.moment, gen.moment])
  860. raise gen.Return(result)
  861. loop = self.get_new_ioloop()
  862. result = loop.run_sync(wait_a_moment)
  863. self.assertEqual(result, [None, None])
  864. if contextvars is not None:
  865. ctx_var = contextvars.ContextVar("ctx_var") # type: contextvars.ContextVar[int]
  866. @unittest.skipIf(contextvars is None, "contextvars module not present")
  867. class ContextVarsTest(AsyncTestCase):
  868. async def native_root(self, x):
  869. ctx_var.set(x)
  870. await self.inner(x)
  871. @gen.coroutine
  872. def gen_root(self, x):
  873. ctx_var.set(x)
  874. yield
  875. yield self.inner(x)
  876. async def inner(self, x):
  877. self.assertEqual(ctx_var.get(), x)
  878. await self.gen_inner(x)
  879. self.assertEqual(ctx_var.get(), x)
  880. # IOLoop.run_in_executor doesn't automatically copy context
  881. ctx = contextvars.copy_context()
  882. await self.io_loop.run_in_executor(None, lambda: ctx.run(self.thread_inner, x))
  883. self.assertEqual(ctx_var.get(), x)
  884. # Neither does asyncio's run_in_executor.
  885. await asyncio.get_event_loop().run_in_executor(
  886. None, lambda: ctx.run(self.thread_inner, x)
  887. )
  888. self.assertEqual(ctx_var.get(), x)
  889. @gen.coroutine
  890. def gen_inner(self, x):
  891. self.assertEqual(ctx_var.get(), x)
  892. yield
  893. self.assertEqual(ctx_var.get(), x)
  894. def thread_inner(self, x):
  895. self.assertEqual(ctx_var.get(), x)
  896. @gen_test
  897. def test_propagate(self):
  898. # Verify that context vars get propagated across various
  899. # combinations of native and decorated coroutines.
  900. yield [
  901. self.native_root(1),
  902. self.native_root(2),
  903. self.gen_root(3),
  904. self.gen_root(4),
  905. ]
  906. if __name__ == "__main__":
  907. unittest.main()