# httputil_test.py — test suite for tornado.httputil
  1. # -*- coding: utf-8 -*-
  2. from tornado.httputil import (
  3. url_concat,
  4. parse_multipart_form_data,
  5. HTTPHeaders,
  6. format_timestamp,
  7. HTTPServerRequest,
  8. parse_request_start_line,
  9. parse_cookie,
  10. qs_to_qsl,
  11. HTTPInputError,
  12. HTTPFile,
  13. )
  14. from tornado.escape import utf8, native_str
  15. from tornado.log import gen_log
  16. from tornado.testing import ExpectLog
  17. import copy
  18. import datetime
  19. import logging
  20. import pickle
  21. import time
  22. import urllib.parse
  23. import unittest
  24. from typing import Tuple, Dict, List
  25. def form_data_args() -> Tuple[Dict[str, List[bytes]], Dict[str, List[HTTPFile]]]:
  26. """Return two empty dicts suitable for use with parse_multipart_form_data.
  27. mypy insists on type annotations for dict literals, so this lets us avoid
  28. the verbose types throughout this test.
  29. """
  30. return {}, {}
  31. class TestUrlConcat(unittest.TestCase):
  32. def test_url_concat_no_query_params(self):
  33. url = url_concat("https://localhost/path", [("y", "y"), ("z", "z")])
  34. self.assertEqual(url, "https://localhost/path?y=y&z=z")
  35. def test_url_concat_encode_args(self):
  36. url = url_concat("https://localhost/path", [("y", "/y"), ("z", "z")])
  37. self.assertEqual(url, "https://localhost/path?y=%2Fy&z=z")
  38. def test_url_concat_trailing_q(self):
  39. url = url_concat("https://localhost/path?", [("y", "y"), ("z", "z")])
  40. self.assertEqual(url, "https://localhost/path?y=y&z=z")
  41. def test_url_concat_q_with_no_trailing_amp(self):
  42. url = url_concat("https://localhost/path?x", [("y", "y"), ("z", "z")])
  43. self.assertEqual(url, "https://localhost/path?x=&y=y&z=z")
  44. def test_url_concat_trailing_amp(self):
  45. url = url_concat("https://localhost/path?x&", [("y", "y"), ("z", "z")])
  46. self.assertEqual(url, "https://localhost/path?x=&y=y&z=z")
  47. def test_url_concat_mult_params(self):
  48. url = url_concat("https://localhost/path?a=1&b=2", [("y", "y"), ("z", "z")])
  49. self.assertEqual(url, "https://localhost/path?a=1&b=2&y=y&z=z")
  50. def test_url_concat_no_params(self):
  51. url = url_concat("https://localhost/path?r=1&t=2", [])
  52. self.assertEqual(url, "https://localhost/path?r=1&t=2")
  53. def test_url_concat_none_params(self):
  54. url = url_concat("https://localhost/path?r=1&t=2", None)
  55. self.assertEqual(url, "https://localhost/path?r=1&t=2")
  56. def test_url_concat_with_frag(self):
  57. url = url_concat("https://localhost/path#tab", [("y", "y")])
  58. self.assertEqual(url, "https://localhost/path?y=y#tab")
  59. def test_url_concat_multi_same_params(self):
  60. url = url_concat("https://localhost/path", [("y", "y1"), ("y", "y2")])
  61. self.assertEqual(url, "https://localhost/path?y=y1&y=y2")
  62. def test_url_concat_multi_same_query_params(self):
  63. url = url_concat("https://localhost/path?r=1&r=2", [("y", "y")])
  64. self.assertEqual(url, "https://localhost/path?r=1&r=2&y=y")
  65. def test_url_concat_dict_params(self):
  66. url = url_concat("https://localhost/path", dict(y="y"))
  67. self.assertEqual(url, "https://localhost/path?y=y")
  68. class QsParseTest(unittest.TestCase):
  69. def test_parsing(self):
  70. qsstring = "a=1&b=2&a=3"
  71. qs = urllib.parse.parse_qs(qsstring)
  72. qsl = list(qs_to_qsl(qs))
  73. self.assertIn(("a", "1"), qsl)
  74. self.assertIn(("a", "3"), qsl)
  75. self.assertIn(("b", "2"), qsl)
  76. class MultipartFormDataTest(unittest.TestCase):
  77. def test_file_upload(self):
  78. data = b"""\
  79. --1234
  80. Content-Disposition: form-data; name="files"; filename="ab.txt"
  81. Foo
  82. --1234--""".replace(
  83. b"\n", b"\r\n"
  84. )
  85. args, files = form_data_args()
  86. parse_multipart_form_data(b"1234", data, args, files)
  87. file = files["files"][0]
  88. self.assertEqual(file["filename"], "ab.txt")
  89. self.assertEqual(file["body"], b"Foo")
  90. def test_unquoted_names(self):
  91. # quotes are optional unless special characters are present
  92. data = b"""\
  93. --1234
  94. Content-Disposition: form-data; name=files; filename=ab.txt
  95. Foo
  96. --1234--""".replace(
  97. b"\n", b"\r\n"
  98. )
  99. args, files = form_data_args()
  100. parse_multipart_form_data(b"1234", data, args, files)
  101. file = files["files"][0]
  102. self.assertEqual(file["filename"], "ab.txt")
  103. self.assertEqual(file["body"], b"Foo")
  104. def test_special_filenames(self):
  105. filenames = [
  106. "a;b.txt",
  107. 'a"b.txt',
  108. 'a";b.txt',
  109. 'a;"b.txt',
  110. 'a";";.txt',
  111. 'a\\"b.txt',
  112. "a\\b.txt",
  113. ]
  114. for filename in filenames:
  115. logging.debug("trying filename %r", filename)
  116. str_data = """\
  117. --1234
  118. Content-Disposition: form-data; name="files"; filename="%s"
  119. Foo
  120. --1234--""" % filename.replace(
  121. "\\", "\\\\"
  122. ).replace(
  123. '"', '\\"'
  124. )
  125. data = utf8(str_data.replace("\n", "\r\n"))
  126. args, files = form_data_args()
  127. parse_multipart_form_data(b"1234", data, args, files)
  128. file = files["files"][0]
  129. self.assertEqual(file["filename"], filename)
  130. self.assertEqual(file["body"], b"Foo")
  131. def test_non_ascii_filename(self):
  132. data = b"""\
  133. --1234
  134. Content-Disposition: form-data; name="files"; filename="ab.txt"; filename*=UTF-8''%C3%A1b.txt
  135. Foo
  136. --1234--""".replace(
  137. b"\n", b"\r\n"
  138. )
  139. args, files = form_data_args()
  140. parse_multipart_form_data(b"1234", data, args, files)
  141. file = files["files"][0]
  142. self.assertEqual(file["filename"], u"áb.txt")
  143. self.assertEqual(file["body"], b"Foo")
  144. def test_boundary_starts_and_ends_with_quotes(self):
  145. data = b"""\
  146. --1234
  147. Content-Disposition: form-data; name="files"; filename="ab.txt"
  148. Foo
  149. --1234--""".replace(
  150. b"\n", b"\r\n"
  151. )
  152. args, files = form_data_args()
  153. parse_multipart_form_data(b'"1234"', data, args, files)
  154. file = files["files"][0]
  155. self.assertEqual(file["filename"], "ab.txt")
  156. self.assertEqual(file["body"], b"Foo")
  157. def test_missing_headers(self):
  158. data = b"""\
  159. --1234
  160. Foo
  161. --1234--""".replace(
  162. b"\n", b"\r\n"
  163. )
  164. args, files = form_data_args()
  165. with ExpectLog(gen_log, "multipart/form-data missing headers"):
  166. parse_multipart_form_data(b"1234", data, args, files)
  167. self.assertEqual(files, {})
  168. def test_invalid_content_disposition(self):
  169. data = b"""\
  170. --1234
  171. Content-Disposition: invalid; name="files"; filename="ab.txt"
  172. Foo
  173. --1234--""".replace(
  174. b"\n", b"\r\n"
  175. )
  176. args, files = form_data_args()
  177. with ExpectLog(gen_log, "Invalid multipart/form-data"):
  178. parse_multipart_form_data(b"1234", data, args, files)
  179. self.assertEqual(files, {})
  180. def test_line_does_not_end_with_correct_line_break(self):
  181. data = b"""\
  182. --1234
  183. Content-Disposition: form-data; name="files"; filename="ab.txt"
  184. Foo--1234--""".replace(
  185. b"\n", b"\r\n"
  186. )
  187. args, files = form_data_args()
  188. with ExpectLog(gen_log, "Invalid multipart/form-data"):
  189. parse_multipart_form_data(b"1234", data, args, files)
  190. self.assertEqual(files, {})
  191. def test_content_disposition_header_without_name_parameter(self):
  192. data = b"""\
  193. --1234
  194. Content-Disposition: form-data; filename="ab.txt"
  195. Foo
  196. --1234--""".replace(
  197. b"\n", b"\r\n"
  198. )
  199. args, files = form_data_args()
  200. with ExpectLog(gen_log, "multipart/form-data value missing name"):
  201. parse_multipart_form_data(b"1234", data, args, files)
  202. self.assertEqual(files, {})
  203. def test_data_after_final_boundary(self):
  204. # The spec requires that data after the final boundary be ignored.
  205. # http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
  206. # In practice, some libraries include an extra CRLF after the boundary.
  207. data = b"""\
  208. --1234
  209. Content-Disposition: form-data; name="files"; filename="ab.txt"
  210. Foo
  211. --1234--
  212. """.replace(
  213. b"\n", b"\r\n"
  214. )
  215. args, files = form_data_args()
  216. parse_multipart_form_data(b"1234", data, args, files)
  217. file = files["files"][0]
  218. self.assertEqual(file["filename"], "ab.txt")
  219. self.assertEqual(file["body"], b"Foo")
  220. class HTTPHeadersTest(unittest.TestCase):
  221. def test_multi_line(self):
  222. # Lines beginning with whitespace are appended to the previous line
  223. # with any leading whitespace replaced by a single space.
  224. # Note that while multi-line headers are a part of the HTTP spec,
  225. # their use is strongly discouraged.
  226. data = """\
  227. Foo: bar
  228. baz
  229. Asdf: qwer
  230. \tzxcv
  231. Foo: even
  232. more
  233. lines
  234. """.replace(
  235. "\n", "\r\n"
  236. )
  237. headers = HTTPHeaders.parse(data)
  238. self.assertEqual(headers["asdf"], "qwer zxcv")
  239. self.assertEqual(headers.get_list("asdf"), ["qwer zxcv"])
  240. self.assertEqual(headers["Foo"], "bar baz,even more lines")
  241. self.assertEqual(headers.get_list("foo"), ["bar baz", "even more lines"])
  242. self.assertEqual(
  243. sorted(list(headers.get_all())),
  244. [("Asdf", "qwer zxcv"), ("Foo", "bar baz"), ("Foo", "even more lines")],
  245. )
  246. def test_malformed_continuation(self):
  247. # If the first line starts with whitespace, it's a
  248. # continuation line with nothing to continue, so reject it
  249. # (with a proper error).
  250. data = " Foo: bar"
  251. self.assertRaises(HTTPInputError, HTTPHeaders.parse, data)
  252. def test_unicode_newlines(self):
  253. # Ensure that only \r\n is recognized as a header separator, and not
  254. # the other newline-like unicode characters.
  255. # Characters that are likely to be problematic can be found in
  256. # http://unicode.org/standard/reports/tr13/tr13-5.html
  257. # and cpython's unicodeobject.c (which defines the implementation
  258. # of unicode_type.splitlines(), and uses a different list than TR13).
  259. newlines = [
  260. u"\u001b", # VERTICAL TAB
  261. u"\u001c", # FILE SEPARATOR
  262. u"\u001d", # GROUP SEPARATOR
  263. u"\u001e", # RECORD SEPARATOR
  264. u"\u0085", # NEXT LINE
  265. u"\u2028", # LINE SEPARATOR
  266. u"\u2029", # PARAGRAPH SEPARATOR
  267. ]
  268. for newline in newlines:
  269. # Try the utf8 and latin1 representations of each newline
  270. for encoding in ["utf8", "latin1"]:
  271. try:
  272. try:
  273. encoded = newline.encode(encoding)
  274. except UnicodeEncodeError:
  275. # Some chars cannot be represented in latin1
  276. continue
  277. data = b"Cookie: foo=" + encoded + b"bar"
  278. # parse() wants a native_str, so decode through latin1
  279. # in the same way the real parser does.
  280. headers = HTTPHeaders.parse(native_str(data.decode("latin1")))
  281. expected = [
  282. (
  283. "Cookie",
  284. "foo=" + native_str(encoded.decode("latin1")) + "bar",
  285. )
  286. ]
  287. self.assertEqual(expected, list(headers.get_all()))
  288. except Exception:
  289. gen_log.warning("failed while trying %r in %s", newline, encoding)
  290. raise
  291. def test_optional_cr(self):
  292. # Both CRLF and LF should be accepted as separators. CR should not be
  293. # part of the data when followed by LF, but it is a normal char
  294. # otherwise (or should bare CR be an error?)
  295. headers = HTTPHeaders.parse("CRLF: crlf\r\nLF: lf\nCR: cr\rMore: more\r\n")
  296. self.assertEqual(
  297. sorted(headers.get_all()),
  298. [("Cr", "cr\rMore: more"), ("Crlf", "crlf"), ("Lf", "lf")],
  299. )
  300. def test_copy(self):
  301. all_pairs = [("A", "1"), ("A", "2"), ("B", "c")]
  302. h1 = HTTPHeaders()
  303. for k, v in all_pairs:
  304. h1.add(k, v)
  305. h2 = h1.copy()
  306. h3 = copy.copy(h1)
  307. h4 = copy.deepcopy(h1)
  308. for headers in [h1, h2, h3, h4]:
  309. # All the copies are identical, no matter how they were
  310. # constructed.
  311. self.assertEqual(list(sorted(headers.get_all())), all_pairs)
  312. for headers in [h2, h3, h4]:
  313. # Neither the dict or its member lists are reused.
  314. self.assertIsNot(headers, h1)
  315. self.assertIsNot(headers.get_list("A"), h1.get_list("A"))
  316. def test_pickle_roundtrip(self):
  317. headers = HTTPHeaders()
  318. headers.add("Set-Cookie", "a=b")
  319. headers.add("Set-Cookie", "c=d")
  320. headers.add("Content-Type", "text/html")
  321. pickled = pickle.dumps(headers)
  322. unpickled = pickle.loads(pickled)
  323. self.assertEqual(sorted(headers.get_all()), sorted(unpickled.get_all()))
  324. self.assertEqual(sorted(headers.items()), sorted(unpickled.items()))
  325. def test_setdefault(self):
  326. headers = HTTPHeaders()
  327. headers["foo"] = "bar"
  328. # If a value is present, setdefault returns it without changes.
  329. self.assertEqual(headers.setdefault("foo", "baz"), "bar")
  330. self.assertEqual(headers["foo"], "bar")
  331. # If a value is not present, setdefault sets it for future use.
  332. self.assertEqual(headers.setdefault("quux", "xyzzy"), "xyzzy")
  333. self.assertEqual(headers["quux"], "xyzzy")
  334. self.assertEqual(sorted(headers.get_all()), [("Foo", "bar"), ("Quux", "xyzzy")])
  335. def test_string(self):
  336. headers = HTTPHeaders()
  337. headers.add("Foo", "1")
  338. headers.add("Foo", "2")
  339. headers.add("Foo", "3")
  340. headers2 = HTTPHeaders.parse(str(headers))
  341. self.assertEquals(headers, headers2)
  342. class FormatTimestampTest(unittest.TestCase):
  343. # Make sure that all the input types are supported.
  344. TIMESTAMP = 1359312200.503611
  345. EXPECTED = "Sun, 27 Jan 2013 18:43:20 GMT"
  346. def check(self, value):
  347. self.assertEqual(format_timestamp(value), self.EXPECTED)
  348. def test_unix_time_float(self):
  349. self.check(self.TIMESTAMP)
  350. def test_unix_time_int(self):
  351. self.check(int(self.TIMESTAMP))
  352. def test_struct_time(self):
  353. self.check(time.gmtime(self.TIMESTAMP))
  354. def test_time_tuple(self):
  355. tup = tuple(time.gmtime(self.TIMESTAMP))
  356. self.assertEqual(9, len(tup))
  357. self.check(tup)
  358. def test_datetime(self):
  359. self.check(datetime.datetime.utcfromtimestamp(self.TIMESTAMP))
  360. # HTTPServerRequest is mainly tested incidentally to the server itself,
  361. # but this tests the parts of the class that can be tested in isolation.
  362. class HTTPServerRequestTest(unittest.TestCase):
  363. def test_default_constructor(self):
  364. # All parameters are formally optional, but uri is required
  365. # (and has been for some time). This test ensures that no
  366. # more required parameters slip in.
  367. HTTPServerRequest(uri="/")
  368. def test_body_is_a_byte_string(self):
  369. requets = HTTPServerRequest(uri="/")
  370. self.assertIsInstance(requets.body, bytes)
  371. def test_repr_does_not_contain_headers(self):
  372. request = HTTPServerRequest(
  373. uri="/", headers=HTTPHeaders({"Canary": ["Coal Mine"]})
  374. )
  375. self.assertTrue("Canary" not in repr(request))
  376. class ParseRequestStartLineTest(unittest.TestCase):
  377. METHOD = "GET"
  378. PATH = "/foo"
  379. VERSION = "HTTP/1.1"
  380. def test_parse_request_start_line(self):
  381. start_line = " ".join([self.METHOD, self.PATH, self.VERSION])
  382. parsed_start_line = parse_request_start_line(start_line)
  383. self.assertEqual(parsed_start_line.method, self.METHOD)
  384. self.assertEqual(parsed_start_line.path, self.PATH)
  385. self.assertEqual(parsed_start_line.version, self.VERSION)
  386. class ParseCookieTest(unittest.TestCase):
  387. # These tests copied from Django:
  388. # https://github.com/django/django/pull/6277/commits/da810901ada1cae9fc1f018f879f11a7fb467b28
  389. def test_python_cookies(self):
  390. """
  391. Test cases copied from Python's Lib/test/test_http_cookies.py
  392. """
  393. self.assertEqual(
  394. parse_cookie("chips=ahoy; vienna=finger"),
  395. {"chips": "ahoy", "vienna": "finger"},
  396. )
  397. # Here parse_cookie() differs from Python's cookie parsing in that it
  398. # treats all semicolons as delimiters, even within quotes.
  399. self.assertEqual(
  400. parse_cookie('keebler="E=mc2; L=\\"Loves\\"; fudge=\\012;"'),
  401. {"keebler": '"E=mc2', "L": '\\"Loves\\"', "fudge": "\\012", "": '"'},
  402. )
  403. # Illegal cookies that have an '=' char in an unquoted value.
  404. self.assertEqual(parse_cookie("keebler=E=mc2"), {"keebler": "E=mc2"})
  405. # Cookies with ':' character in their name.
  406. self.assertEqual(
  407. parse_cookie("key:term=value:term"), {"key:term": "value:term"}
  408. )
  409. # Cookies with '[' and ']'.
  410. self.assertEqual(
  411. parse_cookie("a=b; c=[; d=r; f=h"), {"a": "b", "c": "[", "d": "r", "f": "h"}
  412. )
  413. def test_cookie_edgecases(self):
  414. # Cookies that RFC6265 allows.
  415. self.assertEqual(
  416. parse_cookie("a=b; Domain=example.com"), {"a": "b", "Domain": "example.com"}
  417. )
  418. # parse_cookie() has historically kept only the last cookie with the
  419. # same name.
  420. self.assertEqual(parse_cookie("a=b; h=i; a=c"), {"a": "c", "h": "i"})
  421. def test_invalid_cookies(self):
  422. """
  423. Cookie strings that go against RFC6265 but browsers will send if set
  424. via document.cookie.
  425. """
  426. # Chunks without an equals sign appear as unnamed values per
  427. # https://bugzilla.mozilla.org/show_bug.cgi?id=169091
  428. self.assertIn(
  429. "django_language",
  430. parse_cookie("abc=def; unnamed; django_language=en").keys(),
  431. )
  432. # Even a double quote may be an unamed value.
  433. self.assertEqual(parse_cookie('a=b; "; c=d'), {"a": "b", "": '"', "c": "d"})
  434. # Spaces in names and values, and an equals sign in values.
  435. self.assertEqual(
  436. parse_cookie("a b c=d e = f; gh=i"), {"a b c": "d e = f", "gh": "i"}
  437. )
  438. # More characters the spec forbids.
  439. self.assertEqual(
  440. parse_cookie('a b,c<>@:/[]?{}=d " =e,f g'),
  441. {"a b,c<>@:/[]?{}": 'd " =e,f g'},
  442. )
  443. # Unicode characters. The spec only allows ASCII.
  444. self.assertEqual(
  445. parse_cookie("saint=André Bessette"),
  446. {"saint": native_str("André Bessette")},
  447. )
  448. # Browsers don't send extra whitespace or semicolons in Cookie headers,
  449. # but parse_cookie() should parse whitespace the same way
  450. # document.cookie parses whitespace.
  451. self.assertEqual(
  452. parse_cookie(" = b ; ; = ; c = ; "), {"": "b", "c": ""}
  453. )