| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435 |
- #
- # Copyright 2014 Facebook
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- from contextlib import closing
- import os
- import socket
- import unittest
- from tornado.concurrent import Future
- from tornado.netutil import bind_sockets, Resolver
- from tornado.queues import Queue
- from tornado.tcpclient import TCPClient, _Connector
- from tornado.tcpserver import TCPServer
- from tornado.testing import AsyncTestCase, gen_test
- from tornado.test.util import skipIfNoIPv6, refusing_port, skipIfNonUnix
- from tornado.gen import TimeoutError
- import typing
- if typing.TYPE_CHECKING:
- from tornado.iostream import IOStream # noqa: F401
- from typing import List, Dict, Tuple # noqa: F401
- # Fake address families for testing. Used in place of AF_INET
- # and AF_INET6 because some installations do not have AF_INET6.
- AF1, AF2 = 1, 2
- class TestTCPServer(TCPServer):
- def __init__(self, family):
- super(TestTCPServer, self).__init__()
- self.streams = [] # type: List[IOStream]
- self.queue = Queue() # type: Queue[IOStream]
- sockets = bind_sockets(0, "localhost", family)
- self.add_sockets(sockets)
- self.port = sockets[0].getsockname()[1]
- def handle_stream(self, stream, address):
- self.streams.append(stream)
- self.queue.put(stream)
- def stop(self):
- super(TestTCPServer, self).stop()
- for stream in self.streams:
- stream.close()
- class TCPClientTest(AsyncTestCase):
- def setUp(self):
- super(TCPClientTest, self).setUp()
- self.server = None
- self.client = TCPClient()
- def start_server(self, family):
- if family == socket.AF_UNSPEC and "TRAVIS" in os.environ:
- self.skipTest("dual-stack servers often have port conflicts on travis")
- self.server = TestTCPServer(family)
- return self.server.port
- def stop_server(self):
- if self.server is not None:
- self.server.stop()
- self.server = None
- def tearDown(self):
- self.client.close()
- self.stop_server()
- super(TCPClientTest, self).tearDown()
- def skipIfLocalhostV4(self):
- # The port used here doesn't matter, but some systems require it
- # to be non-zero if we do not also pass AI_PASSIVE.
- addrinfo = self.io_loop.run_sync(lambda: Resolver().resolve("localhost", 80))
- families = set(addr[0] for addr in addrinfo)
- if socket.AF_INET6 not in families:
- self.skipTest("localhost does not resolve to ipv6")
- @gen_test
- def do_test_connect(self, family, host, source_ip=None, source_port=None):
- port = self.start_server(family)
- stream = yield self.client.connect(
- host, port, source_ip=source_ip, source_port=source_port
- )
- server_stream = yield self.server.queue.get()
- with closing(stream):
- stream.write(b"hello")
- data = yield server_stream.read_bytes(5)
- self.assertEqual(data, b"hello")
- def test_connect_ipv4_ipv4(self):
- self.do_test_connect(socket.AF_INET, "127.0.0.1")
- def test_connect_ipv4_dual(self):
- self.do_test_connect(socket.AF_INET, "localhost")
- @skipIfNoIPv6
- def test_connect_ipv6_ipv6(self):
- self.skipIfLocalhostV4()
- self.do_test_connect(socket.AF_INET6, "::1")
- @skipIfNoIPv6
- def test_connect_ipv6_dual(self):
- self.skipIfLocalhostV4()
- if Resolver.configured_class().__name__.endswith("TwistedResolver"):
- self.skipTest("TwistedResolver does not support multiple addresses")
- self.do_test_connect(socket.AF_INET6, "localhost")
- def test_connect_unspec_ipv4(self):
- self.do_test_connect(socket.AF_UNSPEC, "127.0.0.1")
- @skipIfNoIPv6
- def test_connect_unspec_ipv6(self):
- self.skipIfLocalhostV4()
- self.do_test_connect(socket.AF_UNSPEC, "::1")
- def test_connect_unspec_dual(self):
- self.do_test_connect(socket.AF_UNSPEC, "localhost")
- @gen_test
- def test_refused_ipv4(self):
- cleanup_func, port = refusing_port()
- self.addCleanup(cleanup_func)
- with self.assertRaises(IOError):
- yield self.client.connect("127.0.0.1", port)
- def test_source_ip_fail(self):
- """
- Fail when trying to use the source IP Address '8.8.8.8'.
- """
- self.assertRaises(
- socket.error,
- self.do_test_connect,
- socket.AF_INET,
- "127.0.0.1",
- source_ip="8.8.8.8",
- )
- def test_source_ip_success(self):
- """
- Success when trying to use the source IP Address '127.0.0.1'
- """
- self.do_test_connect(socket.AF_INET, "127.0.0.1", source_ip="127.0.0.1")
- @skipIfNonUnix
- def test_source_port_fail(self):
- """
- Fail when trying to use source port 1.
- """
- self.assertRaises(
- socket.error,
- self.do_test_connect,
- socket.AF_INET,
- "127.0.0.1",
- source_port=1,
- )
- @gen_test
- def test_connect_timeout(self):
- timeout = 0.05
- class TimeoutResolver(Resolver):
- def resolve(self, *args, **kwargs):
- return Future() # never completes
- with self.assertRaises(TimeoutError):
- yield TCPClient(resolver=TimeoutResolver()).connect(
- "1.2.3.4", 12345, timeout=timeout
- )
- class TestConnectorSplit(unittest.TestCase):
- def test_one_family(self):
- # These addresses aren't in the right format, but split doesn't care.
- primary, secondary = _Connector.split([(AF1, "a"), (AF1, "b")])
- self.assertEqual(primary, [(AF1, "a"), (AF1, "b")])
- self.assertEqual(secondary, [])
- def test_mixed(self):
- primary, secondary = _Connector.split(
- [(AF1, "a"), (AF2, "b"), (AF1, "c"), (AF2, "d")]
- )
- self.assertEqual(primary, [(AF1, "a"), (AF1, "c")])
- self.assertEqual(secondary, [(AF2, "b"), (AF2, "d")])
- class ConnectorTest(AsyncTestCase):
- class FakeStream(object):
- def __init__(self):
- self.closed = False
- def close(self):
- self.closed = True
- def setUp(self):
- super(ConnectorTest, self).setUp()
- self.connect_futures = (
- {}
- ) # type: Dict[Tuple[int, Tuple], Future[ConnectorTest.FakeStream]]
- self.streams = {} # type: Dict[Tuple, ConnectorTest.FakeStream]
- self.addrinfo = [(AF1, "a"), (AF1, "b"), (AF2, "c"), (AF2, "d")]
- def tearDown(self):
- # Unless explicitly checked (and popped) in the test, we shouldn't
- # be closing any streams
- for stream in self.streams.values():
- self.assertFalse(stream.closed)
- super(ConnectorTest, self).tearDown()
- def create_stream(self, af, addr):
- stream = ConnectorTest.FakeStream()
- self.streams[addr] = stream
- future = Future() # type: Future[ConnectorTest.FakeStream]
- self.connect_futures[(af, addr)] = future
- return stream, future
- def assert_pending(self, *keys):
- self.assertEqual(sorted(self.connect_futures.keys()), sorted(keys))
- def resolve_connect(self, af, addr, success):
- future = self.connect_futures.pop((af, addr))
- if success:
- future.set_result(self.streams[addr])
- else:
- self.streams.pop(addr)
- future.set_exception(IOError())
- # Run the loop to allow callbacks to be run.
- self.io_loop.add_callback(self.stop)
- self.wait()
- def assert_connector_streams_closed(self, conn):
- for stream in conn.streams:
- self.assertTrue(stream.closed)
- def start_connect(self, addrinfo):
- conn = _Connector(addrinfo, self.create_stream)
- # Give it a huge timeout; we'll trigger timeouts manually.
- future = conn.start(3600, connect_timeout=self.io_loop.time() + 3600)
- return conn, future
- def test_immediate_success(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assertEqual(list(self.connect_futures.keys()), [(AF1, "a")])
- self.resolve_connect(AF1, "a", True)
- self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
- def test_immediate_failure(self):
- # Fail with just one address.
- conn, future = self.start_connect([(AF1, "a")])
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assertRaises(IOError, future.result)
- def test_one_family_second_try(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"))
- self.resolve_connect(AF1, "b", True)
- self.assertEqual(future.result(), (AF1, "b", self.streams["b"]))
- def test_one_family_second_try_failure(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"))
- self.resolve_connect(AF1, "b", False)
- self.assertRaises(IOError, future.result)
- def test_one_family_second_try_timeout(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- # trigger the timeout while the first lookup is pending;
- # nothing happens.
- conn.on_timeout()
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"))
- self.resolve_connect(AF1, "b", True)
- self.assertEqual(future.result(), (AF1, "b", self.streams["b"]))
- def test_two_families_immediate_failure(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"), (AF2, "c"))
- self.resolve_connect(AF1, "b", False)
- self.resolve_connect(AF2, "c", True)
- self.assertEqual(future.result(), (AF2, "c", self.streams["c"]))
- def test_two_families_timeout(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- conn.on_timeout()
- self.assert_pending((AF1, "a"), (AF2, "c"))
- self.resolve_connect(AF2, "c", True)
- self.assertEqual(future.result(), (AF2, "c", self.streams["c"]))
- # resolving 'a' after the connection has completed doesn't start 'b'
- self.resolve_connect(AF1, "a", False)
- self.assert_pending()
- def test_success_after_timeout(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- conn.on_timeout()
- self.assert_pending((AF1, "a"), (AF2, "c"))
- self.resolve_connect(AF1, "a", True)
- self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
- # resolving 'c' after completion closes the connection.
- self.resolve_connect(AF2, "c", True)
- self.assertTrue(self.streams.pop("c").closed)
- def test_all_fail(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- conn.on_timeout()
- self.assert_pending((AF1, "a"), (AF2, "c"))
- self.resolve_connect(AF2, "c", False)
- self.assert_pending((AF1, "a"), (AF2, "d"))
- self.resolve_connect(AF2, "d", False)
- # one queue is now empty
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"))
- self.assertFalse(future.done())
- self.resolve_connect(AF1, "b", False)
- self.assertRaises(IOError, future.result)
- def test_one_family_timeout_after_connect_timeout(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- conn.on_connect_timeout()
- # the connector will close all streams on connect timeout, we
- # should explicitly pop the connect_future.
- self.connect_futures.pop((AF1, "a"))
- self.assertTrue(self.streams.pop("a").closed)
- conn.on_timeout()
- # if the future is set with TimeoutError, we will not iterate next
- # possible address.
- self.assert_pending()
- self.assertEqual(len(conn.streams), 1)
- self.assert_connector_streams_closed(conn)
- self.assertRaises(TimeoutError, future.result)
- def test_one_family_success_before_connect_timeout(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", True)
- conn.on_connect_timeout()
- self.assert_pending()
- self.assertEqual(self.streams["a"].closed, False)
- # success stream will be pop
- self.assertEqual(len(conn.streams), 0)
- # streams in connector should be closed after connect timeout
- self.assert_connector_streams_closed(conn)
- self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
- def test_one_family_second_try_after_connect_timeout(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"))
- conn.on_connect_timeout()
- self.connect_futures.pop((AF1, "b"))
- self.assertTrue(self.streams.pop("b").closed)
- self.assert_pending()
- self.assertEqual(len(conn.streams), 2)
- self.assert_connector_streams_closed(conn)
- self.assertRaises(TimeoutError, future.result)
- def test_one_family_second_try_failure_before_connect_timeout(self):
- conn, future = self.start_connect([(AF1, "a"), (AF1, "b")])
- self.assert_pending((AF1, "a"))
- self.resolve_connect(AF1, "a", False)
- self.assert_pending((AF1, "b"))
- self.resolve_connect(AF1, "b", False)
- conn.on_connect_timeout()
- self.assert_pending()
- self.assertEqual(len(conn.streams), 2)
- self.assert_connector_streams_closed(conn)
- self.assertRaises(IOError, future.result)
- def test_two_family_timeout_before_connect_timeout(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- conn.on_timeout()
- self.assert_pending((AF1, "a"), (AF2, "c"))
- conn.on_connect_timeout()
- self.connect_futures.pop((AF1, "a"))
- self.assertTrue(self.streams.pop("a").closed)
- self.connect_futures.pop((AF2, "c"))
- self.assertTrue(self.streams.pop("c").closed)
- self.assert_pending()
- self.assertEqual(len(conn.streams), 2)
- self.assert_connector_streams_closed(conn)
- self.assertRaises(TimeoutError, future.result)
- def test_two_family_success_after_timeout(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- conn.on_timeout()
- self.assert_pending((AF1, "a"), (AF2, "c"))
- self.resolve_connect(AF1, "a", True)
- # if one of streams succeed, connector will close all other streams
- self.connect_futures.pop((AF2, "c"))
- self.assertTrue(self.streams.pop("c").closed)
- self.assert_pending()
- self.assertEqual(len(conn.streams), 1)
- self.assert_connector_streams_closed(conn)
- self.assertEqual(future.result(), (AF1, "a", self.streams["a"]))
- def test_two_family_timeout_after_connect_timeout(self):
- conn, future = self.start_connect(self.addrinfo)
- self.assert_pending((AF1, "a"))
- conn.on_connect_timeout()
- self.connect_futures.pop((AF1, "a"))
- self.assertTrue(self.streams.pop("a").closed)
- self.assert_pending()
- conn.on_timeout()
- # if the future is set with TimeoutError, connector will not
- # trigger secondary address.
- self.assert_pending()
- self.assertEqual(len(conn.streams), 1)
- self.assert_connector_streams_closed(conn)
- self.assertRaises(TimeoutError, future.result)
|