from tornado.escape import _unicode
from tornado import gen
from tornado.httpclient import (
    HTTPResponse,
    HTTPError,
    AsyncHTTPClient,
    main,
    _RequestProxy,
    HTTPRequest,
)
from tornado import httputil
from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError, IOStream
from tornado.netutil import (
    Resolver,
    OverrideResolver,
    _client_ssl_defaults,
    is_valid_ip,
)
from tornado.log import gen_log
from tornado.tcpclient import TCPClient

import base64
import collections
import copy
import functools
import re
import socket
import ssl
import sys
import time
from io import BytesIO
import urllib.parse

from typing import Dict, Any, Callable, Optional, Type, Union
from types import TracebackType
import typing

if typing.TYPE_CHECKING:
    from typing import Deque, Tuple, List  # noqa: F401


class HTTPTimeoutError(HTTPError):
    """Error raised by SimpleAsyncHTTPClient on timeout.

    For historical reasons, this is a subclass of `.HTTPClientError`
    which simulates a response code of 599.

    .. versionadded:: 5.1
    """

    def __init__(self, message: str) -> None:
        super(HTTPTimeoutError, self).__init__(599, message=message)

    def __str__(self) -> str:
        return self.message or "Timeout"


class HTTPStreamClosedError(HTTPError):
    """Error raised by SimpleAsyncHTTPClient when the underlying stream is closed.

    When a more specific exception is available (such as
    `ConnectionResetError`), it may be raised instead of this one.

    For historical reasons, this is a subclass of `.HTTPClientError`
    which simulates a response code of 599.

    .. versionadded:: 5.1
    """

    def __init__(self, message: str) -> None:
        super(HTTPStreamClosedError, self).__init__(599, message=message)

    def __str__(self) -> str:
        return self.message or "Stream closed"


class SimpleAsyncHTTPClient(AsyncHTTPClient):
    """Non-blocking HTTP client with no external dependencies.

    This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
    Some features found in the curl-based AsyncHTTPClient are not yet
    supported.  In particular, proxies are not supported, connections
    are not reused, and callers cannot select the network interface to be
    used.
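
    A minimal usage sketch (illustrative only; it must run inside a
    coroutine, and the URL is a placeholder)::

        AsyncHTTPClient.configure(
            "tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=20
        )
        response = await AsyncHTTPClient().fetch("http://example.com/")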
- """
    def initialize(  # type: ignore
        self,
        max_clients: int = 10,
        hostname_mapping: Optional[Dict[str, str]] = None,
        max_buffer_size: int = 104857600,
        resolver: Optional[Resolver] = None,
        defaults: Optional[Dict[str, Any]] = None,
        max_header_size: Optional[int] = None,
        max_body_size: Optional[int] = None,
    ) -> None:
- """Creates a AsyncHTTPClient.

        Only a single AsyncHTTPClient instance exists per IOLoop
        in order to provide limitations on the number of pending connections.
        ``force_instance=True`` may be used to suppress this behavior.

        Note that because of this implicit reuse, unless ``force_instance``
        is used, only the first call to the constructor actually uses
        its arguments. It is recommended to use the ``configure`` method
        instead of the constructor to ensure that arguments take effect.

        ``max_clients`` is the number of concurrent requests that can be
        in progress; when this limit is reached additional requests will be
        queued. Note that time spent waiting in this queue still counts
        against the ``request_timeout``.

        ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses.
        It can be used to make local DNS changes when modifying system-wide
        settings like ``/etc/hosts`` is not possible or desirable (e.g. in
        unittests).

        ``max_buffer_size`` (default 100MB) is the number of bytes
        that can be read into memory at once. ``max_body_size``
        (defaults to ``max_buffer_size``) is the largest response body
        that the client will accept.  Without a
        ``streaming_callback``, the smaller of these two limits
        applies; with a ``streaming_callback`` only ``max_body_size``
        does.
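
        For example, ``hostname_mapping`` can point a test at a local
        server (illustrative only; the hostname is a placeholder)::

            AsyncHTTPClient.configure(
                "tornado.simple_httpclient.SimpleAsyncHTTPClient",
                hostname_mapping={"api.example.com": "127.0.0.1"},
            )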

        .. versionchanged:: 4.2
           Added the ``max_body_size`` argument.
        """
        super(SimpleAsyncHTTPClient, self).initialize(defaults=defaults)
        self.max_clients = max_clients
        self.queue = (
            collections.deque()
        )  # type: Deque[Tuple[object, HTTPRequest, Callable[[HTTPResponse], None]]]
        self.active = (
            {}
        )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None]]]
        self.waiting = (
            {}
        )  # type: Dict[object, Tuple[HTTPRequest, Callable[[HTTPResponse], None], object]]
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        # TCPClient could create a Resolver for us, but we have to do it
        # ourselves to support hostname_mapping.
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver()
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(
                resolver=self.resolver, mapping=hostname_mapping
            )
        self.tcp_client = TCPClient(resolver=self.resolver)

    def close(self) -> None:
        super(SimpleAsyncHTTPClient, self).close()
        if self.own_resolver:
            self.resolver.close()
        self.tcp_client.close()

    def fetch_impl(
        self, request: HTTPRequest, callback: Callable[[HTTPResponse], None]
    ) -> None:
        key = object()
        self.queue.append((key, request, callback))
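        # If every client slot is busy, the request will sit in the queue;
        # arm a timeout so that time spent waiting there still counts
        # against connect_timeout/request_timeout.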
        if not len(self.active) < self.max_clients:
            assert request.connect_timeout is not None
            assert request.request_timeout is not None
            timeout_handle = self.io_loop.add_timeout(
                self.io_loop.time()
                + min(request.connect_timeout, request.request_timeout),
                functools.partial(self._on_timeout, key, "in request queue"),
            )
        else:
            timeout_handle = None
        self.waiting[key] = (request, callback, timeout_handle)
        self._process_queue()
        if self.queue:
            gen_log.debug(
                "max_clients limit reached, request queued. "
                "%d active, %d queued requests." % (len(self.active), len(self.queue))
            )

    def _process_queue(self) -> None:
        while self.queue and len(self.active) < self.max_clients:
            key, request, callback = self.queue.popleft()
            if key not in self.waiting:
                continue
            self._remove_timeout(key)
            self.active[key] = (request, callback)
            release_callback = functools.partial(self._release_fetch, key)
            self._handle_request(request, release_callback, callback)

    def _connection_class(self) -> type:
        return _HTTPConnection

    def _handle_request(
        self,
        request: HTTPRequest,
        release_callback: Callable[[], None],
        final_callback: Callable[[HTTPResponse], None],
    ) -> None:
        self._connection_class()(
            self,
            request,
            release_callback,
            final_callback,
            self.max_buffer_size,
            self.tcp_client,
            self.max_header_size,
            self.max_body_size,
        )

    def _release_fetch(self, key: object) -> None:
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key: object) -> None:
        if key in self.waiting:
            request, callback, timeout_handle = self.waiting[key]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)
            del self.waiting[key]
    def _on_timeout(self, key: object, info: Optional[str] = None) -> None:
        """Timeout callback of request.

        Construct a timeout HTTPResponse when a timeout occurs.

        :arg object key: A simple object to mark the request.
        :arg str info: More detailed timeout information.
        """
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))

        error_message = "Timeout {0}".format(info) if info else "Timeout"
        timeout_response = HTTPResponse(
            request,
            599,
            error=HTTPTimeoutError(error_message),
            request_time=self.io_loop.time() - request.start_time,
        )
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]


class _HTTPConnection(httputil.HTTPMessageDelegate):
    _SUPPORTED_METHODS = set(
        ["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"]
    )

    def __init__(
        self,
        client: Optional[SimpleAsyncHTTPClient],
        request: HTTPRequest,
        release_callback: Callable[[], None],
        final_callback: Callable[[HTTPResponse], None],
        max_buffer_size: int,
        tcp_client: TCPClient,
        max_header_size: int,
        max_body_size: int,
    ) -> None:
        self.io_loop = IOLoop.current()
        self.start_time = self.io_loop.time()
        self.start_wall_time = time.time()
        self.client = client
        self.request = request
        self.release_callback = release_callback
        self.final_callback = final_callback
        self.max_buffer_size = max_buffer_size
        self.tcp_client = tcp_client
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        self.code = None  # type: Optional[int]
        self.headers = None  # type: Optional[httputil.HTTPHeaders]
        self.chunks = []  # type: List[bytes]
        self._decompressor = None
        # Timeout handle returned by IOLoop.add_timeout
        self._timeout = None  # type: object
        self._sockaddr = None
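        # Start the request coroutine immediately; the lambda re-raises any
        # exception that escapes run() so it is not silently dropped.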
        IOLoop.current().add_future(
            gen.convert_yielded(self.run()), lambda f: f.result()
        )

    async def run(self) -> None:
        try:
            self.parsed = urllib.parse.urlsplit(_unicode(self.request.url))
            if self.parsed.scheme not in ("http", "https"):
                raise ValueError("Unsupported url scheme: %s" % self.request.url)
            # urlsplit results have hostname and port results, but they
            # didn't support ipv6 literals until python 2.7.
            netloc = self.parsed.netloc
            if "@" in netloc:
                userpass, _, netloc = netloc.rpartition("@")
            host, port = httputil.split_host_and_port(netloc)
            if port is None:
                port = 443 if self.parsed.scheme == "https" else 80
            if re.match(r"^\[.*\]$", host):
                # raw ipv6 addresses in urls are enclosed in brackets
                host = host[1:-1]
            self.parsed_hostname = host  # save final host for _on_connect
            if self.request.allow_ipv6 is False:
                af = socket.AF_INET
            else:
                af = socket.AF_UNSPEC
            ssl_options = self._get_ssl_options(self.parsed.scheme)
            source_ip = None
            if self.request.network_interface:
                if is_valid_ip(self.request.network_interface):
                    source_ip = self.request.network_interface
                else:
                    raise ValueError(
                        "Unrecognized IPv4 or IPv6 address for network_interface, got %r"
                        % (self.request.network_interface,)
                    )
            timeout = min(self.request.connect_timeout, self.request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    functools.partial(self._on_timeout, "while connecting"),
                )
            stream = await self.tcp_client.connect(
                host,
                port,
                af=af,
                ssl_options=ssl_options,
                max_buffer_size=self.max_buffer_size,
                source_ip=source_ip,
            )
            if self.final_callback is None:
                # final_callback is cleared if we've hit our timeout.
                stream.close()
                return
            self.stream = stream
            self.stream.set_close_callback(self.on_connection_close)
            self._remove_timeout()
            if self.final_callback is None:
                return
            if self.request.request_timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + self.request.request_timeout,
                    functools.partial(self._on_timeout, "during request"),
                )
            if (
                self.request.method not in self._SUPPORTED_METHODS
                and not self.request.allow_nonstandard_methods
            ):
                raise KeyError("unknown method %s" % self.request.method)
            for key in (
                "proxy_host",
                "proxy_port",
                "proxy_username",
                "proxy_password",
                "proxy_auth_mode",
            ):
                if getattr(self.request, key, None):
                    raise NotImplementedError("%s not supported" % key)
            if "Connection" not in self.request.headers:
                self.request.headers["Connection"] = "close"
            if "Host" not in self.request.headers:
                if "@" in self.parsed.netloc:
                    self.request.headers["Host"] = self.parsed.netloc.rpartition(
                        "@"
                    )[-1]
                else:
                    self.request.headers["Host"] = self.parsed.netloc
            username, password = None, None
            if self.parsed.username is not None:
                username, password = self.parsed.username, self.parsed.password
            elif self.request.auth_username is not None:
                username = self.request.auth_username
                password = self.request.auth_password or ""
            if username is not None:
                assert password is not None
                if self.request.auth_mode not in (None, "basic"):
                    raise ValueError(
                        "unsupported auth_mode %s" % self.request.auth_mode
                    )
- self.request.headers["Authorization"] = "Basic " + _unicode(
- base64.b64encode(
- httputil.encode_username_password(username, password)
- )
- )
- if self.request.user_agent:
- self.request.headers["User-Agent"] = self.request.user_agent
- if not self.request.allow_nonstandard_methods:
- # Some HTTP methods nearly always have bodies while others
- # almost never do. Fail in this case unless the user has
- # opted out of sanity checks with allow_nonstandard_methods.
- body_expected = self.request.method in ("POST", "PATCH", "PUT")
- body_present = (
- self.request.body is not None
- or self.request.body_producer is not None
- )
- if (body_expected and not body_present) or (
- body_present and not body_expected
- ):
- raise ValueError(
- "Body must %sbe None for method %s (unless "
- "allow_nonstandard_methods is true)"
- % ("not " if body_expected else "", self.request.method)
- )
- if self.request.expect_100_continue:
- self.request.headers["Expect"] = "100-continue"
- if self.request.body is not None:
- # When body_producer is used the caller is responsible for
- # setting Content-Length (or else chunked encoding will be used).
- self.request.headers["Content-Length"] = str(len(self.request.body))
- if (
- self.request.method == "POST"
- and "Content-Type" not in self.request.headers
- ):
- self.request.headers[
- "Content-Type"
- ] = "application/x-www-form-urlencoded"
- if self.request.decompress_response:
- self.request.headers["Accept-Encoding"] = "gzip"
- req_path = (self.parsed.path or "/") + (
- ("?" + self.parsed.query) if self.parsed.query else ""
- )
- self.connection = self._create_connection(stream)
- start_line = httputil.RequestStartLine(
- self.request.method, req_path, ""
- )
- self.connection.write_headers(start_line, self.request.headers)
- if self.request.expect_100_continue:
- await self.connection.read_response(self)
- else:
- await self._write_body(True)
- except Exception:
- if not self._handle_exception(*sys.exc_info()):
- raise
- def _get_ssl_options(
- self, scheme: str
- ) -> Union[None, Dict[str, Any], ssl.SSLContext]:
- if scheme == "https":
- if self.request.ssl_options is not None:
- return self.request.ssl_options
- # If we are using the defaults, don't construct a
- # new SSLContext.
- if (
- self.request.validate_cert
- and self.request.ca_certs is None
- and self.request.client_cert is None
- and self.request.client_key is None
- ):
- return _client_ssl_defaults
- ssl_ctx = ssl.create_default_context(
- ssl.Purpose.SERVER_AUTH, cafile=self.request.ca_certs
- )
- if not self.request.validate_cert:
- ssl_ctx.check_hostname = False
- ssl_ctx.verify_mode = ssl.CERT_NONE
- if self.request.client_cert is not None:
- ssl_ctx.load_cert_chain(
- self.request.client_cert, self.request.client_key
- )
- if hasattr(ssl, "OP_NO_COMPRESSION"):
- # See netutil.ssl_options_to_context
- ssl_ctx.options |= ssl.OP_NO_COMPRESSION
- return ssl_ctx
- return None
    def _on_timeout(self, info: Optional[str] = None) -> None:
        """Timeout callback of _HTTPConnection instance.

        Raise an `HTTPTimeoutError` when a timeout occurs.

        :arg str info: More detailed timeout information.
        """
        self._timeout = None
        error_message = "Timeout {0}".format(info) if info else "Timeout"
        if self.final_callback is not None:
            self._handle_exception(
                HTTPTimeoutError, HTTPTimeoutError(error_message), None
            )

    def _remove_timeout(self) -> None:
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
            self._timeout = None

    def _create_connection(self, stream: IOStream) -> HTTP1Connection:
        stream.set_nodelay(True)
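        # The second positional argument (True) marks this as the client side
        # of the connection; keep-alive is disabled because this client does
        # not reuse connections.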
        connection = HTTP1Connection(
            stream,
            True,
            HTTP1ConnectionParameters(
                no_keep_alive=True,
                max_header_size=self.max_header_size,
                max_body_size=self.max_body_size,
                decompress=bool(self.request.decompress_response),
            ),
            self._sockaddr,
        )
        return connection

    async def _write_body(self, start_read: bool) -> None:
        if self.request.body is not None:
            self.connection.write(self.request.body)
        elif self.request.body_producer is not None:
            fut = self.request.body_producer(self.connection.write)
            if fut is not None:
                await fut
        self.connection.finish()
        if start_read:
            try:
                await self.connection.read_response(self)
            except StreamClosedError:
                if not self._handle_exception(*sys.exc_info()):
                    raise

    def _release(self) -> None:
        if self.release_callback is not None:
            release_callback = self.release_callback
            self.release_callback = None  # type: ignore
            release_callback()

    def _run_callback(self, response: HTTPResponse) -> None:
        self._release()
        if self.final_callback is not None:
            final_callback = self.final_callback
            self.final_callback = None  # type: ignore
            self.io_loop.add_callback(final_callback, response)

    def _handle_exception(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        if self.final_callback:
            self._remove_timeout()
            if isinstance(value, StreamClosedError):
                if value.real_error is None:
                    value = HTTPStreamClosedError("Stream closed")
                else:
                    value = value.real_error
            self._run_callback(
                HTTPResponse(
                    self.request,
                    599,
                    error=value,
                    request_time=self.io_loop.time() - self.start_time,
                    start_time=self.start_wall_time,
                )
            )
            if hasattr(self, "stream"):
                # TODO: this may cause a StreamClosedError to be raised
                # by the connection's Future.  Should we cancel the
                # connection more gracefully?
                self.stream.close()
            return True
        else:
            # If our callback has already been called, we are probably
            # catching an exception that is not caused by us but rather
            # some child of our callback. Rather than drop it on the floor,
            # pass it along, unless it's just the stream being closed.
            return isinstance(value, StreamClosedError)

    def on_connection_close(self) -> None:
        if self.final_callback is not None:
            message = "Connection closed"
            if self.stream.error:
                raise self.stream.error
            try:
                raise HTTPStreamClosedError(message)
            except HTTPStreamClosedError:
                self._handle_exception(*sys.exc_info())

    async def headers_received(
        self,
        first_line: Union[httputil.ResponseStartLine, httputil.RequestStartLine],
        headers: httputil.HTTPHeaders,
    ) -> None:
        assert isinstance(first_line, httputil.ResponseStartLine)
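        # A 100 (Continue) interim response means the server accepted the
        # request headers; send the body now and keep waiting for the final
        # response.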
        if self.request.expect_100_continue and first_line.code == 100:
            await self._write_body(False)
            return
        self.code = first_line.code
        self.reason = first_line.reason
        self.headers = headers

        if self._should_follow_redirect():
            return

        if self.request.header_callback is not None:
            # Reassemble the start line.
            self.request.header_callback("%s %s %s\r\n" % first_line)
            for k, v in self.headers.get_all():
                self.request.header_callback("%s: %s\r\n" % (k, v))
            self.request.header_callback("\r\n")

    def _should_follow_redirect(self) -> bool:
        if self.request.follow_redirects:
            assert self.request.max_redirects is not None
            return (
                self.code in (301, 302, 303, 307, 308)
                and self.request.max_redirects > 0
                and self.headers is not None
                and self.headers.get("Location") is not None
            )
        return False

    def finish(self) -> None:
        assert self.code is not None
        data = b"".join(self.chunks)
        self._remove_timeout()
        original_request = getattr(self.request, "original_request", self.request)
        if self._should_follow_redirect():
            assert isinstance(self.request, _RequestProxy)
            new_request = copy.copy(self.request.request)
            new_request.url = urllib.parse.urljoin(
                self.request.url, self.headers["Location"]
            )
            new_request.max_redirects = self.request.max_redirects - 1
            del new_request.headers["Host"]
            # https://tools.ietf.org/html/rfc7231#section-6.4
            #
            # The original HTTP spec said that after a 301 or 302
            # redirect, the request method should be preserved.
            # However, browsers implemented this by changing the
            # method to GET, and the behavior stuck. 303 redirects
            # always specified this POST-to-GET behavior (arguably 303
            # redirects should change *all* requests to GET, but
            # libcurl only does this for POST so we follow their
            # example).
            if self.code in (301, 302, 303) and self.request.method == "POST":
                new_request.method = "GET"
                new_request.body = None
                for h in [
                    "Content-Length",
                    "Content-Type",
                    "Content-Encoding",
                    "Transfer-Encoding",
                ]:
                    try:
                        del self.request.headers[h]
                    except KeyError:
                        pass
            new_request.original_request = original_request
            final_callback = self.final_callback
            self.final_callback = None
            self._release()
            fut = self.client.fetch(new_request, raise_error=False)
            fut.add_done_callback(lambda f: final_callback(f.result()))
            self._on_end_request()
            return
        if self.request.streaming_callback:
            buffer = BytesIO()
        else:
            buffer = BytesIO(data)  # TODO: don't require one big string?
        response = HTTPResponse(
            original_request,
            self.code,
            reason=getattr(self, "reason", None),
            headers=self.headers,
            request_time=self.io_loop.time() - self.start_time,
            start_time=self.start_wall_time,
            buffer=buffer,
            effective_url=self.request.url,
        )
        self._run_callback(response)
        self._on_end_request()

    def _on_end_request(self) -> None:
        self.stream.close()

    def data_received(self, chunk: bytes) -> None:
        if self._should_follow_redirect():
            # We're going to follow a redirect so just discard the body.
            return
        if self.request.streaming_callback is not None:
            self.request.streaming_callback(chunk)
        else:
            self.chunks.append(chunk)


if __name__ == "__main__":
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()