simple_httpclient.py 27 KB


  1. from tornado.escape import _unicode
  2. from tornado import gen
  3. from tornado.httpclient import (
  4. HTTPResponse,
  5. HTTPError,
  6. AsyncHTTPClient,
  7. main,
  8. _RequestProxy,
  9. HTTPRequest,
  10. )
  11. from tornado import httputil
  12. from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
  13. from tornado.ioloop import IOLoop
  14. from tornado.iostream import StreamClosedError, IOStream
  15. from tornado.netutil import (
  16. Resolver,
  17. OverrideResolver,
  18. _client_ssl_defaults,
  19. is_valid_ip,
  20. )
  21. from tornado.log import gen_log
  22. from tornado.tcpclient import TCPClient
  23. import base64
  24. import collections
  25. import copy
  26. import functools
  27. import re
  28. import socket
  29. import ssl
  30. import sys
  31. import time
  32. from io import BytesIO
  33. import urllib.parse
  34. from typing import Dict, Any, Callable, Optional, Type, Union
  35. from types import TracebackType
  36. import typing
  37. if typing.TYPE_CHECKING:
  38. from typing import Deque, Tuple, List # noqa: F401
  class HTTPTimeoutError(HTTPError):
      """Raised by SimpleAsyncHTTPClient when a request times out.

      Kept as a subclass of `.HTTPClientError` for historical reasons, so it
      carries the simulated response code 599.

      .. versionadded:: 5.1
      """

      def __init__(self, message: str) -> None:
          # 599 is the pseudo status code this client uses for local failures.
          super().__init__(599, message=message)

      def __str__(self) -> str:
          # Fall back to a generic description when no message was given.
          if self.message:
              return self.message
          return "Timeout"
  class HTTPStreamClosedError(HTTPError):
      """Raised by SimpleAsyncHTTPClient when the underlying stream closes.

      A more specific exception (such as `ConnectionResetError`) may be
      raised instead when one is available.

      Kept as a subclass of `.HTTPClientError` for historical reasons, so it
      carries the simulated response code 599.

      .. versionadded:: 5.1
      """

      def __init__(self, message: str) -> None:
          # 599 is the pseudo status code this client uses for local failures.
          super().__init__(599, message=message)

      def __str__(self) -> str:
          # Fall back to a generic description when no message was given.
          if self.message:
              return self.message
          return "Stream closed"
  class SimpleAsyncHTTPClient(AsyncHTTPClient):
      """Non-blocking HTTP client with no external dependencies.

      This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
      Some features found in the curl-based AsyncHTTPClient are not yet
      supported.  In particular, proxies are not supported, connections
      are not reused, and ``network_interface`` is only honored when it is
      a literal IPv4 or IPv6 address (it is then used as the source address).
      """

      def initialize(  # type: ignore
          self,
          max_clients: int = 10,
          hostname_mapping: Optional[Dict[str, str]] = None,
          max_buffer_size: int = 104857600,
          resolver: Optional[Resolver] = None,
          defaults: Optional[Dict[str, Any]] = None,
          max_header_size: Optional[int] = None,
          max_body_size: Optional[int] = None,
      ) -> None:
          """Creates an AsyncHTTPClient.

          Only a single AsyncHTTPClient instance exists per IOLoop
          in order to provide limitations on the number of pending connections.
          ``force_instance=True`` may be used to suppress this behavior.

          Note that because of this implicit reuse, unless ``force_instance``
          is used, only the first call to the constructor actually uses
          its arguments. It is recommended to use the ``configure`` method
          instead of the constructor to ensure that arguments take effect.

          ``max_clients`` is the number of concurrent requests that can be
          in progress; when this limit is reached additional requests will be
          queued. Note that time spent waiting in this queue still counts
          against the ``request_timeout``.

          ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses.
          It can be used to make local DNS changes when modifying system-wide
          settings like ``/etc/hosts`` is not possible or desirable (e.g. in
          unittests).

          ``resolver`` is the `.Resolver` used for DNS lookups.  When omitted,
          a new one is created and owned by this client (it is closed by
          `close`); a caller-supplied resolver is left open.

          ``max_buffer_size`` (default 100MB) is the number of bytes
          that can be read into memory at once. ``max_body_size``
          (defaults to ``max_buffer_size``) is the largest response body
          that the client will accept.  Without a
          ``streaming_callback``, the smaller of these two limits
          applies; with a ``streaming_callback`` only ``max_body_size``
          does.

          .. versionchanged:: 4.2
             Added the ``max_body_size`` argument.
          """
          super(SimpleAsyncHTTPClient, self).initialize(defaults=defaults)
          self.max_clients = max_clients
          # Requests not yet started, in arrival order.
          self.queue = (
              collections.deque()
          )  # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
          # Requests currently being handled by an _HTTPConnection.
          self.active = (
              {}
          )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
          # Requests not yet started, with their queue-timeout handle (or None).
          self.waiting = (
              {}
          )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]
          self.max_buffer_size = max_buffer_size
          self.max_header_size = max_header_size
          self.max_body_size = max_body_size
          # TCPClient could create a Resolver for us, but we have to do it
          # ourselves to support hostname_mapping.
          if resolver:
              self.resolver = resolver
              self.own_resolver = False
          else:
              self.resolver = Resolver()
              self.own_resolver = True
          if hostname_mapping is not None:
              self.resolver = OverrideResolver(
                  resolver=self.resolver, mapping=hostname_mapping
              )
          self.tcp_client = TCPClient(resolver=self.resolver)

      def close(self) -> None:
          """Close the client, its TCPClient, and the resolver if we own it."""
          super(SimpleAsyncHTTPClient, self).close()
          if self.own_resolver:
              self.resolver.close()
          self.tcp_client.close()

      def fetch_impl(
          self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
      ) -> None:
          """Queue ``request`` and start it immediately if a slot is free.

          ``callback`` is eventually invoked with the `HTTPResponse`.
          """
          key = object()
          self.queue.append((key, request, callback))
          if len(self.active) >= self.max_clients:
              # All slots are busy, so the request will wait in the queue.
              # Queue time counts against the timeouts: arm a timer that
              # fails the request if it is still queued after the shorter of
              # connect_timeout and request_timeout.
              assert request.connect_timeout is not None
              assert request.request_timeout is not None
              timeout_handle = self.io_loop.add_timeout(
                  self.io_loop.time()
                  + min(request.connect_timeout, request.request_timeout),
                  functools.partial(self._on_timeout, key, "in request queue"),
              )
          else:
              timeout_handle = None
          self.waiting[key] = (request, callback, timeout_handle)
          self._process_queue()
          if self.queue:
              gen_log.debug(
                  "max_clients limit reached, request queued. "
                  "%d active, %d queued requests.",
                  len(self.active),
                  len(self.queue),
              )

      def _process_queue(self) -> None:
          """Start queued requests while fewer than max_clients are active.

          Queue entries whose key is no longer in ``self.waiting`` are
          skipped defensively.
          """
          while self.queue and len(self.active) < self.max_clients:
              key, request, callback = self.queue.popleft()
              if key not in self.waiting:
                  continue
              # Cancel the queue timeout and move the request to "active".
              self._remove_timeout(key)
              self.active[key] = (request, callback)
              release_callback = functools.partial(self._release_fetch, key)
              self._handle_request(request, release_callback, callback)

      def _connection_class(self) -> type:
          """Return the connection class used for each request."""
          return _HTTPConnection

      def _handle_request(
          self,
          request: HTTPRequest,
          release_callback: Callable[[], None],
          final_callback: Callable[[HTTPResponse], None],
      ) -> None:
          """Create a connection object that runs ``request``."""
          self._connection_class()(
              self,
              request,
              release_callback,
              final_callback,
              self.max_buffer_size,
              self.tcp_client,
              self.max_header_size,
              self.max_body_size,
          )

      def _release_fetch(self, key: object) -> None:
          """Free the slot held by ``key`` and start the next queued request."""
          del self.active[key]
          self._process_queue()

      def _remove_timeout(self, key: object) -> None:
          """Cancel the queue timeout for ``key`` and forget it as waiting."""
          if key in self.waiting:
              _, _, timeout_handle = self.waiting[key]
              if timeout_handle is not None:
                  self.io_loop.remove_timeout(timeout_handle)
              del self.waiting[key]

      def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
          """Timeout callback for a request still waiting in the queue.

          Removes the request from the queue and schedules its callback with
          a 599 `HTTPResponse` carrying an `HTTPTimeoutError`.

          :arg object key: Opaque object identifying the queued request.
          :arg str info: Optional detail appended to the error message.
          """
          request, callback, _ = self.waiting[key]
          self.queue.remove((key, request, callback))
          error_message = "Timeout {0}".format(info) if info else "Timeout"
          # NOTE(review): this subtracts request.start_time from
          # io_loop.time(); _HTTPConnection records wall-clock start times
          # with time.time() separately -- confirm both sides use one clock.
          timeout_response = HTTPResponse(
              request,
              599,
              error=HTTPTimeoutError(error_message),
              request_time=self.io_loop.time() - request.start_time,
          )
          self.io_loop.add_callback(callback, timeout_response)
          del self.waiting[key]
  class _HTTPConnection(httputil.HTTPMessageDelegate):
      """Runs one HTTP/1.1 request/response exchange for SimpleAsyncHTTPClient.

      The constructor schedules `run` on the current IOLoop.  `run` connects,
      writes the request, and starts reading the response.  The response
      arrives through the `.HTTPMessageDelegate` methods (`headers_received`,
      `data_received`, `finish`).  The result is handed to ``final_callback``
      at most once; the attribute is cleared as soon as it has been used.
      """

      _SUPPORTED_METHODS = set(
          ["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
      )

      def __init__(
          self,
          client: Optional[SimpleAsyncHTTPClient],
          request: HTTPRequest,
          release_callback: Callable[[], None],
          final_callback: Callable[[HTTPResponse], None],
          max_buffer_size: int,
          tcp_client: TCPClient,
          max_header_size: Optional[int],
          max_body_size: Optional[int],
      ) -> None:
          self.io_loop = IOLoop.current()
          # IOLoop time is used for timeouts and request_time; wall-clock
          # time is reported as the response's start_time.
          self.start_time = self.io_loop.time()
          self.start_wall_time = time.time()
          self.client = client
          self.request = request
          self.release_callback = release_callback
          self.final_callback = final_callback
          self.max_buffer_size = max_buffer_size
          self.tcp_client = tcp_client
          self.max_header_size = max_header_size
          self.max_body_size = max_body_size
          self.code = None  # type: Optional[int]
          self.headers = None  # type: Optional[httputil.HTTPHeaders]
          self.chunks = []  # type: List[bytes]
          self._decompressor = None
          # Timeout handle returned by IOLoop.add_timeout
          self._timeout = None  # type: object
          self._sockaddr = None
          # Start the request.  The done-callback calls f.result() so that an
          # exception re-raised by run() is not silently dropped.
          IOLoop.current().add_future(
              gen.convert_yielded(self.run()), lambda f: f.result()
          )

      async def run(self) -> None:
          """Connect to the server, send the request, and start reading.

          Any exception is passed to `_handle_exception`; exceptions it does
          not consume are re-raised.
          """
          try:
              self.parsed = urllib.parse.urlsplit(_unicode(self.request.url))
              if self.parsed.scheme not in ("http", "https"):
                  raise ValueError("Unsupported url scheme: %s" % self.request.url)
              # Split host and port out of the netloc ourselves, after
              # dropping any "user:pass@" prefix.
              netloc = self.parsed.netloc
              if "@" in netloc:
                  _, _, netloc = netloc.rpartition("@")
              host, port = httputil.split_host_and_port(netloc)
              if port is None:
                  port = 443 if self.parsed.scheme == "https" else 80
              if re.match(r"^\[.*\]$", host):
                  # raw ipv6 addresses in urls are enclosed in brackets
                  host = host[1:-1]
              # Final host (userinfo and IPv6 brackets removed).
              self.parsed_hostname = host
              if self.request.allow_ipv6 is False:
                  af = socket.AF_INET
              else:
                  af = socket.AF_UNSPEC

              ssl_options = self._get_ssl_options(self.parsed.scheme)

              source_ip = None
              if self.request.network_interface:
                  if is_valid_ip(self.request.network_interface):
                      source_ip = self.request.network_interface
                  else:
                      raise ValueError(
                          "Unrecognized IPv4 or IPv6 address for network_interface, got %r"
                          % (self.request.network_interface,)
                      )

              # Connect phase: limited by the shorter of the two timeouts.
              timeout = min(self.request.connect_timeout, self.request.request_timeout)
              if timeout:
                  self._timeout = self.io_loop.add_timeout(
                      self.start_time + timeout,
                      functools.partial(self._on_timeout, "while connecting"),
                  )
              stream = await self.tcp_client.connect(
                  host,
                  port,
                  af=af,
                  ssl_options=ssl_options,
                  max_buffer_size=self.max_buffer_size,
                  source_ip=source_ip,
              )

              if self.final_callback is None:
                  # final_callback is cleared if we've hit our timeout.
                  stream.close()
                  return
              self.stream = stream
              self.stream.set_close_callback(self.on_connection_close)
              self._remove_timeout()
              if self.final_callback is None:
                  # NOTE(review): presumably the close callback can fire
                  # immediately for an already-closed stream and complete
                  # the request -- confirm against IOStream.set_close_callback.
                  return
              # Request phase: overall deadline measured from start_time.
              if self.request.request_timeout:
                  self._timeout = self.io_loop.add_timeout(
                      self.start_time + self.request.request_timeout,
                      functools.partial(self._on_timeout, "during request"),
                  )
              self._prepare_request_headers()
              req_path = (self.parsed.path or "/") + (
                  ("?" + self.parsed.query) if self.parsed.query else ""
              )
              self.connection = self._create_connection(stream)
              start_line = httputil.RequestStartLine(self.request.method, req_path, "")
              self.connection.write_headers(start_line, self.request.headers)
              if self.request.expect_100_continue:
                  # The body is written from headers_received once the
                  # server answers 100 Continue.
                  await self.connection.read_response(self)
              else:
                  await self._write_body(True)
          except Exception:
              if not self._handle_exception(*sys.exc_info()):
                  raise

      def _prepare_request_headers(self) -> None:
          """Validate the request and fill in its outgoing headers.

          Called by `run` once the connection is established.  Mutates
          ``self.request.headers`` in place.

          :raises KeyError: for a method outside ``_SUPPORTED_METHODS`` unless
              ``allow_nonstandard_methods`` is set.
          :raises NotImplementedError: if any proxy option is set.
          :raises ValueError: for an unsupported ``auth_mode``, or when the
              presence of a body does not match the method.
          """
          if (
              self.request.method not in self._SUPPORTED_METHODS
              and not self.request.allow_nonstandard_methods
          ):
              raise KeyError("unknown method %s" % self.request.method)
          for key in (
              "proxy_host",
              "proxy_port",
              "proxy_username",
              "proxy_password",
              "proxy_auth_mode",
          ):
              if getattr(self.request, key, None):
                  raise NotImplementedError("%s not supported" % key)
          if "Connection" not in self.request.headers:
              self.request.headers["Connection"] = "close"
          if "Host" not in self.request.headers:
              if "@" in self.parsed.netloc:
                  self.request.headers["Host"] = self.parsed.netloc.rpartition(
                      "@"
                  )[-1]
              else:
                  self.request.headers["Host"] = self.parsed.netloc
          # Credentials embedded in the URL take precedence over auth_username.
          username, password = None, None
          if self.parsed.username is not None:
              username, password = self.parsed.username, self.parsed.password
          elif self.request.auth_username is not None:
              username = self.request.auth_username
              password = self.request.auth_password or ""
          if username is not None:
              assert password is not None
              if self.request.auth_mode not in (None, "basic"):
                  # Format the message; passing the mode as a second
                  # argument would leave the "%s" placeholder unfilled.
                  raise ValueError(
                      "unsupported auth_mode %s" % self.request.auth_mode
                  )
              self.request.headers["Authorization"] = "Basic " + _unicode(
                  base64.b64encode(
                      httputil.encode_username_password(username, password)
                  )
              )
          if self.request.user_agent:
              self.request.headers["User-Agent"] = self.request.user_agent
          if not self.request.allow_nonstandard_methods:
              # Some HTTP methods nearly always have bodies while others
              # almost never do. Fail in this case unless the user has
              # opted out of sanity checks with allow_nonstandard_methods.
              body_expected = self.request.method in ("POST", "PATCH", "PUT")
              body_present = (
                  self.request.body is not None
                  or self.request.body_producer is not None
              )
              if (body_expected and not body_present) or (
                  body_present and not body_expected
              ):
                  raise ValueError(
                      "Body must %sbe None for method %s (unless "
                      "allow_nonstandard_methods is true)"
                      % ("not " if body_expected else "", self.request.method)
                  )
          if self.request.expect_100_continue:
              self.request.headers["Expect"] = "100-continue"
          if self.request.body is not None:
              # When body_producer is used the caller is responsible for
              # setting Content-Length (or else chunked encoding will be used).
              self.request.headers["Content-Length"] = str(len(self.request.body))
          if (
              self.request.method == "POST"
              and "Content-Type" not in self.request.headers
          ):
              self.request.headers[
                  "Content-Type"
              ] = "application/x-www-form-urlencoded"
          if self.request.decompress_response:
              self.request.headers["Accept-Encoding"] = "gzip"

      def _get_ssl_options(
          self, scheme: str
      ) -> Union[None, Dict[str, Any], ssl.SSLContext]:
          """Return the TLS settings for ``scheme`` (None for plain http).

          An explicit ``request.ssl_options`` wins.  Otherwise the shared
          ``_client_ssl_defaults`` is returned when no certificate options
          differ from the defaults, and a new SSLContext is built when they do.
          """
          if scheme == "https":
              if self.request.ssl_options is not None:
                  return self.request.ssl_options
              # If we are using the defaults, don't construct a
              # new SSLContext.
              if (
                  self.request.validate_cert
                  and self.request.ca_certs is None
                  and self.request.client_cert is None
                  and self.request.client_key is None
              ):
                  return _client_ssl_defaults
              ssl_ctx = ssl.create_default_context(
                  ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs
              )
              if not self.request.validate_cert:
                  ssl_ctx.check_hostname = False
                  ssl_ctx.verify_mode = ssl.CERT_NONE
              if self.request.client_cert is not None:
                  ssl_ctx.load_cert_chain(
                      self.request.client_cert, self.request.client_key
                  )
              if hasattr(ssl, "OP_NO_COMPRESSION"):
                  # See netutil.ssl_options_to_context
                  ssl_ctx.options |= ssl.OP_NO_COMPRESSION
              return ssl_ctx
          return None

      def _on_timeout(self, info: Optional[str] = None) -> None:
          """Timeout callback of this connection.

          Fails the request with an `HTTPTimeoutError` (through
          `_handle_exception`) if its final callback is still pending.

          :arg str info: Optional detail appended to the error message.
          """
          self._timeout = None
          error_message = "Timeout {0}".format(info) if info else "Timeout"
          if self.final_callback is not None:
              self._handle_exception(
                  HTTPTimeoutError, HTTPTimeoutError(error_message), None
              )

      def _remove_timeout(self) -> None:
          """Cancel the pending connect/request timeout, if any."""
          if self._timeout is not None:
              self.io_loop.remove_timeout(self._timeout)
              self._timeout = None

      def _create_connection(self, stream: IOStream) -> HTTP1Connection:
          """Wrap ``stream`` in a client-side, non-keep-alive HTTP1Connection."""
          stream.set_nodelay(True)
          connection = HTTP1Connection(
              stream,
              True,
              HTTP1ConnectionParameters(
                  no_keep_alive=True,
                  max_header_size=self.max_header_size,
                  max_body_size=self.max_body_size,
                  decompress=bool(self.request.decompress_response),
              ),
              self._sockaddr,
          )
          return connection

      async def _write_body(self, start_read: bool) -> None:
          """Send the request body (if any) and finish the request message.

          :arg bool start_read: when true, read the response afterwards; a
              `StreamClosedError` while reading is routed through
              `_handle_exception`.
          """
          if self.request.body is not None:
              self.connection.write(self.request.body)
          elif self.request.body_producer is not None:
              fut = self.request.body_producer(self.connection.write)
              if fut is not None:
                  await fut
          self.connection.finish()
          if start_read:
              try:
                  await self.connection.read_response(self)
              except StreamClosedError:
                  if not self._handle_exception(*sys.exc_info()):
                      raise

      def _release(self) -> None:
          """Invoke the release callback once, then drop it."""
          if self.release_callback is not None:
              release_callback = self.release_callback
              self.release_callback = None  # type: ignore
              release_callback()

      def _run_callback(self, response: HTTPResponse) -> None:
          """Release the client slot and schedule the final callback once."""
          self._release()
          if self.final_callback is not None:
              final_callback = self.final_callback
              self.final_callback = None  # type: ignore
              self.io_loop.add_callback(final_callback, response)

      def _handle_exception(
          self,
          typ: "Optional[Type[BaseException]]",
          value: Optional[BaseException],
          tb: Optional[TracebackType],
      ) -> bool:
          """Turn an exception into a 599 response if the request is pending.

          :returns: True if the exception was consumed (the caller should not
              re-raise it), False otherwise.
          """
          if self.final_callback:
              self._remove_timeout()
              if isinstance(value, StreamClosedError):
                  # Prefer the underlying socket error when one was recorded.
                  if value.real_error is None:
                      value = HTTPStreamClosedError("Stream closed")
                  else:
                      value = value.real_error
              self._run_callback(
                  HTTPResponse(
                      self.request,
                      599,
                      error=value,
                      request_time=self.io_loop.time() - self.start_time,
                      start_time=self.start_wall_time,
                  )
              )

              if hasattr(self, "stream"):
                  # TODO: this may cause a StreamClosedError to be raised
                  # by the connection's Future. Should we cancel the
                  # connection more gracefully?
                  self.stream.close()
              return True
          else:
              # If our callback has already been called, we are probably
              # catching an exception that is not caused by us but rather
              # some child of our callback. Rather than drop it on the floor,
              # pass it along, unless it's just the stream being closed.
              return isinstance(value, StreamClosedError)

      def on_connection_close(self) -> None:
          """Close callback registered on the stream by `run`.

          While the request is still pending, re-raises the stream's recorded
          error if there is one; otherwise fails the request with an
          `HTTPStreamClosedError`.
          """
          if self.final_callback is not None:
              message = "Connection closed"
              if self.stream.error:
                  raise self.stream.error
              try:
                  raise HTTPStreamClosedError(message)
              except HTTPStreamClosedError:
                  self._handle_exception(*sys.exc_info())

      async def headers_received(
          self,
          first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine],
          headers: httputil.HTTPHeaders,
      ) -> None:
          """Record the response status and headers.

          An interim ``100 Continue`` (when ``expect_100_continue`` is set)
          triggers sending the body and is otherwise ignored.  Headers are
          passed to ``header_callback`` unless a redirect will be followed.
          """
          assert isinstance(first_line, httputil.ResponseStartLine)
          if self.request.expect_100_continue and first_line.code == 100:
              await self._write_body(False)
              return
          self.code = first_line.code
          self.reason = first_line.reason
          self.headers = headers

          if self._should_follow_redirect():
              return

          if self.request.header_callback is not None:
              # Reassemble the start line.
              self.request.header_callback("%s %s %s\r\n" % first_line)
              for k, v in self.headers.get_all():
                  self.request.header_callback("%s: %s\r\n" % (k, v))
              self.request.header_callback("\r\n")

      def _should_follow_redirect(self) -> bool:
          """True for a redirect status with a Location header and budget left."""
          if self.request.follow_redirects:
              assert self.request.max_redirects is not None
              return (
                  self.code in (301, 302, 303, 307, 308)
                  and self.request.max_redirects > 0
                  and self.headers is not None
                  and self.headers.get("Location") is not None
              )
          return False

      def finish(self) -> None:
          """Called when the response is complete.

          Either follows a redirect (issuing a new request through the client
          and forwarding its result to the final callback) or delivers the
          response to the final callback.  The stream is closed in both cases.
          """
          assert self.code is not None
          data = b"".join(self.chunks)
          self._remove_timeout()
          original_request = getattr(self.request, "original_request", self.request)
          if self._should_follow_redirect():
              assert isinstance(self.request, _RequestProxy)
              new_request = copy.copy(self.request.request)
              new_request.url = urllib.parse.urljoin(
                  self.request.url, self.headers["Location"]
              )
              new_request.max_redirects = self.request.max_redirects - 1
              del new_request.headers["Host"]
              # https://tools.ietf.org/html/rfc7231#section-6.4
              #
              # The original HTTP spec said that after a 301 or 302
              # redirect, the request method should be preserved.
              # However, browsers implemented this by changing the
              # method to GET, and the behavior stuck. 303 redirects
              # always specified this POST-to-GET behavior (arguably 303
              # redirects should change *all* requests to GET, but
              # libcurl only does this for POST so we follow their
              # example).
              if self.code in (301, 302, 303) and self.request.method == "POST":
                  new_request.method = "GET"
                  new_request.body = None
                  # The body was dropped, so remove the headers describing it
                  # from the request that is sent next.  copy.copy is
                  # shallow, so this headers object may also be shared with
                  # the original request.
                  for h in [
                      "Content-Length",
                      "Content-Type",
                      "Content-Encoding",
                      "Transfer-Encoding",
                  ]:
                      try:
                          del new_request.headers[h]
                      except KeyError:
                          pass
              new_request.original_request = original_request
              # Hand the final callback to the redirected fetch.
              final_callback = self.final_callback
              self.final_callback = None
              self._release()
              fut = self.client.fetch(new_request, raise_error=False)
              fut.add_done_callback(lambda f: final_callback(f.result()))
              self._on_end_request()
              return
          if self.request.streaming_callback:
              # The body was already delivered chunk by chunk.
              buffer = BytesIO()
          else:
              buffer = BytesIO(data)  # TODO: don't require one big string?
          response = HTTPResponse(
              original_request,
              self.code,
              reason=getattr(self, "reason", None),
              headers=self.headers,
              request_time=self.io_loop.time() - self.start_time,
              start_time=self.start_wall_time,
              buffer=buffer,
              effective_url=self.request.url,
          )
          self._run_callback(response)
          self._on_end_request()

      def _on_end_request(self) -> None:
          """Close the stream; connections are never reused."""
          self.stream.close()

      def data_received(self, chunk: bytes) -> None:
          """Stream ``chunk`` to streaming_callback or buffer it for finish."""
          if self._should_follow_redirect():
              # We're going to follow a redirect so just discard the body.
              return
          if self.request.streaming_callback is not None:
              self.request.streaming_callback(chunk)
          else:
              self.chunks.append(chunk)
  if __name__ == "__main__":
      # Command-line entry point: select this implementation as the
      # AsyncHTTPClient backend, then run the `main` imported from
      # tornado.httpclient.
      AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
      main()