# asyncio_test.py (7.0 KB)

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
  12. import asyncio
  13. import unittest
  14. from concurrent.futures import ThreadPoolExecutor
  15. from tornado import gen
  16. from tornado.ioloop import IOLoop
  17. from tornado.platform.asyncio import (
  18. AsyncIOLoop,
  19. to_asyncio_future,
  20. AnyThreadEventLoopPolicy,
  21. )
  22. from tornado.testing import AsyncTestCase, gen_test
  23. class AsyncIOLoopTest(AsyncTestCase):
  24. def get_new_ioloop(self):
  25. io_loop = AsyncIOLoop()
  26. return io_loop
  27. def test_asyncio_callback(self):
  28. # Basic test that the asyncio loop is set up correctly.
  29. asyncio.get_event_loop().call_soon(self.stop)
  30. self.wait()
  31. @gen_test
  32. def test_asyncio_future(self):
  33. # Test that we can yield an asyncio future from a tornado coroutine.
  34. # Without 'yield from', we must wrap coroutines in ensure_future,
  35. # which was introduced during Python 3.4, deprecating the prior "async".
  36. if hasattr(asyncio, "ensure_future"):
  37. ensure_future = asyncio.ensure_future
  38. else:
  39. # async is a reserved word in Python 3.7
  40. ensure_future = getattr(asyncio, "async")
  41. x = yield ensure_future(
  42. asyncio.get_event_loop().run_in_executor(None, lambda: 42)
  43. )
  44. self.assertEqual(x, 42)
  45. @gen_test
  46. def test_asyncio_yield_from(self):
  47. @gen.coroutine
  48. def f():
  49. event_loop = asyncio.get_event_loop()
  50. x = yield from event_loop.run_in_executor(None, lambda: 42)
  51. return x
  52. result = yield f()
  53. self.assertEqual(result, 42)
  54. def test_asyncio_adapter(self):
  55. # This test demonstrates that when using the asyncio coroutine
  56. # runner (i.e. run_until_complete), the to_asyncio_future
  57. # adapter is needed. No adapter is needed in the other direction,
  58. # as demonstrated by other tests in the package.
  59. @gen.coroutine
  60. def tornado_coroutine():
  61. yield gen.moment
  62. raise gen.Return(42)
  63. async def native_coroutine_without_adapter():
  64. return await tornado_coroutine()
  65. async def native_coroutine_with_adapter():
  66. return await to_asyncio_future(tornado_coroutine())
  67. # Use the adapter, but two degrees from the tornado coroutine.
  68. async def native_coroutine_with_adapter2():
  69. return await to_asyncio_future(native_coroutine_without_adapter())
  70. # Tornado supports native coroutines both with and without adapters
  71. self.assertEqual(self.io_loop.run_sync(native_coroutine_without_adapter), 42)
  72. self.assertEqual(self.io_loop.run_sync(native_coroutine_with_adapter), 42)
  73. self.assertEqual(self.io_loop.run_sync(native_coroutine_with_adapter2), 42)
  74. # Asyncio only supports coroutines that yield asyncio-compatible
  75. # Futures (which our Future is since 5.0).
  76. self.assertEqual(
  77. asyncio.get_event_loop().run_until_complete(
  78. native_coroutine_without_adapter()
  79. ),
  80. 42,
  81. )
  82. self.assertEqual(
  83. asyncio.get_event_loop().run_until_complete(
  84. native_coroutine_with_adapter()
  85. ),
  86. 42,
  87. )
  88. self.assertEqual(
  89. asyncio.get_event_loop().run_until_complete(
  90. native_coroutine_with_adapter2()
  91. ),
  92. 42,
  93. )
  94. class LeakTest(unittest.TestCase):
  95. def setUp(self):
  96. # Trigger a cleanup of the mapping so we start with a clean slate.
  97. AsyncIOLoop().close()
  98. # If we don't clean up after ourselves other tests may fail on
  99. # py34.
  100. self.orig_policy = asyncio.get_event_loop_policy()
  101. asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
  102. def tearDown(self):
  103. asyncio.get_event_loop().close()
  104. asyncio.set_event_loop_policy(self.orig_policy)
  105. def test_ioloop_close_leak(self):
  106. orig_count = len(IOLoop._ioloop_for_asyncio)
  107. for i in range(10):
  108. # Create and close an AsyncIOLoop using Tornado interfaces.
  109. loop = AsyncIOLoop()
  110. loop.close()
  111. new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
  112. self.assertEqual(new_count, 0)
  113. def test_asyncio_close_leak(self):
  114. orig_count = len(IOLoop._ioloop_for_asyncio)
  115. for i in range(10):
  116. # Create and close an AsyncIOMainLoop using asyncio interfaces.
  117. loop = asyncio.new_event_loop()
  118. loop.call_soon(IOLoop.current)
  119. loop.call_soon(loop.stop)
  120. loop.run_forever()
  121. loop.close()
  122. new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
  123. # Because the cleanup is run on new loop creation, we have one
  124. # dangling entry in the map (but only one).
  125. self.assertEqual(new_count, 1)
  126. class AnyThreadEventLoopPolicyTest(unittest.TestCase):
  127. def setUp(self):
  128. self.orig_policy = asyncio.get_event_loop_policy()
  129. self.executor = ThreadPoolExecutor(1)
  130. def tearDown(self):
  131. asyncio.set_event_loop_policy(self.orig_policy)
  132. self.executor.shutdown()
  133. def get_event_loop_on_thread(self):
  134. def get_and_close_event_loop():
  135. """Get the event loop. Close it if one is returned.
  136. Returns the (closed) event loop. This is a silly thing
  137. to do and leaves the thread in a broken state, but it's
  138. enough for this test. Closing the loop avoids resource
  139. leak warnings.
  140. """
  141. loop = asyncio.get_event_loop()
  142. loop.close()
  143. return loop
  144. future = self.executor.submit(get_and_close_event_loop)
  145. return future.result()
  146. def run_policy_test(self, accessor, expected_type):
  147. # With the default policy, non-main threads don't get an event
  148. # loop.
  149. self.assertRaises(
  150. (RuntimeError, AssertionError), self.executor.submit(accessor).result
  151. )
  152. # Set the policy and we can get a loop.
  153. asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
  154. self.assertIsInstance(self.executor.submit(accessor).result(), expected_type)
  155. # Clean up to silence leak warnings. Always use asyncio since
  156. # IOLoop doesn't (currently) close the underlying loop.
  157. self.executor.submit(lambda: asyncio.get_event_loop().close()).result()
  158. def test_asyncio_accessor(self):
  159. self.run_policy_test(asyncio.get_event_loop, asyncio.AbstractEventLoop)
  160. def test_tornado_accessor(self):
  161. self.run_policy_test(IOLoop.current, IOLoop)