concurrent_test.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. #
  2. # Copyright 2012 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. from concurrent import futures
  16. import logging
  17. import re
  18. import socket
  19. import unittest
  20. from tornado.concurrent import (
  21. Future,
  22. run_on_executor,
  23. future_set_result_unless_cancelled,
  24. )
  25. from tornado.escape import utf8, to_unicode
  26. from tornado import gen
  27. from tornado.iostream import IOStream
  28. from tornado.tcpserver import TCPServer
  29. from tornado.testing import AsyncTestCase, bind_unused_port, gen_test
  30. class MiscFutureTest(AsyncTestCase):
  31. def test_future_set_result_unless_cancelled(self):
  32. fut = Future() # type: Future[int]
  33. future_set_result_unless_cancelled(fut, 42)
  34. self.assertEqual(fut.result(), 42)
  35. self.assertFalse(fut.cancelled())
  36. fut = Future()
  37. fut.cancel()
  38. is_cancelled = fut.cancelled()
  39. future_set_result_unless_cancelled(fut, 42)
  40. self.assertEqual(fut.cancelled(), is_cancelled)
  41. if not is_cancelled:
  42. self.assertEqual(fut.result(), 42)
  43. # The following series of classes demonstrate and test various styles
  44. # of use, with and without generators and futures.
  45. class CapServer(TCPServer):
  46. @gen.coroutine
  47. def handle_stream(self, stream, address):
  48. data = yield stream.read_until(b"\n")
  49. data = to_unicode(data)
  50. if data == data.upper():
  51. stream.write(b"error\talready capitalized\n")
  52. else:
  53. # data already has \n
  54. stream.write(utf8("ok\t%s" % data.upper()))
  55. stream.close()
  56. class CapError(Exception):
  57. pass
  58. class BaseCapClient(object):
  59. def __init__(self, port):
  60. self.port = port
  61. def process_response(self, data):
  62. m = re.match("(.*)\t(.*)\n", to_unicode(data))
  63. if m is None:
  64. raise Exception("did not match")
  65. status, message = m.groups()
  66. if status == "ok":
  67. return message
  68. else:
  69. raise CapError(message)
  70. class GeneratorCapClient(BaseCapClient):
  71. @gen.coroutine
  72. def capitalize(self, request_data):
  73. logging.debug("capitalize")
  74. stream = IOStream(socket.socket())
  75. logging.debug("connecting")
  76. yield stream.connect(("127.0.0.1", self.port))
  77. stream.write(utf8(request_data + "\n"))
  78. logging.debug("reading")
  79. data = yield stream.read_until(b"\n")
  80. logging.debug("returning")
  81. stream.close()
  82. raise gen.Return(self.process_response(data))
  83. class ClientTestMixin(object):
  84. def setUp(self):
  85. super(ClientTestMixin, self).setUp() # type: ignore
  86. self.server = CapServer()
  87. sock, port = bind_unused_port()
  88. self.server.add_sockets([sock])
  89. self.client = self.client_class(port=port)
  90. def tearDown(self):
  91. self.server.stop()
  92. super(ClientTestMixin, self).tearDown() # type: ignore
  93. def test_future(self):
  94. future = self.client.capitalize("hello")
  95. self.io_loop.add_future(future, self.stop)
  96. self.wait()
  97. self.assertEqual(future.result(), "HELLO")
  98. def test_future_error(self):
  99. future = self.client.capitalize("HELLO")
  100. self.io_loop.add_future(future, self.stop)
  101. self.wait()
  102. self.assertRaisesRegexp(CapError, "already capitalized", future.result)
  103. def test_generator(self):
  104. @gen.coroutine
  105. def f():
  106. result = yield self.client.capitalize("hello")
  107. self.assertEqual(result, "HELLO")
  108. self.io_loop.run_sync(f)
  109. def test_generator_error(self):
  110. @gen.coroutine
  111. def f():
  112. with self.assertRaisesRegexp(CapError, "already capitalized"):
  113. yield self.client.capitalize("HELLO")
  114. self.io_loop.run_sync(f)
  115. class GeneratorClientTest(ClientTestMixin, AsyncTestCase):
  116. client_class = GeneratorCapClient
  117. class RunOnExecutorTest(AsyncTestCase):
  118. @gen_test
  119. def test_no_calling(self):
  120. class Object(object):
  121. def __init__(self):
  122. self.executor = futures.thread.ThreadPoolExecutor(1)
  123. @run_on_executor
  124. def f(self):
  125. return 42
  126. o = Object()
  127. answer = yield o.f()
  128. self.assertEqual(answer, 42)
  129. @gen_test
  130. def test_call_with_no_args(self):
  131. class Object(object):
  132. def __init__(self):
  133. self.executor = futures.thread.ThreadPoolExecutor(1)
  134. @run_on_executor()
  135. def f(self):
  136. return 42
  137. o = Object()
  138. answer = yield o.f()
  139. self.assertEqual(answer, 42)
  140. @gen_test
  141. def test_call_with_executor(self):
  142. class Object(object):
  143. def __init__(self):
  144. self.__executor = futures.thread.ThreadPoolExecutor(1)
  145. @run_on_executor(executor="_Object__executor")
  146. def f(self):
  147. return 42
  148. o = Object()
  149. answer = yield o.f()
  150. self.assertEqual(answer, 42)
  151. @gen_test
  152. def test_async_await(self):
  153. class Object(object):
  154. def __init__(self):
  155. self.executor = futures.thread.ThreadPoolExecutor(1)
  156. @run_on_executor()
  157. def f(self):
  158. return 42
  159. o = Object()
  160. async def f():
  161. answer = await o.f()
  162. return answer
  163. result = yield f()
  164. self.assertEqual(result, 42)
  165. if __name__ == "__main__":
  166. unittest.main()