websocket.py 60 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663
  1. """Implementation of the WebSocket protocol.
  2. `WebSockets <http://dev.w3.org/html5/websockets/>`_ allow for bidirectional
  3. communication between the browser and server.
  4. WebSockets are supported in the current versions of all major browsers,
  5. although older versions that do not support WebSockets are still in use
  6. (refer to http://caniuse.com/websockets for details).
  7. This module implements the final version of the WebSocket protocol as
  8. defined in `RFC 6455 <http://tools.ietf.org/html/rfc6455>`_. Certain
  9. browser versions (notably Safari 5.x) implemented an earlier draft of
  10. the protocol (known as "draft 76") and are not compatible with this module.
  11. .. versionchanged:: 4.0
  12. Removed support for the draft 76 protocol version.
  13. """
  14. import abc
  15. import asyncio
  16. import base64
  17. import hashlib
  18. import os
  19. import sys
  20. import struct
  21. import tornado.escape
  22. import tornado.web
  23. from urllib.parse import urlparse
  24. import zlib
  25. from tornado.concurrent import Future, future_set_result_unless_cancelled
  26. from tornado.escape import utf8, native_str, to_unicode
  27. from tornado import gen, httpclient, httputil
  28. from tornado.ioloop import IOLoop, PeriodicCallback
  29. from tornado.iostream import StreamClosedError, IOStream
  30. from tornado.log import gen_log, app_log
  31. from tornado import simple_httpclient
  32. from tornado.queues import Queue
  33. from tornado.tcpclient import TCPClient
  34. from tornado.util import _websocket_mask
  35. from typing import (
  36. TYPE_CHECKING,
  37. cast,
  38. Any,
  39. Optional,
  40. Dict,
  41. Union,
  42. List,
  43. Awaitable,
  44. Callable,
  45. Tuple,
  46. Type,
  47. )
  48. from types import TracebackType
  49. if TYPE_CHECKING:
  50. from typing_extensions import Protocol
  51. # The zlib compressor types aren't actually exposed anywhere
  52. # publicly, so declare protocols for the portions we use.
  53. class _Compressor(Protocol):
  54. def compress(self, data: bytes) -> bytes:
  55. pass
  56. def flush(self, mode: int) -> bytes:
  57. pass
  58. class _Decompressor(Protocol):
  59. unconsumed_tail = b"" # type: bytes
  60. def decompress(self, data: bytes, max_length: int) -> bytes:
  61. pass
  62. class _WebSocketDelegate(Protocol):
  63. # The common base interface implemented by WebSocketHandler on
  64. # the server side and WebSocketClientConnection on the client
  65. # side.
  66. def on_ws_connection_close(
  67. self, close_code: int = None, close_reason: str = None
  68. ) -> None:
  69. pass
  70. def on_message(self, message: Union[str, bytes]) -> Optional["Awaitable[None]"]:
  71. pass
  72. def on_ping(self, data: bytes) -> None:
  73. pass
  74. def on_pong(self, data: bytes) -> None:
  75. pass
  76. def log_exception(
  77. self,
  78. typ: Optional[Type[BaseException]],
  79. value: Optional[BaseException],
  80. tb: Optional[TracebackType],
  81. ) -> None:
  82. pass
  83. _default_max_message_size = 10 * 1024 * 1024
  84. class WebSocketError(Exception):
  85. pass
  86. class WebSocketClosedError(WebSocketError):
  87. """Raised by operations on a closed connection.
  88. .. versionadded:: 3.2
  89. """
  90. pass
  91. class _DecompressTooLargeError(Exception):
  92. pass
  93. class _WebSocketParams(object):
  94. def __init__(
  95. self,
  96. ping_interval: float = None,
  97. ping_timeout: float = None,
  98. max_message_size: int = _default_max_message_size,
  99. compression_options: Dict[str, Any] = None,
  100. ) -> None:
  101. self.ping_interval = ping_interval
  102. self.ping_timeout = ping_timeout
  103. self.max_message_size = max_message_size
  104. self.compression_options = compression_options
class WebSocketHandler(tornado.web.RequestHandler):
    """Subclass this class to create a basic WebSocket handler.

    Override `on_message` to handle incoming messages, and use
    `write_message` to send messages to the client. You can also
    override `open` and `on_close` to handle opened and closed
    connections.

    Custom upgrade response headers can be sent by overriding
    `~tornado.web.RequestHandler.set_default_headers` or
    `~tornado.web.RequestHandler.prepare`.

    See http://dev.w3.org/html5/websockets/ for details on the
    JavaScript interface. The protocol is specified at
    http://tools.ietf.org/html/rfc6455.

    Here is an example WebSocket handler that echoes all received messages
    back to the client:

    .. testcode::

      class EchoWebSocket(tornado.websocket.WebSocketHandler):
          def open(self):
              print("WebSocket opened")

          def on_message(self, message):
              self.write_message(u"You said: " + message)

          def on_close(self):
              print("WebSocket closed")

    .. testoutput::
       :hide:

    WebSockets are not standard HTTP connections. The "handshake" is
    HTTP, but after the handshake, the protocol is
    message-based. Consequently, most of the Tornado HTTP facilities
    are not available in handlers of this type. The only communication
    methods available to you are `write_message()`, `ping()`, and
    `close()`. Likewise, your request handler class should implement
    the `open()` method rather than ``get()`` or ``post()``.

    If you map the handler above to ``/websocket`` in your application, you can
    invoke it in JavaScript with::

      var ws = new WebSocket("ws://localhost:8888/websocket");
      ws.onopen = function() {
         ws.send("Hello, world");
      };
      ws.onmessage = function (evt) {
         alert(evt.data);
      };

    This script pops up an alert box that says "You said: Hello, world".

    Web browsers allow any site to open a websocket connection to any other,
    instead of using the same-origin policy that governs other network
    access from javascript. This can be surprising and is a potential
    security hole, so since Tornado 4.0 `WebSocketHandler` requires
    applications that wish to receive cross-origin websockets to opt in
    by overriding the `~WebSocketHandler.check_origin` method (see that
    method's docs for details). Failure to do so is the most likely
    cause of 403 errors when making a websocket connection.

    When using a secure websocket connection (``wss://``) with a self-signed
    certificate, the connection from a browser may fail because it wants
    to show the "accept this certificate" dialog but has nowhere to show it.
    You must first visit a regular HTML page using the same certificate
    to accept it before the websocket connection will succeed.

    If the application setting ``websocket_ping_interval`` has a non-zero
    value, a ping will be sent periodically, and the connection will be
    closed if a response is not received before the ``websocket_ping_timeout``.

    Messages larger than the ``websocket_max_message_size`` application setting
    (default 10MiB) will not be accepted.

    .. versionchanged:: 4.5
       Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and
       ``websocket_max_message_size``.
    """

    def __init__(
        self,
        application: tornado.web.Application,
        request: httputil.HTTPServerRequest,
        **kwargs: Any
    ) -> None:
        super(WebSocketHandler, self).__init__(application, request, **kwargs)
        # Protocol object for this connection; None until `get` selects one
        # and again after the connection has been closed.
        self.ws_connection = None  # type: Optional[WebSocketProtocol]
        # Filled in by `on_ws_connection_close`.
        self.close_code = None  # type: Optional[int]
        self.close_reason = None  # type: Optional[str]
        self.stream = None  # type: Optional[IOStream]
        # Guards against invoking `on_close` more than once.
        self._on_close_called = False

    async def get(self, *args: Any, **kwargs: Any) -> None:
        """Handle the HTTP GET request that begins the WebSocket handshake.

        Validates the ``Upgrade`` and ``Connection`` headers (400 on
        failure) and the request origin (403 on failure), then hands
        the connection to the protocol object returned by
        `get_websocket_protocol`. If no protocol is available for the
        requested version, responds with 426 and advertises the
        accepted versions.
        """
        # Saved on the instance; not used further within this method.
        self.open_args = args
        self.open_kwargs = kwargs

        # Upgrade header should be present and should be equal to WebSocket
        if self.request.headers.get("Upgrade", "").lower() != "websocket":
            self.set_status(400)
            log_msg = 'Can "Upgrade" only to "WebSocket".'
            self.finish(log_msg)
            gen_log.debug(log_msg)
            return

        # Connection header should be upgrade.
        # Some proxy servers/load balancers
        # might mess with it.
        headers = self.request.headers
        # The header may carry several comma-separated tokens; normalize
        # each one before looking for "upgrade".
        connection = map(
            lambda s: s.strip().lower(), headers.get("Connection", "").split(",")
        )
        if "upgrade" not in connection:
            self.set_status(400)
            log_msg = '"Connection" must be "Upgrade".'
            self.finish(log_msg)
            gen_log.debug(log_msg)
            return

        # Handle WebSocket Origin naming convention differences
        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if "Origin" in self.request.headers:
            origin = self.request.headers.get("Origin")
        else:
            origin = self.request.headers.get("Sec-Websocket-Origin", None)

        # If there was an origin header, check to make sure it matches
        # according to check_origin. When the origin is None, we assume it
        # did not come from a browser and that it can be passed on.
        if origin is not None and not self.check_origin(origin):
            self.set_status(403)
            log_msg = "Cross origin websockets not allowed"
            self.finish(log_msg)
            gen_log.debug(log_msg)
            return

        self.ws_connection = self.get_websocket_protocol()
        if self.ws_connection:
            await self.ws_connection.accept_connection(self)
        else:
            # No protocol for the requested Sec-WebSocket-Version: tell the
            # client which versions are accepted.
            self.set_status(426, "Upgrade Required")
            self.set_header("Sec-WebSocket-Version", "7, 8, 13")

    # Class-level default; `__init__` also sets this per instance.
    stream = None

    @property
    def ping_interval(self) -> Optional[float]:
        """The interval for websocket keep-alive pings.

        Set websocket_ping_interval = 0 to disable pings.
        """
        return self.settings.get("websocket_ping_interval", None)

    @property
    def ping_timeout(self) -> Optional[float]:
        """If no response (pong) to a ping is received in this many seconds,
        close the websocket connection (VPNs, etc. can fail to cleanly close
        ws connections).

        Default is max of 3 pings or 30 seconds.
        """
        return self.settings.get("websocket_ping_timeout", None)

    @property
    def max_message_size(self) -> int:
        """Maximum allowed message size.

        If the remote peer sends a message larger than this, the connection
        will be closed.

        Default is 10MiB.
        """
        return self.settings.get(
            "websocket_max_message_size", _default_max_message_size
        )

    def write_message(
        self, message: Union[bytes, str, Dict[str, Any]], binary: bool = False
    ) -> "Future[None]":
        """Sends the given message to the client of this Web Socket.

        The message may be either a string or a dict (which will be
        encoded as json). If the ``binary`` argument is false, the
        message will be sent as utf8; in binary mode any byte string
        is allowed.

        If the connection is already closed, raises `WebSocketClosedError`.
        Returns a `.Future` which can be used for flow control.

        .. versionchanged:: 3.2
           `WebSocketClosedError` was added (previously a closed connection
           would raise an `AttributeError`)

        .. versionchanged:: 4.3
           Returns a `.Future` which can be used for flow control.

        .. versionchanged:: 5.0
           Consistently raises `WebSocketClosedError`. Previously could
           sometimes raise `.StreamClosedError`.
        """
        if self.ws_connection is None or self.ws_connection.is_closing():
            raise WebSocketClosedError()
        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)
        return self.ws_connection.write_message(message, binary=binary)

    def select_subprotocol(self, subprotocols: List[str]) -> Optional[str]:
        """Override to implement subprotocol negotiation.

        ``subprotocols`` is a list of strings identifying the
        subprotocols proposed by the client. This method may be
        overridden to return one of those strings to select it, or
        ``None`` to not select a subprotocol.

        Failure to select a subprotocol does not automatically abort
        the connection, although clients may close the connection if
        none of their proposed subprotocols was selected.

        The list may be empty, in which case this method must return
        None. This method is always called exactly once even if no
        subprotocols were proposed so that the handler can be advised
        of this fact.

        .. versionchanged:: 5.1

           Previously, this method was called with a list containing
           an empty string instead of an empty list if no subprotocols
           were proposed by the client.
        """
        return None

    @property
    def selected_subprotocol(self) -> Optional[str]:
        """The subprotocol returned by `select_subprotocol`.

        .. versionadded:: 5.1
        """
        assert self.ws_connection is not None
        return self.ws_connection.selected_subprotocol

    def get_compression_options(self) -> Optional[Dict[str, Any]]:
        """Override to return compression options for the connection.

        If this method returns None (the default), compression will
        be disabled. If it returns a dict (even an empty one), it
        will be enabled. The contents of the dict may be used to
        control the following compression options:

        ``compression_level`` specifies the compression level.

        ``mem_level`` specifies the amount of memory used for the internal
        compression state.

        These parameters are documented in detail here:
        https://docs.python.org/3.6/library/zlib.html#zlib.compressobj

        .. versionadded:: 4.1

        .. versionchanged:: 4.5

           Added ``compression_level`` and ``mem_level``.
        """
        # TODO: Add wbits option.
        return None

    def open(self, *args: str, **kwargs: str) -> Optional[Awaitable[None]]:
        """Invoked when a new WebSocket is opened.

        The arguments to `open` are extracted from the `tornado.web.URLSpec`
        regular expression, just like the arguments to
        `tornado.web.RequestHandler.get`.

        `open` may be a coroutine. `on_message` will not be called until
        `open` has returned.

        .. versionchanged:: 5.1

           ``open`` may be a coroutine.
        """
        pass

    def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
        """Handle incoming messages on the WebSocket.

        This method must be overridden.

        .. versionchanged:: 4.5

           ``on_message`` can be a coroutine.
        """
        raise NotImplementedError

    def ping(self, data: Union[str, bytes] = b"") -> None:
        """Send ping frame to the remote end.

        The data argument allows a small amount of data (up to 125
        bytes) to be sent as a part of the ping message. Note that not
        all websocket implementations expose this data to
        applications.

        Consider using the ``websocket_ping_interval`` application
        setting instead of sending pings manually.

        Raises `WebSocketClosedError` if the connection is closed or
        closing.

        .. versionchanged:: 5.1

           The data argument is now optional.
        """
        data = utf8(data)
        if self.ws_connection is None or self.ws_connection.is_closing():
            raise WebSocketClosedError()
        self.ws_connection.write_ping(data)

    def on_pong(self, data: bytes) -> None:
        """Invoked when the response to a ping frame is received."""
        pass

    def on_ping(self, data: bytes) -> None:
        """Invoked when a ping frame is received."""
        pass

    def on_close(self) -> None:
        """Invoked when the WebSocket is closed.

        If the connection was closed cleanly and a status code or reason
        phrase was supplied, these values will be available as the attributes
        ``self.close_code`` and ``self.close_reason``.

        .. versionchanged:: 4.0

           Added ``close_code`` and ``close_reason`` attributes.
        """
        pass

    def close(self, code: int = None, reason: str = None) -> None:
        """Closes this Web Socket.

        Once the close handshake is successful the socket will be closed.

        ``code`` may be a numeric status code, taken from the values
        defined in `RFC 6455 section 7.4.1
        <https://tools.ietf.org/html/rfc6455#section-7.4.1>`_.
        ``reason`` may be a textual message about why the connection is
        closing. These values are made available to the client, but are
        not otherwise interpreted by the websocket protocol.

        .. versionchanged:: 4.0

           Added the ``code`` and ``reason`` arguments.
        """
        # A no-op when there is no active protocol object.
        if self.ws_connection:
            self.ws_connection.close(code, reason)
            self.ws_connection = None

    def check_origin(self, origin: str) -> bool:
        """Override to enable support for allowing alternate origins.

        The ``origin`` argument is the value of the ``Origin`` HTTP
        header, the url responsible for initiating this request. This
        method is not called for clients that do not send this header;
        such requests are always allowed (because all browsers that
        implement WebSockets support this header, and non-browser
        clients do not have the same cross-site security concerns).

        Should return ``True`` to accept the request or ``False`` to
        reject it. By default, rejects all requests with an origin on
        a host other than this one.

        This is a security protection against cross site scripting attacks on
        browsers, since WebSockets are allowed to bypass the usual same-origin
        policies and don't use CORS headers.

        .. warning::

           This is an important security measure; don't disable it
           without understanding the security implications. In
           particular, if your authentication is cookie-based, you
           must either restrict the origins allowed by
           ``check_origin()`` or implement your own XSRF-like
           protection for websocket connections. See `these
           <https://www.christian-schneider.net/CrossSiteWebSocketHijacking.html>`_
           `articles
           <https://devcenter.heroku.com/articles/websocket-security>`_
           for more.

        To accept all cross-origin traffic (which was the default prior to
        Tornado 4.0), simply override this method to always return ``True``::

            def check_origin(self, origin):
                return True

        To allow connections from any subdomain of your site, you might
        do something like::

            def check_origin(self, origin):
                parsed_origin = urllib.parse.urlparse(origin)
                return parsed_origin.netloc.endswith(".mydomain.com")

        .. versionadded:: 4.0
        """
        parsed_origin = urlparse(origin)
        origin = parsed_origin.netloc
        origin = origin.lower()

        # NOTE(review): only the origin is lower-cased; the Host header is
        # compared exactly as received.
        host = self.request.headers.get("Host")

        # Check to see that origin matches host directly, including ports
        return origin == host

    def set_nodelay(self, value: bool) -> None:
        """Set the no-delay flag for this stream.

        By default, small messages may be delayed and/or combined to minimize
        the number of packets sent. This can sometimes cause 200-500ms delays
        due to the interaction between Nagle's algorithm and TCP delayed
        ACKs. To reduce this delay (at the expense of possibly increasing
        bandwidth usage), call ``self.set_nodelay(True)`` once the websocket
        connection is established.

        See `.BaseIOStream.set_nodelay` for additional details.

        .. versionadded:: 3.1
        """
        assert self.ws_connection is not None
        self.ws_connection.set_nodelay(value)

    def on_connection_close(self) -> None:
        """Tear down the protocol object and fire `on_close` at most once."""
        if self.ws_connection:
            self.ws_connection.on_connection_close()
            self.ws_connection = None
        # The flag is set before calling on_close so that a re-entrant call
        # cannot invoke it a second time.
        if not self._on_close_called:
            self._on_close_called = True
            self.on_close()
            self._break_cycles()

    def on_ws_connection_close(
        self, close_code: int = None, close_reason: str = None
    ) -> None:
        """Record the close code and reason, then run the close handling."""
        self.close_code = close_code
        self.close_reason = close_reason
        self.on_connection_close()

    def _break_cycles(self) -> None:
        # WebSocketHandlers call finish() early, but we don't want to
        # break up reference cycles (which makes it impossible to call
        # self.render_string) until after we've really closed the
        # connection (if it was established in the first place,
        # indicated by status code 101).
        if self.get_status() != 101 or self._on_close_called:
            super(WebSocketHandler, self)._break_cycles()

    def send_error(self, *args: Any, **kwargs: Any) -> None:
        """Send an HTTP error response, or close the stream if one is set."""
        if self.stream is None:
            super(WebSocketHandler, self).send_error(*args, **kwargs)
        else:
            # If we get an uncaught exception during the handshake,
            # we have no choice but to abruptly close the connection.
            # TODO: for uncaught exceptions after the handshake,
            # we can close the connection more gracefully.
            self.stream.close()

    def get_websocket_protocol(self) -> Optional["WebSocketProtocol"]:
        """Return a protocol object for the request, or None if unsupported.

        Only ``Sec-WebSocket-Version`` values 7, 8 and 13 are accepted;
        all of them are served by `WebSocketProtocol13`.
        """
        websocket_version = self.request.headers.get("Sec-WebSocket-Version")
        if websocket_version in ("7", "8", "13"):
            params = _WebSocketParams(
                ping_interval=self.ping_interval,
                ping_timeout=self.ping_timeout,
                max_message_size=self.max_message_size,
                compression_options=self.get_compression_options(),
            )
            return WebSocketProtocol13(self, False, params)
        return None

    def _detach_stream(self) -> IOStream:
        """Disable the HTTP-only methods and return the result of `detach`."""
        # disable non-WS methods
        # Each name is rebound on this instance to a function that raises
        # RuntimeError when called.
        for method in [
            "write",
            "redirect",
            "set_header",
            "set_cookie",
            "set_status",
            "flush",
            "finish",
        ]:
            setattr(self, method, _raise_not_supported_for_websockets)
        return self.detach()
  489. def _raise_not_supported_for_websockets(*args: Any, **kwargs: Any) -> None:
  490. raise RuntimeError("Method not supported for Web Sockets")
  491. class WebSocketProtocol(abc.ABC):
  492. """Base class for WebSocket protocol versions.
  493. """
  494. def __init__(self, handler: "_WebSocketDelegate") -> None:
  495. self.handler = handler
  496. self.stream = None # type: Optional[IOStream]
  497. self.client_terminated = False
  498. self.server_terminated = False
  499. def _run_callback(
  500. self, callback: Callable, *args: Any, **kwargs: Any
  501. ) -> "Optional[Future[Any]]":
  502. """Runs the given callback with exception handling.
  503. If the callback is a coroutine, returns its Future. On error, aborts the
  504. websocket connection and returns None.
  505. """
  506. try:
  507. result = callback(*args, **kwargs)
  508. except Exception:
  509. self.handler.log_exception(*sys.exc_info())
  510. self._abort()
  511. return None
  512. else:
  513. if result is not None:
  514. result = gen.convert_yielded(result)
  515. assert self.stream is not None
  516. self.stream.io_loop.add_future(result, lambda f: f.result())
  517. return result
  518. def on_connection_close(self) -> None:
  519. self._abort()
  520. def _abort(self) -> None:
  521. """Instantly aborts the WebSocket connection by closing the socket"""
  522. self.client_terminated = True
  523. self.server_terminated = True
  524. if self.stream is not None:
  525. self.stream.close() # forcibly tear down the connection
  526. self.close() # let the subclass cleanup
  527. @abc.abstractmethod
  528. def close(self, code: int = None, reason: str = None) -> None:
  529. raise NotImplementedError()
  530. @abc.abstractmethod
  531. def is_closing(self) -> bool:
  532. raise NotImplementedError()
  533. @abc.abstractmethod
  534. async def accept_connection(self, handler: WebSocketHandler) -> None:
  535. raise NotImplementedError()
  536. @abc.abstractmethod
  537. def write_message(
  538. self, message: Union[str, bytes], binary: bool = False
  539. ) -> "Future[None]":
  540. raise NotImplementedError()
  541. @property
  542. @abc.abstractmethod
  543. def selected_subprotocol(self) -> Optional[str]:
  544. raise NotImplementedError()
  545. @abc.abstractmethod
  546. def write_ping(self, data: bytes) -> None:
  547. raise NotImplementedError()
  548. # The entry points below are used by WebSocketClientConnection,
  549. # which was introduced after we only supported a single version of
  550. # WebSocketProtocol. The WebSocketProtocol/WebSocketProtocol13
  551. # boundary is currently pretty ad-hoc.
  552. @abc.abstractmethod
  553. def _process_server_headers(
  554. self, key: Union[str, bytes], headers: httputil.HTTPHeaders
  555. ) -> None:
  556. raise NotImplementedError()
  557. @abc.abstractmethod
  558. def start_pinging(self) -> None:
  559. raise NotImplementedError()
  560. @abc.abstractmethod
  561. async def _receive_frame_loop(self) -> None:
  562. raise NotImplementedError()
  563. @abc.abstractmethod
  564. def set_nodelay(self, x: bool) -> None:
  565. raise NotImplementedError()
  566. class _PerMessageDeflateCompressor(object):
  567. def __init__(
  568. self,
  569. persistent: bool,
  570. max_wbits: Optional[int],
  571. compression_options: Dict[str, Any] = None,
  572. ) -> None:
  573. if max_wbits is None:
  574. max_wbits = zlib.MAX_WBITS
  575. # There is no symbolic constant for the minimum wbits value.
  576. if not (8 <= max_wbits <= zlib.MAX_WBITS):
  577. raise ValueError(
  578. "Invalid max_wbits value %r; allowed range 8-%d",
  579. max_wbits,
  580. zlib.MAX_WBITS,
  581. )
  582. self._max_wbits = max_wbits
  583. if (
  584. compression_options is None
  585. or "compression_level" not in compression_options
  586. ):
  587. self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL
  588. else:
  589. self._compression_level = compression_options["compression_level"]
  590. if compression_options is None or "mem_level" not in compression_options:
  591. self._mem_level = 8
  592. else:
  593. self._mem_level = compression_options["mem_level"]
  594. if persistent:
  595. self._compressor = self._create_compressor() # type: Optional[_Compressor]
  596. else:
  597. self._compressor = None
  598. def _create_compressor(self) -> "_Compressor":
  599. return zlib.compressobj(
  600. self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level
  601. )
  602. def compress(self, data: bytes) -> bytes:
  603. compressor = self._compressor or self._create_compressor()
  604. data = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)
  605. assert data.endswith(b"\x00\x00\xff\xff")
  606. return data[:-4]
  607. class _PerMessageDeflateDecompressor(object):
  608. def __init__(
  609. self,
  610. persistent: bool,
  611. max_wbits: Optional[int],
  612. max_message_size: int,
  613. compression_options: Dict[str, Any] = None,
  614. ) -> None:
  615. self._max_message_size = max_message_size
  616. if max_wbits is None:
  617. max_wbits = zlib.MAX_WBITS
  618. if not (8 <= max_wbits <= zlib.MAX_WBITS):
  619. raise ValueError(
  620. "Invalid max_wbits value %r; allowed range 8-%d",
  621. max_wbits,
  622. zlib.MAX_WBITS,
  623. )
  624. self._max_wbits = max_wbits
  625. if persistent:
  626. self._decompressor = (
  627. self._create_decompressor()
  628. ) # type: Optional[_Decompressor]
  629. else:
  630. self._decompressor = None
  631. def _create_decompressor(self) -> "_Decompressor":
  632. return zlib.decompressobj(-self._max_wbits)
  633. def decompress(self, data: bytes) -> bytes:
  634. decompressor = self._decompressor or self._create_decompressor()
  635. result = decompressor.decompress(
  636. data + b"\x00\x00\xff\xff", self._max_message_size
  637. )
  638. if decompressor.unconsumed_tail:
  639. raise _DecompressTooLargeError()
  640. return result
class WebSocketProtocol13(WebSocketProtocol):
    """Implementation of the WebSocket protocol from RFC 6455.

    This class supports versions 7 and 8 of the protocol in addition to the
    final version 13.
    """

    # Bit masks for the first byte of a frame.
    FIN = 0x80  # set on the final frame of a message
    RSV1 = 0x40  # used by this class as the "message is compressed" flag
    RSV2 = 0x20
    RSV3 = 0x10
    RSV_MASK = RSV1 | RSV2 | RSV3  # all three reserved bits
    OPCODE_MASK = 0x0F  # low nibble holds the opcode

    # Class-level placeholder; instances get a real stream assigned later.
    stream = None  # type: IOStream
  654. def __init__(
  655. self,
  656. handler: "_WebSocketDelegate",
  657. mask_outgoing: bool,
  658. params: _WebSocketParams,
  659. ) -> None:
  660. WebSocketProtocol.__init__(self, handler)
  661. self.mask_outgoing = mask_outgoing
  662. self.params = params
  663. self._final_frame = False
  664. self._frame_opcode = None
  665. self._masked_frame = None
  666. self._frame_mask = None # type: Optional[bytes]
  667. self._frame_length = None
  668. self._fragmented_message_buffer = None # type: Optional[bytes]
  669. self._fragmented_message_opcode = None
  670. self._waiting = None # type: object
  671. self._compression_options = params.compression_options
  672. self._decompressor = None # type: Optional[_PerMessageDeflateDecompressor]
  673. self._compressor = None # type: Optional[_PerMessageDeflateCompressor]
  674. self._frame_compressed = None # type: Optional[bool]
  675. # The total uncompressed size of all messages received or sent.
  676. # Unicode messages are encoded to utf8.
  677. # Only for testing; subject to change.
  678. self._message_bytes_in = 0
  679. self._message_bytes_out = 0
  680. # The total size of all packets received or sent. Includes
  681. # the effect of compression, frame overhead, and control frames.
  682. self._wire_bytes_in = 0
  683. self._wire_bytes_out = 0
  684. self.ping_callback = None # type: Optional[PeriodicCallback]
  685. self.last_ping = 0.0
  686. self.last_pong = 0.0
  687. self.close_code = None # type: Optional[int]
  688. self.close_reason = None # type: Optional[str]
    # Use a property for this to satisfy the abc.
    @property
    def selected_subprotocol(self) -> Optional[str]:
        """The stored subprotocol value (``_selected_subprotocol``)."""
        return self._selected_subprotocol

    @selected_subprotocol.setter
    def selected_subprotocol(self, value: Optional[str]) -> None:
        # Plain storage; no validation is performed here.
        self._selected_subprotocol = value
  696. async def accept_connection(self, handler: WebSocketHandler) -> None:
  697. try:
  698. self._handle_websocket_headers(handler)
  699. except ValueError:
  700. handler.set_status(400)
  701. log_msg = "Missing/Invalid WebSocket headers"
  702. handler.finish(log_msg)
  703. gen_log.debug(log_msg)
  704. return
  705. try:
  706. await self._accept_connection(handler)
  707. except asyncio.CancelledError:
  708. self._abort()
  709. return
  710. except ValueError:
  711. gen_log.debug("Malformed WebSocket request received", exc_info=True)
  712. self._abort()
  713. return
  714. def _handle_websocket_headers(self, handler: WebSocketHandler) -> None:
  715. """Verifies all invariant- and required headers
  716. If a header is missing or have an incorrect value ValueError will be
  717. raised
  718. """
  719. fields = ("Host", "Sec-Websocket-Key", "Sec-Websocket-Version")
  720. if not all(map(lambda f: handler.request.headers.get(f), fields)):
  721. raise ValueError("Missing/Invalid WebSocket headers")
  722. @staticmethod
  723. def compute_accept_value(key: Union[str, bytes]) -> str:
  724. """Computes the value for the Sec-WebSocket-Accept header,
  725. given the value for Sec-WebSocket-Key.
  726. """
  727. sha1 = hashlib.sha1()
  728. sha1.update(utf8(key))
  729. sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11") # Magic value
  730. return native_str(base64.b64encode(sha1.digest()))
  731. def _challenge_response(self, handler: WebSocketHandler) -> str:
  732. return WebSocketProtocol13.compute_accept_value(
  733. cast(str, handler.request.headers.get("Sec-Websocket-Key"))
  734. )
  735. async def _accept_connection(self, handler: WebSocketHandler) -> None:
  736. subprotocol_header = handler.request.headers.get("Sec-WebSocket-Protocol")
  737. if subprotocol_header:
  738. subprotocols = [s.strip() for s in subprotocol_header.split(",")]
  739. else:
  740. subprotocols = []
  741. self.selected_subprotocol = handler.select_subprotocol(subprotocols)
  742. if self.selected_subprotocol:
  743. assert self.selected_subprotocol in subprotocols
  744. handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol)
  745. extensions = self._parse_extensions_header(handler.request.headers)
  746. for ext in extensions:
  747. if ext[0] == "permessage-deflate" and self._compression_options is not None:
  748. # TODO: negotiate parameters if compression_options
  749. # specifies limits.
  750. self._create_compressors("server", ext[1], self._compression_options)
  751. if (
  752. "client_max_window_bits" in ext[1]
  753. and ext[1]["client_max_window_bits"] is None
  754. ):
  755. # Don't echo an offered client_max_window_bits
  756. # parameter with no value.
  757. del ext[1]["client_max_window_bits"]
  758. handler.set_header(
  759. "Sec-WebSocket-Extensions",
  760. httputil._encode_header("permessage-deflate", ext[1]),
  761. )
  762. break
  763. handler.clear_header("Content-Type")
  764. handler.set_status(101)
  765. handler.set_header("Upgrade", "websocket")
  766. handler.set_header("Connection", "Upgrade")
  767. handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler))
  768. handler.finish()
  769. self.stream = handler._detach_stream()
  770. self.start_pinging()
  771. try:
  772. open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  773. if open_result is not None:
  774. await open_result
  775. except Exception:
  776. handler.log_exception(*sys.exc_info())
  777. self._abort()
  778. return
  779. await self._receive_frame_loop()
  780. def _parse_extensions_header(
  781. self, headers: httputil.HTTPHeaders
  782. ) -> List[Tuple[str, Dict[str, str]]]:
  783. extensions = headers.get("Sec-WebSocket-Extensions", "")
  784. if extensions:
  785. return [httputil._parse_header(e.strip()) for e in extensions.split(",")]
  786. return []
  787. def _process_server_headers(
  788. self, key: Union[str, bytes], headers: httputil.HTTPHeaders
  789. ) -> None:
  790. """Process the headers sent by the server to this client connection.
  791. 'key' is the websocket handshake challenge/response key.
  792. """
  793. assert headers["Upgrade"].lower() == "websocket"
  794. assert headers["Connection"].lower() == "upgrade"
  795. accept = self.compute_accept_value(key)
  796. assert headers["Sec-Websocket-Accept"] == accept
  797. extensions = self._parse_extensions_header(headers)
  798. for ext in extensions:
  799. if ext[0] == "permessage-deflate" and self._compression_options is not None:
  800. self._create_compressors("client", ext[1])
  801. else:
  802. raise ValueError("unsupported extension %r", ext)
  803. self.selected_subprotocol = headers.get("Sec-WebSocket-Protocol", None)
  804. def _get_compressor_options(
  805. self,
  806. side: str,
  807. agreed_parameters: Dict[str, Any],
  808. compression_options: Dict[str, Any] = None,
  809. ) -> Dict[str, Any]:
  810. """Converts a websocket agreed_parameters set to keyword arguments
  811. for our compressor objects.
  812. """
  813. options = dict(
  814. persistent=(side + "_no_context_takeover") not in agreed_parameters
  815. ) # type: Dict[str, Any]
  816. wbits_header = agreed_parameters.get(side + "_max_window_bits", None)
  817. if wbits_header is None:
  818. options["max_wbits"] = zlib.MAX_WBITS
  819. else:
  820. options["max_wbits"] = int(wbits_header)
  821. options["compression_options"] = compression_options
  822. return options
  823. def _create_compressors(
  824. self,
  825. side: str,
  826. agreed_parameters: Dict[str, Any],
  827. compression_options: Dict[str, Any] = None,
  828. ) -> None:
  829. # TODO: handle invalid parameters gracefully
  830. allowed_keys = set(
  831. [
  832. "server_no_context_takeover",
  833. "client_no_context_takeover",
  834. "server_max_window_bits",
  835. "client_max_window_bits",
  836. ]
  837. )
  838. for key in agreed_parameters:
  839. if key not in allowed_keys:
  840. raise ValueError("unsupported compression parameter %r" % key)
  841. other_side = "client" if (side == "server") else "server"
  842. self._compressor = _PerMessageDeflateCompressor(
  843. **self._get_compressor_options(side, agreed_parameters, compression_options)
  844. )
  845. self._decompressor = _PerMessageDeflateDecompressor(
  846. max_message_size=self.params.max_message_size,
  847. **self._get_compressor_options(
  848. other_side, agreed_parameters, compression_options
  849. )
  850. )
  851. def _write_frame(
  852. self, fin: bool, opcode: int, data: bytes, flags: int = 0
  853. ) -> "Future[None]":
  854. data_len = len(data)
  855. if opcode & 0x8:
  856. # All control frames MUST have a payload length of 125
  857. # bytes or less and MUST NOT be fragmented.
  858. if not fin:
  859. raise ValueError("control frames may not be fragmented")
  860. if data_len > 125:
  861. raise ValueError("control frame payloads may not exceed 125 bytes")
  862. if fin:
  863. finbit = self.FIN
  864. else:
  865. finbit = 0
  866. frame = struct.pack("B", finbit | opcode | flags)
  867. if self.mask_outgoing:
  868. mask_bit = 0x80
  869. else:
  870. mask_bit = 0
  871. if data_len < 126:
  872. frame += struct.pack("B", data_len | mask_bit)
  873. elif data_len <= 0xFFFF:
  874. frame += struct.pack("!BH", 126 | mask_bit, data_len)
  875. else:
  876. frame += struct.pack("!BQ", 127 | mask_bit, data_len)
  877. if self.mask_outgoing:
  878. mask = os.urandom(4)
  879. data = mask + _websocket_mask(mask, data)
  880. frame += data
  881. self._wire_bytes_out += len(frame)
  882. return self.stream.write(frame)
  883. def write_message(
  884. self, message: Union[str, bytes], binary: bool = False
  885. ) -> "Future[None]":
  886. """Sends the given message to the client of this Web Socket."""
  887. if binary:
  888. opcode = 0x2
  889. else:
  890. opcode = 0x1
  891. message = tornado.escape.utf8(message)
  892. assert isinstance(message, bytes)
  893. self._message_bytes_out += len(message)
  894. flags = 0
  895. if self._compressor:
  896. message = self._compressor.compress(message)
  897. flags |= self.RSV1
  898. # For historical reasons, write methods in Tornado operate in a semi-synchronous
  899. # mode in which awaiting the Future they return is optional (But errors can
  900. # still be raised). This requires us to go through an awkward dance here
  901. # to transform the errors that may be returned while presenting the same
  902. # semi-synchronous interface.
  903. try:
  904. fut = self._write_frame(True, opcode, message, flags=flags)
  905. except StreamClosedError:
  906. raise WebSocketClosedError()
  907. async def wrapper() -> None:
  908. try:
  909. await fut
  910. except StreamClosedError:
  911. raise WebSocketClosedError()
  912. return asyncio.ensure_future(wrapper())
    def write_ping(self, data: bytes) -> None:
        """Send ping frame.

        ``data`` must be `bytes`; it is written as the payload of a final
        frame with opcode 0x9. The value returned by ``_write_frame`` is
        discarded.
        """
        assert isinstance(data, bytes)
        self._write_frame(True, 0x9, data)
    async def _receive_frame_loop(self) -> None:
        """Read frames until the peer has terminated or the stream closes.

        Whichever way the loop ends, the handler is notified through
        ``on_ws_connection_close`` with the recorded close code and reason.
        """
        try:
            while not self.client_terminated:
                await self._receive_frame()
        except StreamClosedError:
            # The stream closed underneath us: abort instead of propagating.
            self._abort()
        self.handler.on_ws_connection_close(self.close_code, self.close_reason)
  924. async def _read_bytes(self, n: int) -> bytes:
  925. data = await self.stream.read_bytes(n)
  926. self._wire_bytes_in += n
  927. return data
    async def _receive_frame(self) -> None:
        """Read and process a single frame from the stream.

        Parses the two-byte header and any extended length, enforces the
        message size limit, reads and unmasks the payload, reassembles
        fragmented messages and hands each completed message or control
        frame to ``_handle_message``. Protocol violations abort the
        connection.
        """
        # Read the frame header.
        data = await self._read_bytes(2)
        header, mask_payloadlen = struct.unpack("BB", data)
        is_final_frame = header & self.FIN
        reserved_bits = header & self.RSV_MASK
        opcode = header & self.OPCODE_MASK
        opcode_is_control = opcode & 0x8
        if self._decompressor is not None and opcode != 0:
            # Compression flag is present in the first frame's header,
            # but we can't decompress until we have all the frames of
            # the message.
            # NOTE(review): ``opcode != 0`` is also true for control frames,
            # so a control frame arriving between the fragments of a
            # compressed message overwrites the flag recorded from that
            # message's first frame -- confirm whether this is intended.
            self._frame_compressed = bool(reserved_bits & self.RSV1)
            reserved_bits &= ~self.RSV1
        if reserved_bits:
            # client is using as-yet-undefined extensions; abort
            self._abort()
            return
        is_masked = bool(mask_payloadlen & 0x80)
        payloadlen = mask_payloadlen & 0x7F

        # Parse and validate the length.
        if opcode_is_control and payloadlen >= 126:
            # control frames must have payload < 126
            self._abort()
            return
        if payloadlen < 126:
            self._frame_length = payloadlen
        elif payloadlen == 126:
            # 126 means the real length follows as a 16-bit big-endian int.
            data = await self._read_bytes(2)
            payloadlen = struct.unpack("!H", data)[0]
        elif payloadlen == 127:
            # 127 means the real length follows as a 64-bit big-endian int.
            data = await self._read_bytes(8)
            payloadlen = struct.unpack("!Q", data)[0]
        # The limit applies to this frame plus any fragments buffered so far.
        new_len = payloadlen
        if self._fragmented_message_buffer is not None:
            new_len += len(self._fragmented_message_buffer)
        if new_len > self.params.max_message_size:
            self.close(1009, "message too big")
            self._abort()
            return

        # Read the payload, unmasking if necessary.
        if is_masked:
            self._frame_mask = await self._read_bytes(4)
        data = await self._read_bytes(payloadlen)
        if is_masked:
            assert self._frame_mask is not None
            data = _websocket_mask(self._frame_mask, data)

        # Decide what to do with this frame.
        if opcode_is_control:
            # control frames may be interleaved with a series of fragmented
            # data frames, so control frames must not interact with
            # self._fragmented_*
            if not is_final_frame:
                # control frames must not be fragmented
                self._abort()
                return
        elif opcode == 0:  # continuation frame
            if self._fragmented_message_buffer is None:
                # nothing to continue
                self._abort()
                return
            self._fragmented_message_buffer += data
            if is_final_frame:
                # Message complete: dispatch it under the opcode of its
                # first frame and reset the buffer.
                opcode = self._fragmented_message_opcode
                data = self._fragmented_message_buffer
                self._fragmented_message_buffer = None
        else:  # start of new data message
            if self._fragmented_message_buffer is not None:
                # can't start new message until the old one is finished
                self._abort()
                return
            if not is_final_frame:
                self._fragmented_message_opcode = opcode
                self._fragmented_message_buffer = data

        if is_final_frame:
            handled_future = self._handle_message(opcode, data)
            if handled_future is not None:
                await handled_future
  1006. def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]":
  1007. """Execute on_message, returning its Future if it is a coroutine."""
  1008. if self.client_terminated:
  1009. return None
  1010. if self._frame_compressed:
  1011. assert self._decompressor is not None
  1012. try:
  1013. data = self._decompressor.decompress(data)
  1014. except _DecompressTooLargeError:
  1015. self.close(1009, "message too big after decompression")
  1016. self._abort()
  1017. return None
  1018. if opcode == 0x1:
  1019. # UTF-8 data
  1020. self._message_bytes_in += len(data)
  1021. try:
  1022. decoded = data.decode("utf-8")
  1023. except UnicodeDecodeError:
  1024. self._abort()
  1025. return None
  1026. return self._run_callback(self.handler.on_message, decoded)
  1027. elif opcode == 0x2:
  1028. # Binary data
  1029. self._message_bytes_in += len(data)
  1030. return self._run_callback(self.handler.on_message, data)
  1031. elif opcode == 0x8:
  1032. # Close
  1033. self.client_terminated = True
  1034. if len(data) >= 2:
  1035. self.close_code = struct.unpack(">H", data[:2])[0]
  1036. if len(data) > 2:
  1037. self.close_reason = to_unicode(data[2:])
  1038. # Echo the received close code, if any (RFC 6455 section 5.5.1).
  1039. self.close(self.close_code)
  1040. elif opcode == 0x9:
  1041. # Ping
  1042. try:
  1043. self._write_frame(True, 0xA, data)
  1044. except StreamClosedError:
  1045. self._abort()
  1046. self._run_callback(self.handler.on_ping, data)
  1047. elif opcode == 0xA:
  1048. # Pong
  1049. self.last_pong = IOLoop.current().time()
  1050. return self._run_callback(self.handler.on_pong, data)
  1051. else:
  1052. self._abort()
  1053. return None
  1054. def close(self, code: int = None, reason: str = None) -> None:
  1055. """Closes the WebSocket connection."""
  1056. if not self.server_terminated:
  1057. if not self.stream.closed():
  1058. if code is None and reason is not None:
  1059. code = 1000 # "normal closure" status code
  1060. if code is None:
  1061. close_data = b""
  1062. else:
  1063. close_data = struct.pack(">H", code)
  1064. if reason is not None:
  1065. close_data += utf8(reason)
  1066. try:
  1067. self._write_frame(True, 0x8, close_data)
  1068. except StreamClosedError:
  1069. self._abort()
  1070. self.server_terminated = True
  1071. if self.client_terminated:
  1072. if self._waiting is not None:
  1073. self.stream.io_loop.remove_timeout(self._waiting)
  1074. self._waiting = None
  1075. self.stream.close()
  1076. elif self._waiting is None:
  1077. # Give the client a few seconds to complete a clean shutdown,
  1078. # otherwise just close the connection.
  1079. self._waiting = self.stream.io_loop.add_timeout(
  1080. self.stream.io_loop.time() + 5, self._abort
  1081. )
  1082. def is_closing(self) -> bool:
  1083. """Return ``True`` if this connection is closing.
  1084. The connection is considered closing if either side has
  1085. initiated its closing handshake or if the stream has been
  1086. shut down uncleanly.
  1087. """
  1088. return self.stream.closed() or self.client_terminated or self.server_terminated
  1089. @property
  1090. def ping_interval(self) -> Optional[float]:
  1091. interval = self.params.ping_interval
  1092. if interval is not None:
  1093. return interval
  1094. return 0
  1095. @property
  1096. def ping_timeout(self) -> Optional[float]:
  1097. timeout = self.params.ping_timeout
  1098. if timeout is not None:
  1099. return timeout
  1100. assert self.ping_interval is not None
  1101. return max(3 * self.ping_interval, 30)
  1102. def start_pinging(self) -> None:
  1103. """Start sending periodic pings to keep the connection alive"""
  1104. assert self.ping_interval is not None
  1105. if self.ping_interval > 0:
  1106. self.last_ping = self.last_pong = IOLoop.current().time()
  1107. self.ping_callback = PeriodicCallback(
  1108. self.periodic_ping, self.ping_interval * 1000
  1109. )
  1110. self.ping_callback.start()
  1111. def periodic_ping(self) -> None:
  1112. """Send a ping to keep the websocket alive
  1113. Called periodically if the websocket_ping_interval is set and non-zero.
  1114. """
  1115. if self.is_closing() and self.ping_callback is not None:
  1116. self.ping_callback.stop()
  1117. return
  1118. # Check for timeout on pong. Make sure that we really have
  1119. # sent a recent ping in case the machine with both server and
  1120. # client has been suspended since the last ping.
  1121. now = IOLoop.current().time()
  1122. since_last_pong = now - self.last_pong
  1123. since_last_ping = now - self.last_ping
  1124. assert self.ping_interval is not None
  1125. assert self.ping_timeout is not None
  1126. if (
  1127. since_last_ping < 2 * self.ping_interval
  1128. and since_last_pong > self.ping_timeout
  1129. ):
  1130. self.close()
  1131. return
  1132. self.write_ping(b"")
  1133. self.last_ping = now
    def set_nodelay(self, x: bool) -> None:
        """Forward the no-delay flag ``x`` to the underlying stream."""
        self.stream.set_nodelay(x)
  1136. class WebSocketClientConnection(simple_httpclient._HTTPConnection):
  1137. """WebSocket client connection.
  1138. This class should not be instantiated directly; use the
  1139. `websocket_connect` function instead.
  1140. """
  1141. protocol = None # type: WebSocketProtocol
  1142. def __init__(
  1143. self,
  1144. request: httpclient.HTTPRequest,
  1145. on_message_callback: Callable[[Union[None, str, bytes]], None] = None,
  1146. compression_options: Dict[str, Any] = None,
  1147. ping_interval: float = None,
  1148. ping_timeout: float = None,
  1149. max_message_size: int = _default_max_message_size,
  1150. subprotocols: Optional[List[str]] = [],
  1151. ) -> None:
  1152. self.connect_future = Future() # type: Future[WebSocketClientConnection]
  1153. self.read_queue = Queue(1) # type: Queue[Union[None, str, bytes]]
  1154. self.key = base64.b64encode(os.urandom(16))
  1155. self._on_message_callback = on_message_callback
  1156. self.close_code = None # type: Optional[int]
  1157. self.close_reason = None # type: Optional[str]
  1158. self.params = _WebSocketParams(
  1159. ping_interval=ping_interval,
  1160. ping_timeout=ping_timeout,
  1161. max_message_size=max_message_size,
  1162. compression_options=compression_options,
  1163. )
  1164. scheme, sep, rest = request.url.partition(":")
  1165. scheme = {"ws": "http", "wss": "https"}[scheme]
  1166. request.url = scheme + sep + rest
  1167. request.headers.update(
  1168. {
  1169. "Upgrade": "websocket",
  1170. "Connection": "Upgrade",
  1171. "Sec-WebSocket-Key": self.key,
  1172. "Sec-WebSocket-Version": "13",
  1173. }
  1174. )
  1175. if subprotocols is not None:
  1176. request.headers["Sec-WebSocket-Protocol"] = ",".join(subprotocols)
  1177. if compression_options is not None:
  1178. # Always offer to let the server set our max_wbits (and even though
  1179. # we don't offer it, we will accept a client_no_context_takeover
  1180. # from the server).
  1181. # TODO: set server parameters for deflate extension
  1182. # if requested in self.compression_options.
  1183. request.headers[
  1184. "Sec-WebSocket-Extensions"
  1185. ] = "permessage-deflate; client_max_window_bits"
  1186. self.tcp_client = TCPClient()
  1187. super(WebSocketClientConnection, self).__init__(
  1188. None,
  1189. request,
  1190. lambda: None,
  1191. self._on_http_response,
  1192. 104857600,
  1193. self.tcp_client,
  1194. 65536,
  1195. 104857600,
  1196. )
  1197. def close(self, code: int = None, reason: str = None) -> None:
  1198. """Closes the websocket connection.
  1199. ``code`` and ``reason`` are documented under
  1200. `WebSocketHandler.close`.
  1201. .. versionadded:: 3.2
  1202. .. versionchanged:: 4.0
  1203. Added the ``code`` and ``reason`` arguments.
  1204. """
  1205. if self.protocol is not None:
  1206. self.protocol.close(code, reason)
  1207. self.protocol = None # type: ignore
  1208. def on_connection_close(self) -> None:
  1209. if not self.connect_future.done():
  1210. self.connect_future.set_exception(StreamClosedError())
  1211. self._on_message(None)
  1212. self.tcp_client.close()
  1213. super(WebSocketClientConnection, self).on_connection_close()
  1214. def on_ws_connection_close(
  1215. self, close_code: int = None, close_reason: str = None
  1216. ) -> None:
  1217. self.close_code = close_code
  1218. self.close_reason = close_reason
  1219. self.on_connection_close()
  1220. def _on_http_response(self, response: httpclient.HTTPResponse) -> None:
  1221. if not self.connect_future.done():
  1222. if response.error:
  1223. self.connect_future.set_exception(response.error)
  1224. else:
  1225. self.connect_future.set_exception(
  1226. WebSocketError("Non-websocket response")
  1227. )
  1228. async def headers_received(
  1229. self,
  1230. start_line: Union[httputil.RequestStartLine, httputil.ResponseStartLine],
  1231. headers: httputil.HTTPHeaders,
  1232. ) -> None:
  1233. assert isinstance(start_line, httputil.ResponseStartLine)
  1234. if start_line.code != 101:
  1235. await super(WebSocketClientConnection, self).headers_received(
  1236. start_line, headers
  1237. )
  1238. return
  1239. if self._timeout is not None:
  1240. self.io_loop.remove_timeout(self._timeout)
  1241. self._timeout = None
  1242. self.headers = headers
  1243. self.protocol = self.get_websocket_protocol()
  1244. self.protocol._process_server_headers(self.key, self.headers)
  1245. self.protocol.stream = self.connection.detach()
  1246. IOLoop.current().add_callback(self.protocol._receive_frame_loop)
  1247. self.protocol.start_pinging()
  1248. # Once we've taken over the connection, clear the final callback
  1249. # we set on the http request. This deactivates the error handling
  1250. # in simple_httpclient that would otherwise interfere with our
  1251. # ability to see exceptions.
  1252. self.final_callback = None # type: ignore
  1253. future_set_result_unless_cancelled(self.connect_future, self)
  1254. def write_message(
  1255. self, message: Union[str, bytes], binary: bool = False
  1256. ) -> "Future[None]":
  1257. """Sends a message to the WebSocket server.
  1258. If the stream is closed, raises `WebSocketClosedError`.
  1259. Returns a `.Future` which can be used for flow control.
  1260. .. versionchanged:: 5.0
  1261. Exception raised on a closed stream changed from `.StreamClosedError`
  1262. to `WebSocketClosedError`.
  1263. """
  1264. return self.protocol.write_message(message, binary=binary)
  1265. def read_message(
  1266. self, callback: Callable[["Future[Union[None, str, bytes]]"], None] = None
  1267. ) -> Awaitable[Union[None, str, bytes]]:
  1268. """Reads a message from the WebSocket server.
  1269. If on_message_callback was specified at WebSocket
  1270. initialization, this function will never return messages
  1271. Returns a future whose result is the message, or None
  1272. if the connection is closed. If a callback argument
  1273. is given it will be called with the future when it is
  1274. ready.
  1275. """
  1276. awaitable = self.read_queue.get()
  1277. if callback is not None:
  1278. self.io_loop.add_future(asyncio.ensure_future(awaitable), callback)
  1279. return awaitable
  1280. def on_message(self, message: Union[str, bytes]) -> Optional[Awaitable[None]]:
  1281. return self._on_message(message)
  1282. def _on_message(
  1283. self, message: Union[None, str, bytes]
  1284. ) -> Optional[Awaitable[None]]:
  1285. if self._on_message_callback:
  1286. self._on_message_callback(message)
  1287. return None
  1288. else:
  1289. return self.read_queue.put(message)
  1290. def ping(self, data: bytes = b"") -> None:
  1291. """Send ping frame to the remote end.
  1292. The data argument allows a small amount of data (up to 125
  1293. bytes) to be sent as a part of the ping message. Note that not
  1294. all websocket implementations expose this data to
  1295. applications.
  1296. Consider using the ``ping_interval`` argument to
  1297. `websocket_connect` instead of sending pings manually.
  1298. .. versionadded:: 5.1
  1299. """
  1300. data = utf8(data)
  1301. if self.protocol is None:
  1302. raise WebSocketClosedError()
  1303. self.protocol.write_ping(data)
  1304. def on_pong(self, data: bytes) -> None:
  1305. pass
  1306. def on_ping(self, data: bytes) -> None:
  1307. pass
  1308. def get_websocket_protocol(self) -> WebSocketProtocol:
  1309. return WebSocketProtocol13(self, mask_outgoing=True, params=self.params)
  1310. @property
  1311. def selected_subprotocol(self) -> Optional[str]:
  1312. """The subprotocol selected by the server.
  1313. .. versionadded:: 5.1
  1314. """
  1315. return self.protocol.selected_subprotocol
  1316. def log_exception(
  1317. self,
  1318. typ: "Optional[Type[BaseException]]",
  1319. value: Optional[BaseException],
  1320. tb: Optional[TracebackType],
  1321. ) -> None:
  1322. assert typ is not None
  1323. assert value is not None
  1324. app_log.error("Uncaught exception %s", value, exc_info=(typ, value, tb))
  1325. def websocket_connect(
  1326. url: Union[str, httpclient.HTTPRequest],
  1327. callback: Callable[["Future[WebSocketClientConnection]"], None] = None,
  1328. connect_timeout: float = None,
  1329. on_message_callback: Callable[[Union[None, str, bytes]], None] = None,
  1330. compression_options: Dict[str, Any] = None,
  1331. ping_interval: float = None,
  1332. ping_timeout: float = None,
  1333. max_message_size: int = _default_max_message_size,
  1334. subprotocols: List[str] = None,
  1335. ) -> "Awaitable[WebSocketClientConnection]":
  1336. """Client-side websocket support.
  1337. Takes a url and returns a Future whose result is a
  1338. `WebSocketClientConnection`.
  1339. ``compression_options`` is interpreted in the same way as the
  1340. return value of `.WebSocketHandler.get_compression_options`.
  1341. The connection supports two styles of operation. In the coroutine
  1342. style, the application typically calls
  1343. `~.WebSocketClientConnection.read_message` in a loop::
  1344. conn = yield websocket_connect(url)
  1345. while True:
  1346. msg = yield conn.read_message()
  1347. if msg is None: break
  1348. # Do something with msg
  1349. In the callback style, pass an ``on_message_callback`` to
  1350. ``websocket_connect``. In both styles, a message of ``None``
  1351. indicates that the connection has been closed.
  1352. ``subprotocols`` may be a list of strings specifying proposed
  1353. subprotocols. The selected protocol may be found on the
  1354. ``selected_subprotocol`` attribute of the connection object
  1355. when the connection is complete.
  1356. .. versionchanged:: 3.2
  1357. Also accepts ``HTTPRequest`` objects in place of urls.
  1358. .. versionchanged:: 4.1
  1359. Added ``compression_options`` and ``on_message_callback``.
  1360. .. versionchanged:: 4.5
  1361. Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size``
  1362. arguments, which have the same meaning as in `WebSocketHandler`.
  1363. .. versionchanged:: 5.0
  1364. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  1365. .. versionchanged:: 5.1
  1366. Added the ``subprotocols`` argument.
  1367. """
  1368. if isinstance(url, httpclient.HTTPRequest):
  1369. assert connect_timeout is None
  1370. request = url
  1371. # Copy and convert the headers dict/object (see comments in
  1372. # AsyncHTTPClient.fetch)
  1373. request.headers = httputil.HTTPHeaders(request.headers)
  1374. else:
  1375. request = httpclient.HTTPRequest(url, connect_timeout=connect_timeout)
  1376. request = cast(
  1377. httpclient.HTTPRequest,
  1378. httpclient._RequestProxy(request, httpclient.HTTPRequest._DEFAULTS),
  1379. )
  1380. conn = WebSocketClientConnection(
  1381. request,
  1382. on_message_callback=on_message_callback,
  1383. compression_options=compression_options,
  1384. ping_interval=ping_interval,
  1385. ping_timeout=ping_timeout,
  1386. max_message_size=max_message_size,
  1387. subprotocols=subprotocols,
  1388. )
  1389. if callback is not None:
  1390. IOLoop.current().add_future(conn.connect_future, callback)
  1391. return conn.connect_future