tcpclient_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. #
  2. # Copyright 2014 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. from contextlib import closing
  16. import os
  17. import socket
  18. import unittest
  19. from tornado.concurrent import Future
  20. from tornado.netutil import bind_sockets, Resolver
  21. from tornado.queues import Queue
  22. from tornado.tcpclient import TCPClient, _Connector
  23. from tornado.tcpserver import TCPServer
  24. from tornado.testing import AsyncTestCase, gen_test
  25. from tornado.test.util import skipIfNoIPv6, refusing_port, skipIfNonUnix
  26. from tornado.gen import TimeoutError
  27. import typing
  28. if typing.TYPE_CHECKING:
  29. from tornado.iostream import IOStream # noqa: F401
  30. from typing import List, Dict, Tuple # noqa: F401
  31. # Fake address families for testing. Used in place of AF_INET
  32. # and AF_INET6 because some installations do not have AF_INET6.
  33. AF1, AF2 = 1, 2
  34. class TestTCPServer(TCPServer):
  35. def __init__(self, family):
  36. super(TestTCPServer, self).__init__()
  37. self.streams = [] # type: List[IOStream]
  38. self.queue = Queue() # type: Queue[IOStream]
  39. sockets = bind_sockets(0, "localhost", family)
  40. self.add_sockets(sockets)
  41. self.port = sockets[0].getsockname()[1]
  42. def handle_stream(self, stream, address):
  43. self.streams.append(stream)
  44. self.queue.put(stream)
  45. def stop(self):
  46. super(TestTCPServer, self).stop()
  47. for stream in self.streams:
  48. stream.close()
  49. class TCPClientTest(AsyncTestCase):
  50. def setUp(self):
  51. super(TCPClientTest, self).setUp()
  52. self.server = None
  53. self.client = TCPClient()
  54. def start_server(self, family):
  55. if family == socket.AF_UNSPEC and "TRAVIS" in os.environ:
  56. self.skipTest("dual-stack servers often have port conflicts on travis")
  57. self.server = TestTCPServer(family)
  58. return self.server.port
  59. def stop_server(self):
  60. if self.server is not None:
  61. self.server.stop()
  62. self.server = None
  63. def tearDown(self):
  64. self.client.close()
  65. self.stop_server()
  66. super(TCPClientTest, self).tearDown()
  67. def skipIfLocalhostV4(self):
  68. # The port used here doesn't matter, but some systems require it
  69. # to be non-zero if we do not also pass AI_PASSIVE.
  70. addrinfo = self.io_loop.run_sync(lambda: Resolver().resolve("localhost", 80))
  71. families = set(addr[0] for addr in addrinfo)
  72. if socket.AF_INET6 not in families:
  73. self.skipTest("localhost does not resolve to ipv6")
  74. @gen_test
  75. def do_test_connect(self, family, host, source_ip=None, source_port=None):
  76. port = self.start_server(family)
  77. stream = yield self.client.connect(
  78. host, port, source_ip=source_ip, source_port=source_port
  79. )
  80. server_stream = yield self.server.queue.get()
  81. with closing(stream):
  82. stream.write(b"hello")
  83. data = yield server_stream.read_bytes(5)
  84. self.assertEqual(data, b"hello")
  85. def test_connect_ipv4_ipv4(self):
  86. self.do_test_connect(socket.AF_INET, "127.0.0.1")
  87. def test_connect_ipv4_dual(self):
  88. self.do_test_connect(socket.AF_INET, "localhost")
  89. @skipIfNoIPv6
  90. def test_connect_ipv6_ipv6(self):
  91. self.skipIfLocalhostV4()
  92. self.do_test_connect(socket.AF_INET6, "::1")
  93. @skipIfNoIPv6
  94. def test_connect_ipv6_dual(self):
  95. self.skipIfLocalhostV4()
  96. if Resolver.configured_class().__name__.endswith("TwistedResolver"):
  97. self.skipTest("TwistedResolver does not support multiple addresses")
  98. self.do_test_connect(socket.AF_INET6, "localhost")
  99. def test_connect_unspec_ipv4(self):
  100. self.do_test_connect(socket.AF_UNSPEC, "127.0.0.1")
  101. @skipIfNoIPv6
  102. def test_connect_unspec_ipv6(self):
  103. self.skipIfLocalhostV4()
  104. self.do_test_connect(socket.AF_UNSPEC, "::1")
  105. def test_connect_unspec_dual(self):
  106. self.do_test_connect(socket.AF_UNSPEC, "localhost")
  107. @gen_test
  108. def test_refused_ipv4(self):
  109. cleanup_func, port = refusing_port()
  110. self.addCleanup(cleanup_func)
  111. with self.assertRaises(IOError):
  112. yield self.client.connect("127.0.0.1", port)
  113. def test_source_ip_fail(self):
  114. """
  115. Fail when trying to use the source IP Address '8.8.8.8'.
  116. """
  117. self.assertRaises(
  118. socket.error,
  119. self.do_test_connect,
  120. socket.AF_INET,
  121. "127.0.0.1",
  122. source_ip="8.8.8.8",
  123. )
  124. def test_source_ip_success(self):
  125. """
  126. Success when trying to use the source IP Address '127.0.0.1'
  127. """
  128. self.do_test_connect(socket.AF_INET, "127.0.0.1", source_ip="127.0.0.1")
  129. @skipIfNonUnix
  130. def test_source_port_fail(self):
  131. """
  132. Fail when trying to use source port 1.
  133. """
  134. self.assertRaises(
  135. socket.error,
  136. self.do_test_connect,
  137. socket.AF_INET,
  138. "127.0.0.1",
  139. source_port=1,
  140. )
  141. @gen_test
  142. def test_connect_timeout(self):
  143. timeout = 0.05
  144. class TimeoutResolver(Resolver):
  145. def resolve(self, *args, **kwargs):
  146. return Future() # never completes
  147. with self.assertRaises(TimeoutError):
  148. yield TCPClient(resolver=TimeoutResolver()).connect(
  149. "1.2.3.4", 12345, timeout=timeout
  150. )
  151. class TestConnectorSplit(unittest.TestCase):
  152. def test_one_family(self):
  153. # These addresses aren't in the right format, but split doesn't care.
  154. primary, secondary = _Connector.split([(AF1, "a"), (AF1, "b")])
  155. self.assertEqual(primary, [(AF1, "a"), (AF1, "b")])
  156. self.assertEqual(secondary, [])
  157. def test_mixed(self):
  158. primary, secondary = _Connector.split(
  159. [(AF1, "a"), (AF2, "b"), (AF1, "c"), (AF2, "d")]
  160. )
  161. self.assertEqual(primary, [(AF1, "a"), (AF1, "c")])
  162. self.assertEqual(secondary, [(AF2, "b"), (AF2, "d")])
  163. class ConnectorTest(AsyncTestCase):
  164. class FakeStream(object):
  165. def __init__(self):
  166. self.closed = False
  167. def close(self):
  168. self.closed = True
  169. def setUp(self):
  170. super(ConnectorTest, self).setUp()
  171. self.connect_futures = (
  172. {}
  173. ) # type: Dict[Tuple[int, Tuple], Future[ConnectorTest.FakeStream]]
  174. self.streams = {} # type: Dict[Tuple, ConnectorTest.FakeStream]
  175. self.addrinfo = [(AF1, "a"), (AF1, "b"), (AF2, "c"), (AF2, "d")]
  176. def tearDown(self):
  177. # Unless explicitly checked (and popped) in the test, we shouldn't
  178. # be closing any streams
  179. for stream in self.streams.values():
  180. self.assertFalse(stream.closed)
  181. super(ConnectorTest, self).tearDown()
  182. def create_stream(self, af, addr):
  183. stream = ConnectorTest.FakeStream()
  184. self.streams[addr] = stream
  185. future = Future() # type: Future[ConnectorTest.FakeStream]
  186. self.connect_futures[(af, addr)] = future
  187. return stream, future
  188. def assert_pending(self, *keys):
  189. self.assertEqual(sorted(self.connect_futures.keys()), sorted(keys))
  190. def resolve_connect(self, af, addr, success):
  191. future = self.connect_futures.pop((af, addr))
  192. if success:
  193. future.set_result(self.streams[addr])
  194. else:
  195. self.streams.pop(addr)
  196. future.set_exception(IOError())
  197. # Run the loop to allow callbacks to be run.
  198. self.io_loop.add_callback(self.stop)
  199. self.wait()
  200. def assert_connector_streams_closed(self, conn):
  201. for stream in conn.streams:
  202. self.assertTrue(stream.closed)
  203. def start_connect(self, addrinfo):
  204. conn = _Connector(addrinfo, self.create_stream)
  205. # Give it a huge timeout; we'll trigger timeouts manually.
  206. future = conn.start(3600, connect_timeout=self.io_loop.time() + 3600)
  207. return conn, future
  208. def test_immediate_success(self):
  209. conn, future = self.start_connect(self.addrinfo)
  210. self.assertEqual(list(self.connect_futures.keys()), [(AF1, "a")])
  211. self.resolve_connect(AF1, "a", True)
  212. self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
  213. def test_immediate_failure(self):
  214. # Fail with just one address.
  215. conn, future = self.start_connect([(AF1, "a")])
  216. self.assert_pending((AF1, "a"))
  217. self.resolve_connect(AF1, "a", False)
  218. self.assertRaises(IOError, future.result)
  219. def test_one_family_second_try(self):
  220. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  221. self.assert_pending((AF1, "a"))
  222. self.resolve_connect(AF1, "a", False)
  223. self.assert_pending((AF1, "b"))
  224. self.resolve_connect(AF1, "b", True)
  225. self.assertEqual(future.result(), (AF1, "b", self.streams["b"]))
  226. def test_one_family_second_try_failure(self):
  227. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  228. self.assert_pending((AF1, "a"))
  229. self.resolve_connect(AF1, "a", False)
  230. self.assert_pending((AF1, "b"))
  231. self.resolve_connect(AF1, "b", False)
  232. self.assertRaises(IOError, future.result)
  233. def test_one_family_second_try_timeout(self):
  234. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  235. self.assert_pending((AF1, "a"))
  236. # trigger the timeout while the first lookup is pending;
  237. # nothing happens.
  238. conn.on_timeout()
  239. self.assert_pending((AF1, "a"))
  240. self.resolve_connect(AF1, "a", False)
  241. self.assert_pending((AF1, "b"))
  242. self.resolve_connect(AF1, "b", True)
  243. self.assertEqual(future.result(), (AF1, "b", self.streams["b"]))
  244. def test_two_families_immediate_failure(self):
  245. conn, future = self.start_connect(self.addrinfo)
  246. self.assert_pending((AF1, "a"))
  247. self.resolve_connect(AF1, "a", False)
  248. self.assert_pending((AF1, "b"), (AF2, "c"))
  249. self.resolve_connect(AF1, "b", False)
  250. self.resolve_connect(AF2, "c", True)
  251. self.assertEqual(future.result(), (AF2, "c", self.streams["c"]))
  252. def test_two_families_timeout(self):
  253. conn, future = self.start_connect(self.addrinfo)
  254. self.assert_pending((AF1, "a"))
  255. conn.on_timeout()
  256. self.assert_pending((AF1, "a"), (AF2, "c"))
  257. self.resolve_connect(AF2, "c", True)
  258. self.assertEqual(future.result(), (AF2, "c", self.streams["c"]))
  259. # resolving 'a' after the connection has completed doesn't start 'b'
  260. self.resolve_connect(AF1, "a", False)
  261. self.assert_pending()
  262. def test_success_after_timeout(self):
  263. conn, future = self.start_connect(self.addrinfo)
  264. self.assert_pending((AF1, "a"))
  265. conn.on_timeout()
  266. self.assert_pending((AF1, "a"), (AF2, "c"))
  267. self.resolve_connect(AF1, "a", True)
  268. self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
  269. # resolving 'c' after completion closes the connection.
  270. self.resolve_connect(AF2, "c", True)
  271. self.assertTrue(self.streams.pop("c").closed)
  272. def test_all_fail(self):
  273. conn, future = self.start_connect(self.addrinfo)
  274. self.assert_pending((AF1, "a"))
  275. conn.on_timeout()
  276. self.assert_pending((AF1, "a"), (AF2, "c"))
  277. self.resolve_connect(AF2, "c", False)
  278. self.assert_pending((AF1, "a"), (AF2, "d"))
  279. self.resolve_connect(AF2, "d", False)
  280. # one queue is now empty
  281. self.assert_pending((AF1, "a"))
  282. self.resolve_connect(AF1, "a", False)
  283. self.assert_pending((AF1, "b"))
  284. self.assertFalse(future.done())
  285. self.resolve_connect(AF1, "b", False)
  286. self.assertRaises(IOError, future.result)
  287. def test_one_family_timeout_after_connect_timeout(self):
  288. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  289. self.assert_pending((AF1, "a"))
  290. conn.on_connect_timeout()
  291. # the connector will close all streams on connect timeout, we
  292. # should explicitly pop the connect_future.
  293. self.connect_futures.pop((AF1, "a"))
  294. self.assertTrue(self.streams.pop("a").closed)
  295. conn.on_timeout()
  296. # if the future is set with TimeoutError, we will not iterate next
  297. # possible address.
  298. self.assert_pending()
  299. self.assertEqual(len(conn.streams), 1)
  300. self.assert_connector_streams_closed(conn)
  301. self.assertRaises(TimeoutError, future.result)
  302. def test_one_family_success_before_connect_timeout(self):
  303. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  304. self.assert_pending((AF1, "a"))
  305. self.resolve_connect(AF1, "a", True)
  306. conn.on_connect_timeout()
  307. self.assert_pending()
  308. self.assertEqual(self.streams["a"].closed, False)
  309. # success stream will be pop
  310. self.assertEqual(len(conn.streams), 0)
  311. # streams in connector should be closed after connect timeout
  312. self.assert_connector_streams_closed(conn)
  313. self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
  314. def test_one_family_second_try_after_connect_timeout(self):
  315. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  316. self.assert_pending((AF1, "a"))
  317. self.resolve_connect(AF1, "a", False)
  318. self.assert_pending((AF1, "b"))
  319. conn.on_connect_timeout()
  320. self.connect_futures.pop((AF1, "b"))
  321. self.assertTrue(self.streams.pop("b").closed)
  322. self.assert_pending()
  323. self.assertEqual(len(conn.streams), 2)
  324. self.assert_connector_streams_closed(conn)
  325. self.assertRaises(TimeoutError, future.result)
  326. def test_one_family_second_try_failure_before_connect_timeout(self):
  327. conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
  328. self.assert_pending((AF1, "a"))
  329. self.resolve_connect(AF1, "a", False)
  330. self.assert_pending((AF1, "b"))
  331. self.resolve_connect(AF1, "b", False)
  332. conn.on_connect_timeout()
  333. self.assert_pending()
  334. self.assertEqual(len(conn.streams), 2)
  335. self.assert_connector_streams_closed(conn)
  336. self.assertRaises(IOError, future.result)
  337. def test_two_family_timeout_before_connect_timeout(self):
  338. conn, future = self.start_connect(self.addrinfo)
  339. self.assert_pending((AF1, "a"))
  340. conn.on_timeout()
  341. self.assert_pending((AF1, "a"), (AF2, "c"))
  342. conn.on_connect_timeout()
  343. self.connect_futures.pop((AF1, "a"))
  344. self.assertTrue(self.streams.pop("a").closed)
  345. self.connect_futures.pop((AF2, "c"))
  346. self.assertTrue(self.streams.pop("c").closed)
  347. self.assert_pending()
  348. self.assertEqual(len(conn.streams), 2)
  349. self.assert_connector_streams_closed(conn)
  350. self.assertRaises(TimeoutError, future.result)
  351. def test_two_family_success_after_timeout(self):
  352. conn, future = self.start_connect(self.addrinfo)
  353. self.assert_pending((AF1, "a"))
  354. conn.on_timeout()
  355. self.assert_pending((AF1, "a"), (AF2, "c"))
  356. self.resolve_connect(AF1, "a", True)
  357. # if one of streams succeed, connector will close all other streams
  358. self.connect_futures.pop((AF2, "c"))
  359. self.assertTrue(self.streams.pop("c").closed)
  360. self.assert_pending()
  361. self.assertEqual(len(conn.streams), 1)
  362. self.assert_connector_streams_closed(conn)
  363. self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
  364. def test_two_family_timeout_after_connect_timeout(self):
  365. conn, future = self.start_connect(self.addrinfo)
  366. self.assert_pending((AF1, "a"))
  367. conn.on_connect_timeout()
  368. self.connect_futures.pop((AF1, "a"))
  369. self.assertTrue(self.streams.pop("a").closed)
  370. self.assert_pending()
  371. conn.on_timeout()
  372. # if the future is set with TimeoutError, connector will not
  373. # trigger secondary address.
  374. self.assert_pending()
  375. self.assertEqual(len(conn.streams), 1)
  376. self.assert_connector_streams_closed(conn)
  377. self.assertRaises(TimeoutError, future.result)