iostream_test.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262
  1. from tornado.concurrent import Future
  2. from tornado import gen
  3. from tornado import netutil
  4. from tornado.iostream import (
  5. IOStream,
  6. SSLIOStream,
  7. PipeIOStream,
  8. StreamClosedError,
  9. _StreamBuffer,
  10. )
  11. from tornado.httputil import HTTPHeaders
  12. from tornado.locks import Condition, Event
  13. from tornado.log import gen_log
  14. from tornado.netutil import ssl_wrap_socket
  15. from tornado.tcpserver import TCPServer
  16. from tornado.testing import (
  17. AsyncHTTPTestCase,
  18. AsyncHTTPSTestCase,
  19. AsyncTestCase,
  20. bind_unused_port,
  21. ExpectLog,
  22. gen_test,
  23. )
  24. from tornado.test.util import skipIfNonUnix, refusing_port, skipPypy3V58
  25. from tornado.web import RequestHandler, Application
  26. import asyncio
  27. import errno
  28. import hashlib
  29. import os
  30. import platform
  31. import random
  32. import socket
  33. import ssl
  34. import sys
  35. import typing
  36. from unittest import mock
  37. import unittest
  38. def _server_ssl_options():
  39. return dict(
  40. certfile=os.path.join(os.path.dirname(__file__), "test.crt"),
  41. keyfile=os.path.join(os.path.dirname(__file__), "test.key"),
  42. )
  43. class HelloHandler(RequestHandler):
  44. def get(self):
  45. self.write("Hello")
  46. class TestIOStreamWebMixin(object):
  47. def _make_client_iostream(self):
  48. raise NotImplementedError()
  49. def get_app(self):
  50. return Application([("/", HelloHandler)])
  51. def test_connection_closed(self):
  52. # When a server sends a response and then closes the connection,
  53. # the client must be allowed to read the data before the IOStream
  54. # closes itself. Epoll reports closed connections with a separate
  55. # EPOLLRDHUP event delivered at the same time as the read event,
  56. # while kqueue reports them as a second read/write event with an EOF
  57. # flag.
  58. response = self.fetch("/", headers={"Connection": "close"})
  59. response.rethrow()
  60. @gen_test
  61. def test_read_until_close(self):
  62. stream = self._make_client_iostream()
  63. yield stream.connect(("127.0.0.1", self.get_http_port()))
  64. stream.write(b"GET / HTTP/1.0\r\n\r\n")
  65. data = yield stream.read_until_close()
  66. self.assertTrue(data.startswith(b"HTTP/1.1 200"))
  67. self.assertTrue(data.endswith(b"Hello"))
  68. @gen_test
  69. def test_read_zero_bytes(self):
  70. self.stream = self._make_client_iostream()
  71. yield self.stream.connect(("127.0.0.1", self.get_http_port()))
  72. self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
  73. # normal read
  74. data = yield self.stream.read_bytes(9)
  75. self.assertEqual(data, b"HTTP/1.1 ")
  76. # zero bytes
  77. data = yield self.stream.read_bytes(0)
  78. self.assertEqual(data, b"")
  79. # another normal read
  80. data = yield self.stream.read_bytes(3)
  81. self.assertEqual(data, b"200")
  82. self.stream.close()
  83. @gen_test
  84. def test_write_while_connecting(self):
  85. stream = self._make_client_iostream()
  86. connect_fut = stream.connect(("127.0.0.1", self.get_http_port()))
  87. # unlike the previous tests, try to write before the connection
  88. # is complete.
  89. write_fut = stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n")
  90. self.assertFalse(connect_fut.done())
  91. # connect will always complete before write.
  92. it = gen.WaitIterator(connect_fut, write_fut)
  93. resolved_order = []
  94. while not it.done():
  95. yield it.next()
  96. resolved_order.append(it.current_future)
  97. self.assertEqual(resolved_order, [connect_fut, write_fut])
  98. data = yield stream.read_until_close()
  99. self.assertTrue(data.endswith(b"Hello"))
  100. stream.close()
  101. @gen_test
  102. def test_future_interface(self):
  103. """Basic test of IOStream's ability to return Futures."""
  104. stream = self._make_client_iostream()
  105. connect_result = yield stream.connect(("127.0.0.1", self.get_http_port()))
  106. self.assertIs(connect_result, stream)
  107. yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
  108. first_line = yield stream.read_until(b"\r\n")
  109. self.assertEqual(first_line, b"HTTP/1.1 200 OK\r\n")
  110. # callback=None is equivalent to no callback.
  111. header_data = yield stream.read_until(b"\r\n\r\n")
  112. headers = HTTPHeaders.parse(header_data.decode("latin1"))
  113. content_length = int(headers["Content-Length"])
  114. body = yield stream.read_bytes(content_length)
  115. self.assertEqual(body, b"Hello")
  116. stream.close()
  117. @gen_test
  118. def test_future_close_while_reading(self):
  119. stream = self._make_client_iostream()
  120. yield stream.connect(("127.0.0.1", self.get_http_port()))
  121. yield stream.write(b"GET / HTTP/1.0\r\n\r\n")
  122. with self.assertRaises(StreamClosedError):
  123. yield stream.read_bytes(1024 * 1024)
  124. stream.close()
  125. @gen_test
  126. def test_future_read_until_close(self):
  127. # Ensure that the data comes through before the StreamClosedError.
  128. stream = self._make_client_iostream()
  129. yield stream.connect(("127.0.0.1", self.get_http_port()))
  130. yield stream.write(b"GET / HTTP/1.0\r\nConnection: close\r\n\r\n")
  131. yield stream.read_until(b"\r\n\r\n")
  132. body = yield stream.read_until_close()
  133. self.assertEqual(body, b"Hello")
  134. # Nothing else to read; the error comes immediately without waiting
  135. # for yield.
  136. with self.assertRaises(StreamClosedError):
  137. stream.read_bytes(1)
  138. class TestReadWriteMixin(object):
  139. # Tests where one stream reads and the other writes.
  140. # These should work for BaseIOStream implementations.
  141. def make_iostream_pair(self, **kwargs):
  142. raise NotImplementedError
  143. def iostream_pair(self, **kwargs):
  144. """Like make_iostream_pair, but called by ``async with``.
  145. In py37 this becomes simpler with contextlib.asynccontextmanager.
  146. """
  147. class IOStreamPairContext:
  148. def __init__(self, test, kwargs):
  149. self.test = test
  150. self.kwargs = kwargs
  151. async def __aenter__(self):
  152. self.pair = await self.test.make_iostream_pair(**self.kwargs)
  153. return self.pair
  154. async def __aexit__(self, typ, value, tb):
  155. for s in self.pair:
  156. s.close()
  157. return IOStreamPairContext(self, kwargs)
  158. @gen_test
  159. def test_write_zero_bytes(self):
  160. # Attempting to write zero bytes should run the callback without
  161. # going into an infinite loop.
  162. rs, ws = yield self.make_iostream_pair()
  163. yield ws.write(b"")
  164. ws.close()
  165. rs.close()
  166. @gen_test
  167. def test_future_delayed_close_callback(self):
  168. # Same as test_delayed_close_callback, but with the future interface.
  169. rs, ws = yield self.make_iostream_pair()
  170. try:
  171. ws.write(b"12")
  172. chunks = []
  173. chunks.append((yield rs.read_bytes(1)))
  174. ws.close()
  175. chunks.append((yield rs.read_bytes(1)))
  176. self.assertEqual(chunks, [b"1", b"2"])
  177. finally:
  178. ws.close()
  179. rs.close()
  180. @gen_test
  181. def test_close_buffered_data(self):
  182. # Similar to the previous test, but with data stored in the OS's
  183. # socket buffers instead of the IOStream's read buffer. Out-of-band
  184. # close notifications must be delayed until all data has been
  185. # drained into the IOStream buffer. (epoll used to use out-of-band
  186. # close events with EPOLLRDHUP, but no longer)
  187. #
  188. # This depends on the read_chunk_size being smaller than the
  189. # OS socket buffer, so make it small.
  190. rs, ws = yield self.make_iostream_pair(read_chunk_size=256)
  191. try:
  192. ws.write(b"A" * 512)
  193. data = yield rs.read_bytes(256)
  194. self.assertEqual(b"A" * 256, data)
  195. ws.close()
  196. # Allow the close to propagate to the `rs` side of the
  197. # connection. Using add_callback instead of add_timeout
  198. # doesn't seem to work, even with multiple iterations
  199. yield gen.sleep(0.01)
  200. data = yield rs.read_bytes(256)
  201. self.assertEqual(b"A" * 256, data)
  202. finally:
  203. ws.close()
  204. rs.close()
  205. @gen_test
  206. def test_read_until_close_after_close(self):
  207. # Similar to test_delayed_close_callback, but read_until_close takes
  208. # a separate code path so test it separately.
  209. rs, ws = yield self.make_iostream_pair()
  210. try:
  211. ws.write(b"1234")
  212. ws.close()
  213. # Read one byte to make sure the client has received the data.
  214. # It won't run the close callback as long as there is more buffered
  215. # data that could satisfy a later read.
  216. data = yield rs.read_bytes(1)
  217. self.assertEqual(data, b"1")
  218. data = yield rs.read_until_close()
  219. self.assertEqual(data, b"234")
  220. finally:
  221. ws.close()
  222. rs.close()
  223. @gen_test
  224. def test_large_read_until(self):
  225. # Performance test: read_until used to have a quadratic component
  226. # so a read_until of 4MB would take 8 seconds; now it takes 0.25
  227. # seconds.
  228. rs, ws = yield self.make_iostream_pair()
  229. try:
  230. # This test fails on pypy with ssl. I think it's because
  231. # pypy's gc defeats moves objects, breaking the
  232. # "frozen write buffer" assumption.
  233. if (
  234. isinstance(rs, SSLIOStream)
  235. and platform.python_implementation() == "PyPy"
  236. ):
  237. raise unittest.SkipTest("pypy gc causes problems with openssl")
  238. NUM_KB = 4096
  239. for i in range(NUM_KB):
  240. ws.write(b"A" * 1024)
  241. ws.write(b"\r\n")
  242. data = yield rs.read_until(b"\r\n")
  243. self.assertEqual(len(data), NUM_KB * 1024 + 2)
  244. finally:
  245. ws.close()
  246. rs.close()
  247. @gen_test
  248. async def test_read_until_with_close_after_second_packet(self):
  249. # This is a regression test for a regression in Tornado 6.0
  250. # (maybe 6.0.3?) reported in
  251. # https://github.com/tornadoweb/tornado/issues/2717
  252. #
  253. # The data arrives in two chunks; the stream is closed at the
  254. # same time that the second chunk is received. If the second
  255. # chunk is larger than the first, it works, but when this bug
  256. # existed it would fail if the second chunk were smaller than
  257. # the first. This is due to the optimization that the
  258. # read_until condition is only checked when the buffer doubles
  259. # in size
  260. async with self.iostream_pair() as (rs, ws):
  261. rf = asyncio.ensure_future(rs.read_until(b"done"))
  262. await ws.write(b"x" * 2048)
  263. ws.write(b"done")
  264. ws.close()
  265. await rf
  266. @gen_test
  267. async def test_read_until_unsatisfied_after_close(self: typing.Any):
  268. # If a stream is closed while reading, it raises
  269. # StreamClosedError instead of UnsatisfiableReadError (the
  270. # latter should only be raised when byte limits are reached).
  271. # The particular scenario tested here comes from #2717.
  272. async with self.iostream_pair() as (rs, ws):
  273. rf = asyncio.ensure_future(rs.read_until(b"done"))
  274. await ws.write(b"x" * 2048)
  275. ws.write(b"foo")
  276. ws.close()
  277. with self.assertRaises(StreamClosedError):
  278. await rf
  279. @gen_test
  280. def test_close_callback_with_pending_read(self: typing.Any):
  281. # Regression test for a bug that was introduced in 2.3
  282. # where the IOStream._close_callback would never be called
  283. # if there were pending reads.
  284. OK = b"OK\r\n"
  285. rs, ws = yield self.make_iostream_pair()
  286. event = Event()
  287. rs.set_close_callback(event.set)
  288. try:
  289. ws.write(OK)
  290. res = yield rs.read_until(b"\r\n")
  291. self.assertEqual(res, OK)
  292. ws.close()
  293. rs.read_until(b"\r\n")
  294. # If _close_callback (self.stop) is not called,
  295. # an AssertionError: Async operation timed out after 5 seconds
  296. # will be raised.
  297. yield event.wait()
  298. finally:
  299. ws.close()
  300. rs.close()
  301. @gen_test
  302. def test_future_close_callback(self):
  303. # Regression test for interaction between the Future read interfaces
  304. # and IOStream._maybe_add_error_listener.
  305. rs, ws = yield self.make_iostream_pair()
  306. closed = [False]
  307. cond = Condition()
  308. def close_callback():
  309. closed[0] = True
  310. cond.notify()
  311. rs.set_close_callback(close_callback)
  312. try:
  313. ws.write(b"a")
  314. res = yield rs.read_bytes(1)
  315. self.assertEqual(res, b"a")
  316. self.assertFalse(closed[0])
  317. ws.close()
  318. yield cond.wait()
  319. self.assertTrue(closed[0])
  320. finally:
  321. rs.close()
  322. ws.close()
  323. @gen_test
  324. def test_write_memoryview(self):
  325. rs, ws = yield self.make_iostream_pair()
  326. try:
  327. fut = rs.read_bytes(4)
  328. ws.write(memoryview(b"hello"))
  329. data = yield fut
  330. self.assertEqual(data, b"hell")
  331. finally:
  332. ws.close()
  333. rs.close()
  334. @gen_test
  335. def test_read_bytes_partial(self):
  336. rs, ws = yield self.make_iostream_pair()
  337. try:
  338. # Ask for more than is available with partial=True
  339. fut = rs.read_bytes(50, partial=True)
  340. ws.write(b"hello")
  341. data = yield fut
  342. self.assertEqual(data, b"hello")
  343. # Ask for less than what is available; num_bytes is still
  344. # respected.
  345. fut = rs.read_bytes(3, partial=True)
  346. ws.write(b"world")
  347. data = yield fut
  348. self.assertEqual(data, b"wor")
  349. # Partial reads won't return an empty string, but read_bytes(0)
  350. # will.
  351. data = yield rs.read_bytes(0, partial=True)
  352. self.assertEqual(data, b"")
  353. finally:
  354. ws.close()
  355. rs.close()
  356. @gen_test
  357. def test_read_until_max_bytes(self):
  358. rs, ws = yield self.make_iostream_pair()
  359. closed = Event()
  360. rs.set_close_callback(closed.set)
  361. try:
  362. # Extra room under the limit
  363. fut = rs.read_until(b"def", max_bytes=50)
  364. ws.write(b"abcdef")
  365. data = yield fut
  366. self.assertEqual(data, b"abcdef")
  367. # Just enough space
  368. fut = rs.read_until(b"def", max_bytes=6)
  369. ws.write(b"abcdef")
  370. data = yield fut
  371. self.assertEqual(data, b"abcdef")
  372. # Not enough space, but we don't know it until all we can do is
  373. # log a warning and close the connection.
  374. with ExpectLog(gen_log, "Unsatisfiable read"):
  375. fut = rs.read_until(b"def", max_bytes=5)
  376. ws.write(b"123456")
  377. yield closed.wait()
  378. finally:
  379. ws.close()
  380. rs.close()
  381. @gen_test
  382. def test_read_until_max_bytes_inline(self):
  383. rs, ws = yield self.make_iostream_pair()
  384. closed = Event()
  385. rs.set_close_callback(closed.set)
  386. try:
  387. # Similar to the error case in the previous test, but the
  388. # ws writes first so rs reads are satisfied
  389. # inline. For consistency with the out-of-line case, we
  390. # do not raise the error synchronously.
  391. ws.write(b"123456")
  392. with ExpectLog(gen_log, "Unsatisfiable read"):
  393. with self.assertRaises(StreamClosedError):
  394. yield rs.read_until(b"def", max_bytes=5)
  395. yield closed.wait()
  396. finally:
  397. ws.close()
  398. rs.close()
  399. @gen_test
  400. def test_read_until_max_bytes_ignores_extra(self):
  401. rs, ws = yield self.make_iostream_pair()
  402. closed = Event()
  403. rs.set_close_callback(closed.set)
  404. try:
  405. # Even though data that matches arrives the same packet that
  406. # puts us over the limit, we fail the request because it was not
  407. # found within the limit.
  408. ws.write(b"abcdef")
  409. with ExpectLog(gen_log, "Unsatisfiable read"):
  410. rs.read_until(b"def", max_bytes=5)
  411. yield closed.wait()
  412. finally:
  413. ws.close()
  414. rs.close()
  415. @gen_test
  416. def test_read_until_regex_max_bytes(self):
  417. rs, ws = yield self.make_iostream_pair()
  418. closed = Event()
  419. rs.set_close_callback(closed.set)
  420. try:
  421. # Extra room under the limit
  422. fut = rs.read_until_regex(b"def", max_bytes=50)
  423. ws.write(b"abcdef")
  424. data = yield fut
  425. self.assertEqual(data, b"abcdef")
  426. # Just enough space
  427. fut = rs.read_until_regex(b"def", max_bytes=6)
  428. ws.write(b"abcdef")
  429. data = yield fut
  430. self.assertEqual(data, b"abcdef")
  431. # Not enough space, but we don't know it until all we can do is
  432. # log a warning and close the connection.
  433. with ExpectLog(gen_log, "Unsatisfiable read"):
  434. rs.read_until_regex(b"def", max_bytes=5)
  435. ws.write(b"123456")
  436. yield closed.wait()
  437. finally:
  438. ws.close()
  439. rs.close()
  440. @gen_test
  441. def test_read_until_regex_max_bytes_inline(self):
  442. rs, ws = yield self.make_iostream_pair()
  443. closed = Event()
  444. rs.set_close_callback(closed.set)
  445. try:
  446. # Similar to the error case in the previous test, but the
  447. # ws writes first so rs reads are satisfied
  448. # inline. For consistency with the out-of-line case, we
  449. # do not raise the error synchronously.
  450. ws.write(b"123456")
  451. with ExpectLog(gen_log, "Unsatisfiable read"):
  452. rs.read_until_regex(b"def", max_bytes=5)
  453. yield closed.wait()
  454. finally:
  455. ws.close()
  456. rs.close()
  457. @gen_test
  458. def test_read_until_regex_max_bytes_ignores_extra(self):
  459. rs, ws = yield self.make_iostream_pair()
  460. closed = Event()
  461. rs.set_close_callback(closed.set)
  462. try:
  463. # Even though data that matches arrives the same packet that
  464. # puts us over the limit, we fail the request because it was not
  465. # found within the limit.
  466. ws.write(b"abcdef")
  467. with ExpectLog(gen_log, "Unsatisfiable read"):
  468. rs.read_until_regex(b"def", max_bytes=5)
  469. yield closed.wait()
  470. finally:
  471. ws.close()
  472. rs.close()
  473. @gen_test
  474. def test_small_reads_from_large_buffer(self):
  475. # 10KB buffer size, 100KB available to read.
  476. # Read 1KB at a time and make sure that the buffer is not eagerly
  477. # filled.
  478. rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
  479. try:
  480. ws.write(b"a" * 1024 * 100)
  481. for i in range(100):
  482. data = yield rs.read_bytes(1024)
  483. self.assertEqual(data, b"a" * 1024)
  484. finally:
  485. ws.close()
  486. rs.close()
  487. @gen_test
  488. def test_small_read_untils_from_large_buffer(self):
  489. # 10KB buffer size, 100KB available to read.
  490. # Read 1KB at a time and make sure that the buffer is not eagerly
  491. # filled.
  492. rs, ws = yield self.make_iostream_pair(max_buffer_size=10 * 1024)
  493. try:
  494. ws.write((b"a" * 1023 + b"\n") * 100)
  495. for i in range(100):
  496. data = yield rs.read_until(b"\n", max_bytes=4096)
  497. self.assertEqual(data, b"a" * 1023 + b"\n")
  498. finally:
  499. ws.close()
  500. rs.close()
  501. @gen_test
  502. def test_flow_control(self):
  503. MB = 1024 * 1024
  504. rs, ws = yield self.make_iostream_pair(max_buffer_size=5 * MB)
  505. try:
  506. # Client writes more than the rs will accept.
  507. ws.write(b"a" * 10 * MB)
  508. # The rs pauses while reading.
  509. yield rs.read_bytes(MB)
  510. yield gen.sleep(0.1)
  511. # The ws's writes have been blocked; the rs can
  512. # continue to read gradually.
  513. for i in range(9):
  514. yield rs.read_bytes(MB)
  515. finally:
  516. rs.close()
  517. ws.close()
  518. @gen_test
  519. def test_read_into(self):
  520. rs, ws = yield self.make_iostream_pair()
  521. def sleep_some():
  522. self.io_loop.run_sync(lambda: gen.sleep(0.05))
  523. try:
  524. buf = bytearray(10)
  525. fut = rs.read_into(buf)
  526. ws.write(b"hello")
  527. yield gen.sleep(0.05)
  528. self.assertTrue(rs.reading())
  529. ws.write(b"world!!")
  530. data = yield fut
  531. self.assertFalse(rs.reading())
  532. self.assertEqual(data, 10)
  533. self.assertEqual(bytes(buf), b"helloworld")
  534. # Existing buffer is fed into user buffer
  535. fut = rs.read_into(buf)
  536. yield gen.sleep(0.05)
  537. self.assertTrue(rs.reading())
  538. ws.write(b"1234567890")
  539. data = yield fut
  540. self.assertFalse(rs.reading())
  541. self.assertEqual(data, 10)
  542. self.assertEqual(bytes(buf), b"!!12345678")
  543. # Existing buffer can satisfy read immediately
  544. buf = bytearray(4)
  545. ws.write(b"abcdefghi")
  546. data = yield rs.read_into(buf)
  547. self.assertEqual(data, 4)
  548. self.assertEqual(bytes(buf), b"90ab")
  549. data = yield rs.read_bytes(7)
  550. self.assertEqual(data, b"cdefghi")
  551. finally:
  552. ws.close()
  553. rs.close()
  554. @gen_test
  555. def test_read_into_partial(self):
  556. rs, ws = yield self.make_iostream_pair()
  557. try:
  558. # Partial read
  559. buf = bytearray(10)
  560. fut = rs.read_into(buf, partial=True)
  561. ws.write(b"hello")
  562. data = yield fut
  563. self.assertFalse(rs.reading())
  564. self.assertEqual(data, 5)
  565. self.assertEqual(bytes(buf), b"hello\0\0\0\0\0")
  566. # Full read despite partial=True
  567. ws.write(b"world!1234567890")
  568. data = yield rs.read_into(buf, partial=True)
  569. self.assertEqual(data, 10)
  570. self.assertEqual(bytes(buf), b"world!1234")
  571. # Existing buffer can satisfy read immediately
  572. data = yield rs.read_into(buf, partial=True)
  573. self.assertEqual(data, 6)
  574. self.assertEqual(bytes(buf), b"5678901234")
  575. finally:
  576. ws.close()
  577. rs.close()
  578. @gen_test
  579. def test_read_into_zero_bytes(self):
  580. rs, ws = yield self.make_iostream_pair()
  581. try:
  582. buf = bytearray()
  583. fut = rs.read_into(buf)
  584. self.assertEqual(fut.result(), 0)
  585. finally:
  586. ws.close()
  587. rs.close()
  588. @gen_test
  589. def test_many_mixed_reads(self):
  590. # Stress buffer handling when going back and forth between
  591. # read_bytes() (using an internal buffer) and read_into()
  592. # (using a user-allocated buffer).
  593. r = random.Random(42)
  594. nbytes = 1000000
  595. rs, ws = yield self.make_iostream_pair()
  596. produce_hash = hashlib.sha1()
  597. consume_hash = hashlib.sha1()
  598. @gen.coroutine
  599. def produce():
  600. remaining = nbytes
  601. while remaining > 0:
  602. size = r.randint(1, min(1000, remaining))
  603. data = os.urandom(size)
  604. produce_hash.update(data)
  605. yield ws.write(data)
  606. remaining -= size
  607. assert remaining == 0
  608. @gen.coroutine
  609. def consume():
  610. remaining = nbytes
  611. while remaining > 0:
  612. if r.random() > 0.5:
  613. # read_bytes()
  614. size = r.randint(1, min(1000, remaining))
  615. data = yield rs.read_bytes(size)
  616. consume_hash.update(data)
  617. remaining -= size
  618. else:
  619. # read_into()
  620. size = r.randint(1, min(1000, remaining))
  621. buf = bytearray(size)
  622. n = yield rs.read_into(buf)
  623. assert n == size
  624. consume_hash.update(buf)
  625. remaining -= size
  626. assert remaining == 0
  627. try:
  628. yield [produce(), consume()]
  629. assert produce_hash.hexdigest() == consume_hash.hexdigest()
  630. finally:
  631. ws.close()
  632. rs.close()
  633. class TestIOStreamMixin(TestReadWriteMixin):
  634. def _make_server_iostream(self, connection, **kwargs):
  635. raise NotImplementedError()
  636. def _make_client_iostream(self, connection, **kwargs):
  637. raise NotImplementedError()
  638. @gen.coroutine
  639. def make_iostream_pair(self, **kwargs):
  640. listener, port = bind_unused_port()
  641. server_stream_fut = Future() # type: Future[IOStream]
  642. def accept_callback(connection, address):
  643. server_stream_fut.set_result(
  644. self._make_server_iostream(connection, **kwargs)
  645. )
  646. netutil.add_accept_handler(listener, accept_callback)
  647. client_stream = self._make_client_iostream(socket.socket(), **kwargs)
  648. connect_fut = client_stream.connect(("127.0.0.1", port))
  649. server_stream, client_stream = yield [server_stream_fut, connect_fut]
  650. self.io_loop.remove_handler(listener.fileno())
  651. listener.close()
  652. raise gen.Return((server_stream, client_stream))
  653. @gen_test
  654. def test_connection_refused(self):
  655. # When a connection is refused, the connect callback should not
  656. # be run. (The kqueue IOLoop used to behave differently from the
  657. # epoll IOLoop in this respect)
  658. cleanup_func, port = refusing_port()
  659. self.addCleanup(cleanup_func)
  660. stream = IOStream(socket.socket())
  661. stream.set_close_callback(self.stop)
  662. # log messages vary by platform and ioloop implementation
  663. with ExpectLog(gen_log, ".*", required=False):
  664. with self.assertRaises(StreamClosedError):
  665. yield stream.connect(("127.0.0.1", port))
  666. self.assertTrue(isinstance(stream.error, socket.error), stream.error)
  667. if sys.platform != "cygwin":
  668. _ERRNO_CONNREFUSED = [errno.ECONNREFUSED]
  669. if hasattr(errno, "WSAECONNREFUSED"):
  670. _ERRNO_CONNREFUSED.append(errno.WSAECONNREFUSED) # type: ignore
  671. # cygwin's errnos don't match those used on native windows python
  672. self.assertTrue(stream.error.args[0] in _ERRNO_CONNREFUSED) # type: ignore
  673. @gen_test
  674. def test_gaierror(self):
  675. # Test that IOStream sets its exc_info on getaddrinfo error.
  676. # It's difficult to reliably trigger a getaddrinfo error;
  677. # some resolvers own't even return errors for malformed names,
  678. # so we mock it instead. If IOStream changes to call a Resolver
  679. # before sock.connect, the mock target will need to change too.
  680. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  681. stream = IOStream(s)
  682. stream.set_close_callback(self.stop)
  683. with mock.patch(
  684. "socket.socket.connect", side_effect=socket.gaierror(errno.EIO, "boom")
  685. ):
  686. with self.assertRaises(StreamClosedError):
  687. yield stream.connect(("localhost", 80))
  688. self.assertTrue(isinstance(stream.error, socket.gaierror))
  689. @gen_test
  690. def test_read_until_close_with_error(self):
  691. server, client = yield self.make_iostream_pair()
  692. try:
  693. with mock.patch(
  694. "tornado.iostream.BaseIOStream._try_inline_read",
  695. side_effect=IOError("boom"),
  696. ):
  697. with self.assertRaisesRegexp(IOError, "boom"):
  698. client.read_until_close()
  699. finally:
  700. server.close()
  701. client.close()
  702. @skipIfNonUnix
  703. @skipPypy3V58
  704. @gen_test
  705. def test_inline_read_error(self):
  706. # An error on an inline read is raised without logging (on the
  707. # assumption that it will eventually be noticed or logged further
  708. # up the stack).
  709. #
  710. # This test is posix-only because windows os.close() doesn't work
  711. # on socket FDs, but we can't close the socket object normally
  712. # because we won't get the error we want if the socket knows
  713. # it's closed.
  714. server, client = yield self.make_iostream_pair()
  715. try:
  716. os.close(server.socket.fileno())
  717. with self.assertRaises(socket.error):
  718. server.read_bytes(1)
  719. finally:
  720. server.close()
  721. client.close()
  722. @skipPypy3V58
  723. @gen_test
  724. def test_async_read_error_logging(self):
  725. # Socket errors on asynchronous reads should be logged (but only
  726. # once).
  727. server, client = yield self.make_iostream_pair()
  728. closed = Event()
  729. server.set_close_callback(closed.set)
  730. try:
  731. # Start a read that will be fulfilled asynchronously.
  732. server.read_bytes(1)
  733. client.write(b"a")
  734. # Stub out read_from_fd to make it fail.
  735. def fake_read_from_fd():
  736. os.close(server.socket.fileno())
  737. server.__class__.read_from_fd(server)
  738. server.read_from_fd = fake_read_from_fd
  739. # This log message is from _handle_read (not read_from_fd).
  740. with ExpectLog(gen_log, "error on read"):
  741. yield closed.wait()
  742. finally:
  743. server.close()
  744. client.close()
  745. @gen_test
  746. def test_future_write(self):
  747. """
  748. Test that write() Futures are never orphaned.
  749. """
  750. # Run concurrent writers that will write enough bytes so as to
  751. # clog the socket buffer and accumulate bytes in our write buffer.
  752. m, n = 5000, 1000
  753. nproducers = 10
  754. total_bytes = m * n * nproducers
  755. server, client = yield self.make_iostream_pair(max_buffer_size=total_bytes)
  756. @gen.coroutine
  757. def produce():
  758. data = b"x" * m
  759. for i in range(n):
  760. yield server.write(data)
  761. @gen.coroutine
  762. def consume():
  763. nread = 0
  764. while nread < total_bytes:
  765. res = yield client.read_bytes(m)
  766. nread += len(res)
  767. try:
  768. yield [produce() for i in range(nproducers)] + [consume()]
  769. finally:
  770. server.close()
  771. client.close()
  772. class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
  773. def _make_client_iostream(self):
  774. return IOStream(socket.socket())
  775. class TestIOStreamWebHTTPS(TestIOStreamWebMixin, AsyncHTTPSTestCase):
  776. def _make_client_iostream(self):
  777. return SSLIOStream(socket.socket(), ssl_options=dict(cert_reqs=ssl.CERT_NONE))
  778. class TestIOStream(TestIOStreamMixin, AsyncTestCase):
  779. def _make_server_iostream(self, connection, **kwargs):
  780. return IOStream(connection, **kwargs)
  781. def _make_client_iostream(self, connection, **kwargs):
  782. return IOStream(connection, **kwargs)
  783. class TestIOStreamSSL(TestIOStreamMixin, AsyncTestCase):
  784. def _make_server_iostream(self, connection, **kwargs):
  785. connection = ssl.wrap_socket(
  786. connection,
  787. server_side=True,
  788. do_handshake_on_connect=False,
  789. **_server_ssl_options()
  790. )
  791. return SSLIOStream(connection, **kwargs)
  792. def _make_client_iostream(self, connection, **kwargs):
  793. return SSLIOStream(
  794. connection, ssl_options=dict(cert_reqs=ssl.CERT_NONE), **kwargs
  795. )
  796. # This will run some tests that are basically redundant but it's the
  797. # simplest way to make sure that it works to pass an SSLContext
  798. # instead of an ssl_options dict to the SSLIOStream constructor.
  799. class TestIOStreamSSLContext(TestIOStreamMixin, AsyncTestCase):
  800. def _make_server_iostream(self, connection, **kwargs):
  801. context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
  802. context.load_cert_chain(
  803. os.path.join(os.path.dirname(__file__), "test.crt"),
  804. os.path.join(os.path.dirname(__file__), "test.key"),
  805. )
  806. connection = ssl_wrap_socket(
  807. connection, context, server_side=True, do_handshake_on_connect=False
  808. )
  809. return SSLIOStream(connection, **kwargs)
  810. def _make_client_iostream(self, connection, **kwargs):
  811. context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
  812. return SSLIOStream(connection, ssl_options=context, **kwargs)
  813. class TestIOStreamStartTLS(AsyncTestCase):
  814. def setUp(self):
  815. try:
  816. super(TestIOStreamStartTLS, self).setUp()
  817. self.listener, self.port = bind_unused_port()
  818. self.server_stream = None
  819. self.server_accepted = Future() # type: Future[None]
  820. netutil.add_accept_handler(self.listener, self.accept)
  821. self.client_stream = IOStream(socket.socket())
  822. self.io_loop.add_future(
  823. self.client_stream.connect(("127.0.0.1", self.port)), self.stop
  824. )
  825. self.wait()
  826. self.io_loop.add_future(self.server_accepted, self.stop)
  827. self.wait()
  828. except Exception as e:
  829. print(e)
  830. raise
  831. def tearDown(self):
  832. if self.server_stream is not None:
  833. self.server_stream.close()
  834. if self.client_stream is not None:
  835. self.client_stream.close()
  836. self.listener.close()
  837. super(TestIOStreamStartTLS, self).tearDown()
  838. def accept(self, connection, address):
  839. if self.server_stream is not None:
  840. self.fail("should only get one connection")
  841. self.server_stream = IOStream(connection)
  842. self.server_accepted.set_result(None)
  843. @gen.coroutine
  844. def client_send_line(self, line):
  845. self.client_stream.write(line)
  846. recv_line = yield self.server_stream.read_until(b"\r\n")
  847. self.assertEqual(line, recv_line)
  848. @gen.coroutine
  849. def server_send_line(self, line):
  850. self.server_stream.write(line)
  851. recv_line = yield self.client_stream.read_until(b"\r\n")
  852. self.assertEqual(line, recv_line)
  853. def client_start_tls(self, ssl_options=None, server_hostname=None):
  854. client_stream = self.client_stream
  855. self.client_stream = None
  856. return client_stream.start_tls(False, ssl_options, server_hostname)
  857. def server_start_tls(self, ssl_options=None):
  858. server_stream = self.server_stream
  859. self.server_stream = None
  860. return server_stream.start_tls(True, ssl_options)
  861. @gen_test
  862. def test_start_tls_smtp(self):
  863. # This flow is simplified from RFC 3207 section 5.
  864. # We don't really need all of this, but it helps to make sure
  865. # that after realistic back-and-forth traffic the buffers end up
  866. # in a sane state.
  867. yield self.server_send_line(b"220 mail.example.com ready\r\n")
  868. yield self.client_send_line(b"EHLO mail.example.com\r\n")
  869. yield self.server_send_line(b"250-mail.example.com welcome\r\n")
  870. yield self.server_send_line(b"250 STARTTLS\r\n")
  871. yield self.client_send_line(b"STARTTLS\r\n")
  872. yield self.server_send_line(b"220 Go ahead\r\n")
  873. client_future = self.client_start_tls(dict(cert_reqs=ssl.CERT_NONE))
  874. server_future = self.server_start_tls(_server_ssl_options())
  875. self.client_stream = yield client_future
  876. self.server_stream = yield server_future
  877. self.assertTrue(isinstance(self.client_stream, SSLIOStream))
  878. self.assertTrue(isinstance(self.server_stream, SSLIOStream))
  879. yield self.client_send_line(b"EHLO mail.example.com\r\n")
  880. yield self.server_send_line(b"250 mail.example.com welcome\r\n")
  881. @gen_test
  882. def test_handshake_fail(self):
  883. server_future = self.server_start_tls(_server_ssl_options())
  884. # Certificates are verified with the default configuration.
  885. with ExpectLog(gen_log, "SSL Error"):
  886. client_future = self.client_start_tls(server_hostname="localhost")
  887. with self.assertRaises(ssl.SSLError):
  888. yield client_future
  889. with self.assertRaises((ssl.SSLError, socket.error)):
  890. yield server_future
  891. @gen_test
  892. def test_check_hostname(self):
  893. # Test that server_hostname parameter to start_tls is being used.
  894. # The check_hostname functionality is only available in python 2.7 and
  895. # up and in python 3.4 and up.
  896. server_future = self.server_start_tls(_server_ssl_options())
  897. with ExpectLog(gen_log, "SSL Error"):
  898. client_future = self.client_start_tls(
  899. ssl.create_default_context(), server_hostname="127.0.0.1"
  900. )
  901. with self.assertRaises(ssl.SSLError):
  902. # The client fails to connect with an SSL error.
  903. yield client_future
  904. with self.assertRaises(Exception):
  905. # The server fails to connect, but the exact error is unspecified.
  906. yield server_future
  907. class WaitForHandshakeTest(AsyncTestCase):
  908. @gen.coroutine
  909. def connect_to_server(self, server_cls):
  910. server = client = None
  911. try:
  912. sock, port = bind_unused_port()
  913. server = server_cls(ssl_options=_server_ssl_options())
  914. server.add_socket(sock)
  915. ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
  916. ssl_ctx.check_hostname = False
  917. ssl_ctx.verify_mode = ssl.CERT_NONE
  918. # These tests fail with ConnectionAbortedErrors with TLS
  919. # 1.3 on windows python 3.7.4 (which includes an upgrade
  920. # to openssl 1.1.c. Other platforms might be affected with
  921. # newer openssl too). Disable it until we figure out
  922. # what's up.
  923. ssl_ctx.options |= getattr(ssl, "OP_NO_TLSv1_3", 0)
  924. client = SSLIOStream(socket.socket(), ssl_options=ssl_ctx)
  925. yield client.connect(("127.0.0.1", port))
  926. self.assertIsNotNone(client.socket.cipher())
  927. finally:
  928. if server is not None:
  929. server.stop()
  930. if client is not None:
  931. client.close()
  932. @gen_test
  933. def test_wait_for_handshake_future(self):
  934. test = self
  935. handshake_future = Future() # type: Future[None]
  936. class TestServer(TCPServer):
  937. def handle_stream(self, stream, address):
  938. test.assertIsNone(stream.socket.cipher())
  939. test.io_loop.spawn_callback(self.handle_connection, stream)
  940. @gen.coroutine
  941. def handle_connection(self, stream):
  942. yield stream.wait_for_handshake()
  943. handshake_future.set_result(None)
  944. yield self.connect_to_server(TestServer)
  945. yield handshake_future
  946. @gen_test
  947. def test_wait_for_handshake_already_waiting_error(self):
  948. test = self
  949. handshake_future = Future() # type: Future[None]
  950. class TestServer(TCPServer):
  951. @gen.coroutine
  952. def handle_stream(self, stream, address):
  953. fut = stream.wait_for_handshake()
  954. test.assertRaises(RuntimeError, stream.wait_for_handshake)
  955. yield fut
  956. handshake_future.set_result(None)
  957. yield self.connect_to_server(TestServer)
  958. yield handshake_future
  959. @gen_test
  960. def test_wait_for_handshake_already_connected(self):
  961. handshake_future = Future() # type: Future[None]
  962. class TestServer(TCPServer):
  963. @gen.coroutine
  964. def handle_stream(self, stream, address):
  965. yield stream.wait_for_handshake()
  966. yield stream.wait_for_handshake()
  967. handshake_future.set_result(None)
  968. yield self.connect_to_server(TestServer)
  969. yield handshake_future
  970. @skipIfNonUnix
  971. class TestPipeIOStream(TestReadWriteMixin, AsyncTestCase):
  972. @gen.coroutine
  973. def make_iostream_pair(self, **kwargs):
  974. r, w = os.pipe()
  975. return PipeIOStream(r, **kwargs), PipeIOStream(w, **kwargs)
  976. @gen_test
  977. def test_pipe_iostream(self):
  978. rs, ws = yield self.make_iostream_pair()
  979. ws.write(b"hel")
  980. ws.write(b"lo world")
  981. data = yield rs.read_until(b" ")
  982. self.assertEqual(data, b"hello ")
  983. data = yield rs.read_bytes(3)
  984. self.assertEqual(data, b"wor")
  985. ws.close()
  986. data = yield rs.read_until_close()
  987. self.assertEqual(data, b"ld")
  988. rs.close()
  989. @gen_test
  990. def test_pipe_iostream_big_write(self):
  991. rs, ws = yield self.make_iostream_pair()
  992. NUM_BYTES = 1048576
  993. # Write 1MB of data, which should fill the buffer
  994. ws.write(b"1" * NUM_BYTES)
  995. data = yield rs.read_bytes(NUM_BYTES)
  996. self.assertEqual(data, b"1" * NUM_BYTES)
  997. ws.close()
  998. rs.close()
  999. class TestStreamBuffer(unittest.TestCase):
  1000. """
  1001. Unit tests for the private _StreamBuffer class.
  1002. """
  1003. def setUp(self):
  1004. self.random = random.Random(42)
  1005. def to_bytes(self, b):
  1006. if isinstance(b, (bytes, bytearray)):
  1007. return bytes(b)
  1008. elif isinstance(b, memoryview):
  1009. return b.tobytes() # For py2
  1010. else:
  1011. raise TypeError(b)
  1012. def make_streambuffer(self, large_buf_threshold=10):
  1013. buf = _StreamBuffer()
  1014. assert buf._large_buf_threshold
  1015. buf._large_buf_threshold = large_buf_threshold
  1016. return buf
  1017. def check_peek(self, buf, expected):
  1018. size = 1
  1019. while size < 2 * len(expected):
  1020. got = self.to_bytes(buf.peek(size))
  1021. self.assertTrue(got) # Not empty
  1022. self.assertLessEqual(len(got), size)
  1023. self.assertTrue(expected.startswith(got), (expected, got))
  1024. size = (size * 3 + 1) // 2
  1025. def check_append_all_then_skip_all(self, buf, objs, input_type):
  1026. self.assertEqual(len(buf), 0)
  1027. expected = b""
  1028. for o in objs:
  1029. expected += o
  1030. buf.append(input_type(o))
  1031. self.assertEqual(len(buf), len(expected))
  1032. self.check_peek(buf, expected)
  1033. while expected:
  1034. n = self.random.randrange(1, len(expected) + 1)
  1035. expected = expected[n:]
  1036. buf.advance(n)
  1037. self.assertEqual(len(buf), len(expected))
  1038. self.check_peek(buf, expected)
  1039. self.assertEqual(len(buf), 0)
  1040. def test_small(self):
  1041. objs = [b"12", b"345", b"67", b"89a", b"bcde", b"fgh", b"ijklmn"]
  1042. buf = self.make_streambuffer()
  1043. self.check_append_all_then_skip_all(buf, objs, bytes)
  1044. buf = self.make_streambuffer()
  1045. self.check_append_all_then_skip_all(buf, objs, bytearray)
  1046. buf = self.make_streambuffer()
  1047. self.check_append_all_then_skip_all(buf, objs, memoryview)
  1048. # Test internal algorithm
  1049. buf = self.make_streambuffer(10)
  1050. for i in range(9):
  1051. buf.append(b"x")
  1052. self.assertEqual(len(buf._buffers), 1)
  1053. for i in range(9):
  1054. buf.append(b"x")
  1055. self.assertEqual(len(buf._buffers), 2)
  1056. buf.advance(10)
  1057. self.assertEqual(len(buf._buffers), 1)
  1058. buf.advance(8)
  1059. self.assertEqual(len(buf._buffers), 0)
  1060. self.assertEqual(len(buf), 0)
  1061. def test_large(self):
  1062. objs = [
  1063. b"12" * 5,
  1064. b"345" * 2,
  1065. b"67" * 20,
  1066. b"89a" * 12,
  1067. b"bcde" * 1,
  1068. b"fgh" * 7,
  1069. b"ijklmn" * 2,
  1070. ]
  1071. buf = self.make_streambuffer()
  1072. self.check_append_all_then_skip_all(buf, objs, bytes)
  1073. buf = self.make_streambuffer()
  1074. self.check_append_all_then_skip_all(buf, objs, bytearray)
  1075. buf = self.make_streambuffer()
  1076. self.check_append_all_then_skip_all(buf, objs, memoryview)
  1077. # Test internal algorithm
  1078. buf = self.make_streambuffer(10)
  1079. for i in range(3):
  1080. buf.append(b"x" * 11)
  1081. self.assertEqual(len(buf._buffers), 3)
  1082. buf.append(b"y")
  1083. self.assertEqual(len(buf._buffers), 4)
  1084. buf.append(b"z")
  1085. self.assertEqual(len(buf._buffers), 4)
  1086. buf.advance(33)
  1087. self.assertEqual(len(buf._buffers), 1)
  1088. buf.advance(2)
  1089. self.assertEqual(len(buf._buffers), 0)
  1090. self.assertEqual(len(buf), 0)