tcpserver_test.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import socket
  2. import subprocess
  3. import sys
  4. import textwrap
  5. import unittest
  6. from tornado.escape import utf8, to_unicode
  7. from tornado import gen
  8. from tornado.iostream import IOStream
  9. from tornado.log import app_log
  10. from tornado.tcpserver import TCPServer
  11. from tornado.test.util import skipIfNonUnix
  12. from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
  13. class TCPServerTest(AsyncTestCase):
  14. @gen_test
  15. def test_handle_stream_coroutine_logging(self):
  16. # handle_stream may be a coroutine and any exception in its
  17. # Future will be logged.
  18. class TestServer(TCPServer):
  19. @gen.coroutine
  20. def handle_stream(self, stream, address):
  21. yield stream.read_bytes(len(b"hello"))
  22. stream.close()
  23. 1 / 0
  24. server = client = None
  25. try:
  26. sock, port = bind_unused_port()
  27. server = TestServer()
  28. server.add_socket(sock)
  29. client = IOStream(socket.socket())
  30. with ExpectLog(app_log, "Exception in callback"):
  31. yield client.connect(("localhost", port))
  32. yield client.write(b"hello")
  33. yield client.read_until_close()
  34. yield gen.moment
  35. finally:
  36. if server is not None:
  37. server.stop()
  38. if client is not None:
  39. client.close()
  40. @gen_test
  41. def test_handle_stream_native_coroutine(self):
  42. # handle_stream may be a native coroutine.
  43. class TestServer(TCPServer):
  44. async def handle_stream(self, stream, address):
  45. stream.write(b"data")
  46. stream.close()
  47. sock, port = bind_unused_port()
  48. server = TestServer()
  49. server.add_socket(sock)
  50. client = IOStream(socket.socket())
  51. yield client.connect(("localhost", port))
  52. result = yield client.read_until_close()
  53. self.assertEqual(result, b"data")
  54. server.stop()
  55. client.close()
  56. def test_stop_twice(self):
  57. sock, port = bind_unused_port()
  58. server = TCPServer()
  59. server.add_socket(sock)
  60. server.stop()
  61. server.stop()
  62. @gen_test
  63. def test_stop_in_callback(self):
  64. # Issue #2069: calling server.stop() in a loop callback should not
  65. # raise EBADF when the loop handles other server connection
  66. # requests in the same loop iteration
  67. class TestServer(TCPServer):
  68. @gen.coroutine
  69. def handle_stream(self, stream, address):
  70. server.stop() # type: ignore
  71. yield stream.read_until_close()
  72. sock, port = bind_unused_port()
  73. server = TestServer()
  74. server.add_socket(sock)
  75. server_addr = ("localhost", port)
  76. N = 40
  77. clients = [IOStream(socket.socket()) for i in range(N)]
  78. connected_clients = []
  79. @gen.coroutine
  80. def connect(c):
  81. try:
  82. yield c.connect(server_addr)
  83. except EnvironmentError:
  84. pass
  85. else:
  86. connected_clients.append(c)
  87. yield [connect(c) for c in clients]
  88. self.assertGreater(len(connected_clients), 0, "all clients failed connecting")
  89. try:
  90. if len(connected_clients) == N:
  91. # Ideally we'd make the test deterministic, but we're testing
  92. # for a race condition in combination with the system's TCP stack...
  93. self.skipTest(
  94. "at least one client should fail connecting "
  95. "for the test to be meaningful"
  96. )
  97. finally:
  98. for c in connected_clients:
  99. c.close()
  100. # Here tearDown() would re-raise the EBADF encountered in the IO loop
  101. @skipIfNonUnix
  102. class TestMultiprocess(unittest.TestCase):
  103. # These tests verify that the two multiprocess examples from the
  104. # TCPServer docs work. Both tests start a server with three worker
  105. # processes, each of which prints its task id to stdout (a single
  106. # byte, so we don't have to worry about atomicity of the shared
  107. # stdout stream) and then exits.
  108. def run_subproc(self, code):
  109. proc = subprocess.Popen(
  110. sys.executable, stdin=subprocess.PIPE, stdout=subprocess.PIPE
  111. )
  112. proc.stdin.write(utf8(code))
  113. proc.stdin.close()
  114. proc.wait()
  115. stdout = proc.stdout.read()
  116. proc.stdout.close()
  117. if proc.returncode != 0:
  118. raise RuntimeError(
  119. "Process returned %d. stdout=%r" % (proc.returncode, stdout)
  120. )
  121. return to_unicode(stdout)
  122. def test_single(self):
  123. # As a sanity check, run the single-process version through this test
  124. # harness too.
  125. code = textwrap.dedent(
  126. """
  127. from tornado.ioloop import IOLoop
  128. from tornado.tcpserver import TCPServer
  129. server = TCPServer()
  130. server.listen(0, address='127.0.0.1')
  131. IOLoop.current().run_sync(lambda: None)
  132. print('012', end='')
  133. """
  134. )
  135. out = self.run_subproc(code)
  136. self.assertEqual("".join(sorted(out)), "012")
  137. def test_simple(self):
  138. code = textwrap.dedent(
  139. """
  140. from tornado.ioloop import IOLoop
  141. from tornado.process import task_id
  142. from tornado.tcpserver import TCPServer
  143. server = TCPServer()
  144. server.bind(0, address='127.0.0.1')
  145. server.start(3)
  146. IOLoop.current().run_sync(lambda: None)
  147. print(task_id(), end='')
  148. """
  149. )
  150. out = self.run_subproc(code)
  151. self.assertEqual("".join(sorted(out)), "012")
  152. def test_advanced(self):
  153. code = textwrap.dedent(
  154. """
  155. from tornado.ioloop import IOLoop
  156. from tornado.netutil import bind_sockets
  157. from tornado.process import fork_processes, task_id
  158. from tornado.ioloop import IOLoop
  159. from tornado.tcpserver import TCPServer
  160. sockets = bind_sockets(0, address='127.0.0.1')
  161. fork_processes(3)
  162. server = TCPServer()
  163. server.add_sockets(sockets)
  164. IOLoop.current().run_sync(lambda: None)
  165. print(task_id(), end='')
  166. """
  167. )
  168. out = self.run_subproc(code)
  169. self.assertEqual("".join(sorted(out)), "012")