web_test.py 113 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138
  1. from tornado.concurrent import Future
  2. from tornado import gen
  3. from tornado.escape import (
  4. json_decode,
  5. utf8,
  6. to_unicode,
  7. recursive_unicode,
  8. native_str,
  9. to_basestring,
  10. )
  11. from tornado.httpclient import HTTPClientError
  12. from tornado.httputil import format_timestamp
  13. from tornado.iostream import IOStream
  14. from tornado import locale
  15. from tornado.locks import Event
  16. from tornado.log import app_log, gen_log
  17. from tornado.simple_httpclient import SimpleAsyncHTTPClient
  18. from tornado.template import DictLoader
  19. from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
  20. from tornado.util import ObjectDict, unicode_type
  21. from tornado.web import (
  22. Application,
  23. RequestHandler,
  24. StaticFileHandler,
  25. RedirectHandler as WebRedirectHandler,
  26. HTTPError,
  27. MissingArgumentError,
  28. ErrorHandler,
  29. authenticated,
  30. url,
  31. _create_signature_v1,
  32. create_signed_value,
  33. decode_signed_value,
  34. get_signature_key_version,
  35. UIModule,
  36. Finish,
  37. stream_request_body,
  38. removeslash,
  39. addslash,
  40. GZipContentEncoding,
  41. )
  42. import binascii
  43. import contextlib
  44. import copy
  45. import datetime
  46. import email.utils
  47. import gzip
  48. from io import BytesIO
  49. import itertools
  50. import logging
  51. import os
  52. import re
  53. import socket
  54. import typing # noqa: F401
  55. import unittest
  56. import urllib.parse
  def relpath(*a):
      """Return a path formed by joining ``a`` onto this module's directory.

      With no arguments the result is the directory that contains this file.
      """
      return os.path.join(os.path.dirname(__file__), *a)
  class WebTestCase(AsyncHTTPTestCase):
      """Base class for web tests that also supports WSGI mode.

      Override get_handlers and get_app_kwargs instead of get_app.

      This class is deprecated since WSGI mode is no longer supported.
      """

      def get_app(self):
          """Build the ``Application`` under test and keep it on ``self.app``.

          The application is constructed from ``get_handlers()`` and the
          keyword arguments returned by ``get_app_kwargs()``.
          """
          self.app = Application(self.get_handlers(), **self.get_app_kwargs())
          return self.app

      def get_handlers(self):
          """Return the handler list for the application.

          Raises:
              NotImplementedError: always; subclasses must override this.
          """
          raise NotImplementedError()

      def get_app_kwargs(self):
          """Return extra keyword arguments for ``Application`` (none by default)."""
          return {}
  class SimpleHandlerTestCase(WebTestCase):
      """Simplified base class for tests that work with a single handler class.

      To use, define a nested class named ``Handler``.
      """

      def get_handlers(self):
          """Route ``/`` to the nested ``Handler`` class of the subclass."""
          return [("/", self.Handler)]
  class HelloHandler(RequestHandler):
      """Handler whose GET response body is the text ``hello``."""

      def get(self):
          self.write("hello")
  class CookieTestRequestHandler(RequestHandler):
      """Handler stub that stores cookies in a plain dict.

      Stubs out enough methods to make the secure_cookie functions work
      without a real request or application.
      """

      def __init__(self, cookie_secret="0123456789", key_version=None):
          """Set up the in-memory cookie store and a minimal ``application``.

          Args:
              cookie_secret: value stored under the ``cookie_secret`` setting.
              key_version: when not ``None``, also stored under the
                  ``key_version`` setting; otherwise that setting is omitted.
          """
          # don't call super.__init__
          self._cookies = {}  # type: typing.Dict[str, bytes]
          settings = dict(cookie_secret=cookie_secret)
          if key_version is not None:
              settings["key_version"] = key_version
          self.application = ObjectDict(settings=settings)

      def get_cookie(self, name):
          """Return the stored value for ``name``, or ``None`` if it is unset."""
          return self._cookies.get(name)

      def set_cookie(self, name, value, expires_days=None):
          """Store ``value`` under ``name``; ``expires_days`` is accepted but ignored."""
          self._cookies[name] = value
  # See SignedValueTest below for more.
  class SecureCookieV1Test(unittest.TestCase):
      """Tests for version 1 secure cookies using ``CookieTestRequestHandler``."""

      def test_round_trip(self):
          """A value set with version=1 is read back unchanged."""
          handler = CookieTestRequestHandler()
          handler.set_secure_cookie("foo", b"bar", version=1)
          self.assertEqual(handler.get_secure_cookie("foo", min_version=1), b"bar")

      def test_cookie_tampering_future_timestamp(self):
          """Moving payload digits into the timestamp field is rejected."""
          handler = CookieTestRequestHandler()
          # this string base64-encodes to '12345678'
          handler.set_secure_cookie("foo", binascii.a2b_hex(b"d76df8e7aefc"), version=1)
          cookie = handler._cookies["foo"]
          match = re.match(br"12345678\|([0-9]+)\|([0-9a-f]+)", cookie)
          assert match is not None
          timestamp = match.group(1)
          sig = match.group(2)
          secret = handler.application.settings["cookie_secret"]
          self.assertEqual(
              _create_signature_v1(secret, "foo", "12345678", timestamp),
              sig,
          )
          # shifting digits from payload to timestamp doesn't alter signature
          # (this is not desirable behavior, just confirming that that's how it
          # works)
          self.assertEqual(
              _create_signature_v1(secret, "foo", "1234", b"5678" + timestamp),
              sig,
          )
          # tamper with the cookie
          handler._cookies["foo"] = utf8(
              "1234|5678%s|%s" % (to_basestring(timestamp), to_basestring(sig))
          )
          # it gets rejected
          with ExpectLog(gen_log, "Cookie timestamp in future"):
              self.assertIsNone(handler.get_secure_cookie("foo", min_version=1))

      def test_arbitrary_bytes(self):
          """A non-ASCII byte value survives the round trip."""
          # Secure cookies accept arbitrary data (which is base64 encoded).
          # Note that normal cookies accept only a subset of ascii.
          handler = CookieTestRequestHandler()
          handler.set_secure_cookie("foo", b"\xe9", version=1)
          self.assertEqual(handler.get_secure_cookie("foo", min_version=1), b"\xe9")
  # See SignedValueTest below for more.
  class SecureCookieV2Test(unittest.TestCase):
      """Tests for version 2 secure cookies, including keyed secrets."""

      # Mapping of key version -> secret, passed as ``cookie_secret``.
      KEY_VERSIONS = {0: "ajklasdf0ojaisdf", 1: "aslkjasaolwkjsdf"}

      def test_round_trip(self):
          """A value set with version=2 is read back unchanged."""
          handler = CookieTestRequestHandler()
          handler.set_secure_cookie("foo", b"bar", version=2)
          self.assertEqual(handler.get_secure_cookie("foo", min_version=2), b"bar")

      def test_key_version_roundtrip(self):
          """Round trip works when signing with key version 0."""
          handler = CookieTestRequestHandler(
              cookie_secret=self.KEY_VERSIONS, key_version=0
          )
          handler.set_secure_cookie("foo", b"bar")
          self.assertEqual(handler.get_secure_cookie("foo"), b"bar")

      def test_key_version_roundtrip_differing_version(self):
          """Round trip works when signing with key version 1."""
          handler = CookieTestRequestHandler(
              cookie_secret=self.KEY_VERSIONS, key_version=1
          )
          handler.set_secure_cookie("foo", b"bar")
          self.assertEqual(handler.get_secure_cookie("foo"), b"bar")

      def test_key_version_increment_version(self):
          """A cookie signed with key 0 is still readable once key 1 is current."""
          handler = CookieTestRequestHandler(
              cookie_secret=self.KEY_VERSIONS, key_version=0
          )
          handler.set_secure_cookie("foo", b"bar")
          new_handler = CookieTestRequestHandler(
              cookie_secret=self.KEY_VERSIONS, key_version=1
          )
          # Share the cookie jar so the new handler sees the old cookie.
          new_handler._cookies = handler._cookies
          self.assertEqual(new_handler.get_secure_cookie("foo"), b"bar")

      def test_key_version_invalidate_version(self):
          """A cookie signed with key 0 is rejected once key 0 is removed."""
          handler = CookieTestRequestHandler(
              cookie_secret=self.KEY_VERSIONS, key_version=0
          )
          handler.set_secure_cookie("foo", b"bar")
          # Copy before popping so the class-level mapping is not mutated.
          new_key_versions = self.KEY_VERSIONS.copy()
          new_key_versions.pop(0)
          new_handler = CookieTestRequestHandler(
              cookie_secret=new_key_versions, key_version=1
          )
          new_handler._cookies = handler._cookies
          self.assertIsNone(new_handler.get_secure_cookie("foo"))
  class FinalReturnTest(WebTestCase):
      """Checks the values returned by ``finish()`` and ``render()``.

      The handlers store the return value on the test case as
      ``final_return`` so the test methods can inspect it after the fetch.
      """

      def get_handlers(self):
          """Return handlers that record what ``finish``/``render`` return."""
          # Alias the test case; ``self`` is rebound inside the handler methods.
          test = self

          class FinishHandler(RequestHandler):
              @gen.coroutine
              def get(self):
                  test.final_return = self.finish()
                  yield test.final_return

              @gen.coroutine
              def post(self):
                  self.write("hello,")
                  yield self.flush()
                  test.final_return = self.finish("world")
                  yield test.final_return

          class RenderHandler(RequestHandler):
              def create_template_loader(self, path):
                  return DictLoader({"foo.html": "hi"})

              @gen.coroutine
              def get(self):
                  test.final_return = self.render("foo.html")

          return [("/finish", FinishHandler), ("/render", RenderHandler)]

      def get_app_kwargs(self):
          """Set ``template_path`` for the application."""
          return dict(template_path="FinalReturnTest")

      def test_finish_method_return_future(self):
          """``finish()`` returns a completed Future for both GET and POST."""
          response = self.fetch(self.get_url("/finish"))
          self.assertEqual(response.code, 200)
          self.assertIsInstance(self.final_return, Future)
          self.assertTrue(self.final_return.done())

          response = self.fetch(self.get_url("/finish"), method="POST", body=b"")
          self.assertEqual(response.code, 200)
          self.assertIsInstance(self.final_return, Future)
          self.assertTrue(self.final_return.done())

      def test_render_method_return_future(self):
          """``render()`` returns a Future."""
          response = self.fetch(self.get_url("/render"))
          self.assertEqual(response.code, 200)
          self.assertIsInstance(self.final_return, Future)
  class CookieTest(WebTestCase):
      """End-to-end tests of ``set_cookie``/``get_cookie`` through HTTP headers."""

      def get_handlers(self):
          """Return one small handler per cookie scenario exercised below."""

          class SetCookieHandler(RequestHandler):
              def get(self):
                  # Try setting cookies with different argument types
                  # to ensure that everything gets encoded correctly
                  self.set_cookie("str", "asdf")
                  self.set_cookie("unicode", "qwer")
                  self.set_cookie("bytes", b"zxcv")

          class GetCookieHandler(RequestHandler):
              def get(self):
                  self.write(self.get_cookie("foo", "default"))

          class SetCookieDomainHandler(RequestHandler):
              def get(self):
                  # unicode domain and path arguments shouldn't break things
                  # either (see bug #285)
                  self.set_cookie("unicode_args", "blah", domain="foo.com", path="/foo")

          class SetCookieSpecialCharHandler(RequestHandler):
              def get(self):
                  self.set_cookie("equals", "a=b")
                  self.set_cookie("semicolon", "a;b")
                  self.set_cookie("quote", 'a"b')

          class SetCookieOverwriteHandler(RequestHandler):
              def get(self):
                  self.set_cookie("a", "b", domain="example.com")
                  self.set_cookie("c", "d", domain="example.com")
                  # A second call with the same name clobbers the first.
                  # Attributes from the first call are not carried over.
                  self.set_cookie("a", "e")

          class SetCookieMaxAgeHandler(RequestHandler):
              def get(self):
                  self.set_cookie("foo", "bar", max_age=10)

          class SetCookieExpiresDaysHandler(RequestHandler):
              def get(self):
                  self.set_cookie("foo", "bar", expires_days=10)

          class SetCookieFalsyFlags(RequestHandler):
              def get(self):
                  self.set_cookie("a", "1", secure=True)
                  self.set_cookie("b", "1", secure=False)
                  self.set_cookie("c", "1", httponly=True)
                  self.set_cookie("d", "1", httponly=False)

          return [
              ("/set", SetCookieHandler),
              ("/get", GetCookieHandler),
              ("/set_domain", SetCookieDomainHandler),
              ("/special_char", SetCookieSpecialCharHandler),
              ("/set_overwrite", SetCookieOverwriteHandler),
              ("/set_max_age", SetCookieMaxAgeHandler),
              ("/set_expires_days", SetCookieExpiresDaysHandler),
              ("/set_falsy_flags", SetCookieFalsyFlags),
          ]

      def test_set_cookie(self):
          """str and bytes cookie values all produce plain headers."""
          response = self.fetch("/set")
          self.assertEqual(
              sorted(response.headers.get_list("Set-Cookie")),
              ["bytes=zxcv; Path=/", "str=asdf; Path=/", "unicode=qwer; Path=/"],
          )

      def test_get_cookie(self):
          """Plain and quoted values are read; a malformed header yields the default."""
          response = self.fetch("/get", headers={"Cookie": "foo=bar"})
          self.assertEqual(response.body, b"bar")

          response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
          self.assertEqual(response.body, b"bar")

          response = self.fetch("/get", headers={"Cookie": "/=exception;"})
          self.assertEqual(response.body, b"default")

      def test_set_cookie_domain(self):
          """Domain and path arguments appear in the Set-Cookie header."""
          response = self.fetch("/set_domain")
          self.assertEqual(
              response.headers.get_list("Set-Cookie"),
              ["unicode_args=blah; Domain=foo.com; Path=/foo"],
          )

      def test_cookie_special_char(self):
          """Special characters are quoted on output and unquoted on input."""
          response = self.fetch("/special_char")
          headers = sorted(response.headers.get_list("Set-Cookie"))
          self.assertEqual(len(headers), 3)
          self.assertEqual(headers[0], 'equals="a=b"; Path=/')
          self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
          # python 2.7 octal-escapes the semicolon; older versions leave it alone
          self.assertIn(
              headers[2],
              ('semicolon="a;b"; Path=/', 'semicolon="a\\073b"; Path=/'),
          )

          # (Cookie request header, value the handler is expected to echo)
          data = [
              ("foo=a=b", "a=b"),
              ('foo="a=b"', "a=b"),
              ('foo="a;b"', '"a'),  # even quoted, ";" is a delimiter
              ("foo=a\\073b", "a\\073b"),  # escapes only decoded in quotes
              ('foo="a\\073b"', "a;b"),
              ('foo="a\\"b"', 'a"b'),
          ]
          for header, expected in data:
              logging.debug("trying %r", header)
              response = self.fetch("/get", headers={"Cookie": header})
              self.assertEqual(response.body, utf8(expected))

      def test_set_cookie_overwrite(self):
          """Re-setting a cookie name replaces the earlier header entirely."""
          response = self.fetch("/set_overwrite")
          headers = response.headers.get_list("Set-Cookie")
          self.assertEqual(
              sorted(headers), ["a=e; Path=/", "c=d; Domain=example.com; Path=/"]
          )

      def test_set_cookie_max_age(self):
          """``max_age`` is emitted as a Max-Age attribute."""
          response = self.fetch("/set_max_age")
          headers = response.headers.get_list("Set-Cookie")
          self.assertEqual(sorted(headers), ["foo=bar; Max-Age=10; Path=/"])

      def test_set_cookie_expires_days(self):
          """``expires_days=10`` yields an expires date about ten days from now."""
          response = self.fetch("/set_expires_days")
          header = response.headers.get("Set-Cookie")
          match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
          assert match is not None

          # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
          # tzinfo so it stays comparable with the naive value parsed below.
          now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
          expires = now_utc + datetime.timedelta(days=10)
          parsed = email.utils.parsedate(match.groupdict()["expires"])
          assert parsed is not None
          header_expires = datetime.datetime(*parsed[:6])
          # Allow a few seconds of slack between the handler and this check.
          self.assertTrue(abs((expires - header_expires).total_seconds()) < 10)

      def test_set_cookie_false_flags(self):
          """``secure``/``httponly`` set to False are omitted from the header."""
          response = self.fetch("/set_falsy_flags")
          headers = sorted(response.headers.get_list("Set-Cookie"))
          # The secure and httponly headers are capitalized in py35 and
          # lowercase in older versions.
          self.assertEqual(headers[0].lower(), "a=1; path=/; secure")
          self.assertEqual(headers[1].lower(), "b=1; path=/")
          self.assertEqual(headers[2].lower(), "c=1; httponly; path=/")
          self.assertEqual(headers[3].lower(), "d=1; path=/")
  class AuthRedirectRequestHandler(RequestHandler):
      """Handler with a per-route login URL and an ``@authenticated`` GET."""

      def initialize(self, login_url):
          """Remember the ``login_url`` supplied in the route's kwargs."""
          self.login_url = login_url

      def get_login_url(self):
          """Return the login URL configured for this route."""
          return self.login_url

      @authenticated
      def get(self):
          # we'll never actually get here because the test doesn't follow redirects
          self.send_error(500)
  class AuthRedirectTest(WebTestCase):
      """Checks the redirect issued by ``@authenticated`` for each login URL form."""

      def get_handlers(self):
          """Register one route with a relative and one with an absolute login URL."""
          return [
              ("/relative", AuthRedirectRequestHandler, dict(login_url="/login")),
              (
                  "/absolute",
                  AuthRedirectRequestHandler,
                  dict(login_url="http://example.com/login"),
              ),
          ]

      def test_relative_auth_redirect(self):
          """A relative login URL gets a path-only ``next`` parameter."""
          response = self.fetch(self.get_url("/relative"), follow_redirects=False)
          self.assertEqual(response.code, 302)
          self.assertEqual(response.headers["Location"], "/login?next=%2Frelative")

      def test_absolute_auth_redirect(self):
          """An absolute login URL gets a fully qualified ``next`` parameter."""
          response = self.fetch(self.get_url("/absolute"), follow_redirects=False)
          self.assertEqual(response.code, 302)
          location = response.headers["Location"]
          # Dots are escaped so they match literally; the port varies per run.
          self.assertTrue(
              re.match(
                  r"http://example\.com/login\?next=http%3A%2F%2F127\.0\.0\.1%3A[0-9]+%2Fabsolute",
                  location,
              ),
              location,
          )
  class ConnectionCloseHandler(RequestHandler):
      """Handler that reports back to the owning test and then waits.

      The ``test`` object passed to ``initialize`` must provide
      ``on_handler_waiting``, ``on_connection_close`` and ``cleanup_event``.
      """

      def initialize(self, test):
          """Keep a reference to the test case driving this handler."""
          self.test = test

      @gen.coroutine
      def get(self):
          """Notify the test, then wait until its ``cleanup_event`` fires."""
          self.test.on_handler_waiting()
          yield self.test.cleanup_event.wait()

      def on_connection_close(self):
          """Forward the connection-close notification to the test."""
          self.test.on_connection_close()
  class ConnectionCloseTest(WebTestCase):
      """Checks that the handler's ``on_connection_close`` hook is called."""

      def get_handlers(self):
          """Create the cleanup event and route ``/`` to the close handler."""
          self.cleanup_event = Event()
          return [("/", ConnectionCloseHandler, dict(test=self))]

      def test_connection_close(self):
          """Send a request on a raw socket and wait for the close callback."""
          s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
          s.connect(("127.0.0.1", self.get_http_port()))
          self.stream = IOStream(s)
          self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
          # Returns once on_connection_close() below calls self.stop().
          self.wait()
          # Let the hanging coroutine clean up after itself
          self.cleanup_event.set()
          self.io_loop.run_sync(lambda: gen.sleep(0))

      def on_handler_waiting(self):
          """Called by the handler once it is running; closes the client stream."""
          logging.debug("handler waiting")
          self.stream.close()

      def on_connection_close(self):
          """Called by the handler when it sees the close; ends ``self.wait()``."""
          logging.debug("connection closed")
          self.stop()
  class EchoHandler(RequestHandler):
      """Echoes the request path, path arguments and query arguments as JSON.

      Before responding it verifies the exact types of argument keys and
      values and raises ``Exception`` on any mismatch.
      """

      def get(self, *path_args):
          # Type checks: web.py interfaces convert argument values to
          # unicode strings (by default, but see also decode_argument).
          # In httpserver.py (i.e. self.request.arguments), they're left
          # as bytes. Keys are always native strings.
          arguments = self.request.arguments
          for key in arguments:
              # Exact type identity is intended here, not isinstance().
              if type(key) is not str:
                  raise Exception("incorrect type for key: %r" % type(key))
              for value in arguments[key]:
                  if type(value) is not bytes:
                      raise Exception("incorrect type for value: %r" % type(value))
              for value in self.get_arguments(key):
                  if type(value) is not unicode_type:
                      raise Exception("incorrect type for value: %r" % type(value))
          for arg in path_args:
              if type(arg) is not unicode_type:
                  raise Exception("incorrect type for path arg: %r" % type(arg))
          self.write(
              dict(
                  path=self.request.path,
                  path_args=path_args,
                  args=recursive_unicode(arguments),
              )
          )
  class RequestEncodingTest(WebTestCase):
      """Tests decoding of percent-encoded paths and query arguments."""

      def get_handlers(self):
          """Route one-group and two-group patterns to ``EchoHandler``."""
          return [("/group/(.*)", EchoHandler), ("/slashes/([^/]*)/([^/]*)", EchoHandler)]

      def fetch_json(self, path):
          """Fetch ``path`` and return the JSON-decoded response body."""
          return json_decode(self.fetch(path).body)

      def test_group_question_mark(self):
          # Ensure that url-encoded question marks are handled properly
          self.assertEqual(
              self.fetch_json("/group/%3F"),
              dict(path="/group/%3F", path_args=["?"], args={}),
          )
          self.assertEqual(
              self.fetch_json("/group/%3F?%3F=%3F"),
              dict(path="/group/%3F", path_args=["?"], args={"?": ["?"]}),
          )

      def test_group_encoding(self):
          # Path components and query arguments should be decoded the same way
          self.assertEqual(
              self.fetch_json("/group/%C3%A9?arg=%C3%A9"),
              {
                  "path": "/group/%C3%A9",
                  "path_args": ["\u00e9"],
                  "args": {"arg": ["\u00e9"]},
              },
          )

      def test_slashes(self):
          # Slashes may be escaped to appear as a single "directory" in the path,
          # but they are then unescaped when passed to the get() method.
          self.assertEqual(
              self.fetch_json("/slashes/foo/bar"),
              dict(path="/slashes/foo/bar", path_args=["foo", "bar"], args={}),
          )
          self.assertEqual(
              self.fetch_json("/slashes/a%2Fb/c%2Fd"),
              dict(path="/slashes/a%2Fb/c%2Fd", path_args=["a/b", "c/d"], args={}),
          )

      def test_error(self):
          # Percent signs (encoded as %25) should not mess up printf-style
          # messages in logs
          with ExpectLog(gen_log, ".*Invalid unicode"):
              self.fetch("/group/?arg=%25%e9")
  class TypeCheckHandler(RequestHandler):
      """Records type mismatches of handler values and writes them out.

      ``prepare`` fills ``self.errors`` (name -> message); ``get`` and ``post``
      add a check for the path component and write the dict as the response.
      """

      def prepare(self):
          self.errors = {}  # type: typing.Dict[str, str]

          self.check_type("status", self.get_status(), int)

          # get_argument is an exception from the general rule of using
          # type str for non-body data mainly for historical reasons.
          self.check_type("argument", self.get_argument("foo"), unicode_type)
          self.check_type("cookie_key", list(self.cookies.keys())[0], str)
          self.check_type("cookie_value", list(self.cookies.values())[0].value, str)

          # Secure cookies return bytes because they can contain arbitrary
          # data, but regular cookies are native strings.
          if list(self.cookies.keys()) != ["asdf"]:
              raise Exception(
                  "unexpected values for cookie keys: %r" % self.cookies.keys()
              )
          self.check_type("get_secure_cookie", self.get_secure_cookie("asdf"), bytes)
          self.check_type("get_cookie", self.get_cookie("asdf"), str)

          self.check_type("xsrf_token", self.xsrf_token, bytes)
          self.check_type("xsrf_form_html", self.xsrf_form_html(), str)

          self.check_type("reverse_url", self.reverse_url("typecheck", "foo"), str)

          self.check_type("request_summary", self._request_summary(), str)

      def get(self, path_component):
          # path_component uses type unicode instead of str for consistency
          # with get_argument()
          self.check_type("path_component", path_component, unicode_type)
          self.write(self.errors)

      def post(self, path_component):
          self.check_type("path_component", path_component, unicode_type)
          self.write(self.errors)

      def check_type(self, name, obj, expected_type):
          """Record an error under ``name`` unless ``type(obj)`` is ``expected_type``.

          The comparison is on the exact type; subclasses count as a mismatch.
          """
          actual_type = type(obj)
          if actual_type is not expected_type:
              self.errors[name] = "expected %s, got %s" % (expected_type, actual_type)
  class DecodeArgHandler(RequestHandler):
      """Handler with a custom ``decode_argument`` driven by the request.

      When the request carries an ``encoding`` argument, values are decoded
      with that codec; otherwise they are returned unchanged as bytes.
      ``get`` reports the type and content of the path and query values.
      """

      def decode_argument(self, value, name=None):
          value_type = type(value)
          if value_type != bytes:
              raise Exception("unexpected type for value: %r" % value_type)
          # Read self.request.arguments directly: going through
          # get_argument() here would recurse into decode_argument().
          request_args = self.request.arguments
          if "encoding" not in request_args:
              return value
          codec = to_unicode(request_args["encoding"][0])
          return value.decode(codec)

      def get(self, arg):
          def describe(s):
              # Returns a [type-name, printable-content] pair; bytes are
              # rendered as hex.
              kind = type(s)
              if kind == bytes:
                  return ["bytes", native_str(binascii.b2a_hex(s))]
              if kind == unicode_type:
                  return ["unicode", s]
              raise Exception("unknown type")

          path_info = describe(arg)
          query_info = describe(self.get_argument("foo"))
          self.write({"path": path_info, "query": query_info})
  class LinkifyHandler(RequestHandler):
      """Render ``linkify.html`` with a bare URL as the message."""

      def get(self):
          template_args = {"message": "http://example.com"}
          self.render("linkify.html", **template_args)
  class UIModuleResourceHandler(RequestHandler):
      """Render ``page.html`` with two entries."""

      def get(self):
          entries = [1, 2]
          self.render("page.html", entries=entries)
  class OptionalPathHandler(RequestHandler):
      """Echo the captured path argument back in the response."""

      def get(self, path):
          payload = dict(path=path)
          self.write(payload)
  class MultiHeaderHandler(RequestHandler):
      """Set and add headers whose names differ only in case.

      The values deliberately mix strings and integers.
      """

      def get(self):
          for header_name, header_value in (("x-overwrite", "1"), ("X-Overwrite", 2)):
              self.set_header(header_name, header_value)
          for header_name, header_value in (("x-multi", 3), ("X-Multi", "4")):
              self.add_header(header_name, header_value)
  class RedirectHandler(RequestHandler):
      """Redirect to ``/`` using the ``permanent`` or ``status`` argument.

      ``permanent`` takes precedence when both are supplied; a request
      with neither argument raises an exception.
      """

      def get(self):
          permanent = self.get_argument("permanent", None)
          if permanent is not None:
              self.redirect("/", permanent=int(permanent))
              return

          status = self.get_argument("status", None)
          if status is not None:
              self.redirect("/", status=int(status))
              return

          raise Exception("didn't get permanent or status arguments")
  class EmptyFlushCallbackHandler(RequestHandler):
      """Interleave flushes that have pending output with ones that do not."""

      @gen.coroutine
      def get(self):
          # Every flush() must complete whether or not there is any output
          # to send.  Of this first pair, the first is "empty" but writes
          # the headers and the second is a completely empty flush.
          for _ in range(2):
              yield self.flush()
          self.write("o")
          # Of this pair, the first flushes the "o" and the second is empty.
          for _ in range(2):
              yield self.flush()
          self.finish("k")
  class HeaderInjectionHandler(RequestHandler):
      """Check that a header value containing CRLF is rejected.

      Responds with ``ok`` only when ``set_header`` raises a ``ValueError``
      mentioning "Unsafe header value"; any other outcome raises.
      """

      def get(self):
          try:
              self.set_header("X-Foo", "foo\r\nX-Bar: baz")
          except ValueError as e:
              if "Unsafe header value" not in str(e):
                  raise
              self.finish(b"ok")
          else:
              raise Exception("Didn't get expected exception")
  class GetArgumentHandler(RequestHandler):
      """Return argument ``foo`` using the lookup chosen by ``source``.

      ``source=query`` uses ``get_query_argument``, ``source=body`` uses
      ``get_body_argument`` and anything else uses ``get_argument``.  The
      response is finished from ``prepare``.
      """

      def prepare(self):
          source = self.get_argument("source", None)
          lookups = {
              "query": self.get_query_argument,
              "body": self.get_body_argument,
          }
          method = lookups.get(source, self.get_argument)
          self.finish(method("foo", "default"))
  class GetArgumentsHandler(RequestHandler):
      """Report the values of ``foo`` from each of the plural lookups."""

      def prepare(self):
          merged_values = self.get_arguments("foo")
          query_values = self.get_query_arguments("foo")
          body_values = self.get_body_arguments("foo")
          self.finish(
              {"default": merged_values, "query": query_values, "body": body_values}
          )
  583. # This test was shared with wsgi_test.py; now the name is meaningless.
  584. class WSGISafeWebTest(WebTestCase):
  585. COOKIE_SECRET = "WebTest.COOKIE_SECRET"
  586. def get_app_kwargs(self):
  587. loader = DictLoader(
  588. {
  589. "linkify.html": "{% module linkify(message) %}",
  590. "page.html": """\
  591. <html><head></head><body>
  592. {% for e in entries %}
  593. {% module Template("entry.html", entry=e) %}
  594. {% end %}
  595. </body></html>""",
  596. "entry.html": """\
  597. {{ set_resources(embedded_css=".entry { margin-bottom: 1em; }",
  598. embedded_javascript="js_embed()",
  599. css_files=["/base.css", "/foo.css"],
  600. javascript_files="/common.js",
  601. html_head="<meta>",
  602. html_body='<script src="/analytics.js"/>') }}
  603. <div class="entry">...</div>""",
  604. }
  605. )
  606. return dict(
  607. template_loader=loader,
  608. autoescape="xhtml_escape",
  609. cookie_secret=self.COOKIE_SECRET,
  610. )
  611. def tearDown(self):
  612. super(WSGISafeWebTest, self).tearDown()
  613. RequestHandler._template_loaders.clear()
  614. def get_handlers(self):
  615. urls = [
  616. url("/typecheck/(.*)", TypeCheckHandler, name="typecheck"),
  617. url("/decode_arg/(.*)", DecodeArgHandler, name="decode_arg"),
  618. url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
  619. url("/linkify", LinkifyHandler),
  620. url("/uimodule_resources", UIModuleResourceHandler),
  621. url("/optional_path/(.+)?", OptionalPathHandler),
  622. url("/multi_header", MultiHeaderHandler),
  623. url("/redirect", RedirectHandler),
  624. url(
  625. "/web_redirect_permanent",
  626. WebRedirectHandler,
  627. {"url": "/web_redirect_newpath"},
  628. ),
  629. url(
  630. "/web_redirect",
  631. WebRedirectHandler,
  632. {"url": "/web_redirect_newpath", "permanent": False},
  633. ),
  634. url(
  635. "//web_redirect_double_slash",
  636. WebRedirectHandler,
  637. {"url": "/web_redirect_newpath"},
  638. ),
  639. url("/header_injection", HeaderInjectionHandler),
  640. url("/get_argument", GetArgumentHandler),
  641. url("/get_arguments", GetArgumentsHandler),
  642. ]
  643. return urls
  644. def fetch_json(self, *args, **kwargs):
  645. response = self.fetch(*args, **kwargs)
  646. response.rethrow()
  647. return json_decode(response.body)
  648. def test_types(self):
  649. cookie_value = to_unicode(
  650. create_signed_value(self.COOKIE_SECRET, "asdf", "qwer")
  651. )
  652. response = self.fetch(
  653. "/typecheck/asdf?foo=bar", headers={"Cookie": "asdf=" + cookie_value}
  654. )
  655. data = json_decode(response.body)
  656. self.assertEqual(data, {})
  657. response = self.fetch(
  658. "/typecheck/asdf?foo=bar",
  659. method="POST",
  660. headers={"Cookie": "asdf=" + cookie_value},
  661. body="foo=bar",
  662. )
  663. def test_decode_argument(self):
  664. # These urls all decode to the same thing
  665. urls = [
  666. "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
  667. "/decode_arg/%E9?foo=%E9&encoding=latin1",
  668. "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
  669. ]
  670. for req_url in urls:
  671. response = self.fetch(req_url)
  672. response.rethrow()
  673. data = json_decode(response.body)
  674. self.assertEqual(
  675. data,
  676. {u"path": [u"unicode", u"\u00e9"], u"query": [u"unicode", u"\u00e9"]},
  677. )
  678. response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
  679. response.rethrow()
  680. data = json_decode(response.body)
  681. self.assertEqual(
  682. data, {u"path": [u"bytes", u"c3a9"], u"query": [u"bytes", u"c3a9"]}
  683. )
  684. def test_decode_argument_invalid_unicode(self):
  685. # test that invalid unicode in URLs causes 400, not 500
  686. with ExpectLog(gen_log, ".*Invalid unicode.*"):
  687. response = self.fetch("/typecheck/invalid%FF")
  688. self.assertEqual(response.code, 400)
  689. response = self.fetch("/typecheck/invalid?foo=%FF")
  690. self.assertEqual(response.code, 400)
  691. def test_decode_argument_plus(self):
  692. # These urls are all equivalent.
  693. urls = [
  694. "/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
  695. "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8",
  696. ]
  697. for req_url in urls:
  698. response = self.fetch(req_url)
  699. response.rethrow()
  700. data = json_decode(response.body)
  701. self.assertEqual(
  702. data,
  703. {u"path": [u"unicode", u"1 + 1"], u"query": [u"unicode", u"1 + 1"]},
  704. )
  705. def test_reverse_url(self):
  706. self.assertEqual(self.app.reverse_url("decode_arg", "foo"), "/decode_arg/foo")
  707. self.assertEqual(self.app.reverse_url("decode_arg", 42), "/decode_arg/42")
  708. self.assertEqual(self.app.reverse_url("decode_arg", b"\xe9"), "/decode_arg/%E9")
  709. self.assertEqual(
  710. self.app.reverse_url("decode_arg", u"\u00e9"), "/decode_arg/%C3%A9"
  711. )
  712. self.assertEqual(
  713. self.app.reverse_url("decode_arg", "1 + 1"), "/decode_arg/1%20%2B%201"
  714. )
  715. def test_uimodule_unescaped(self):
  716. response = self.fetch("/linkify")
  717. self.assertEqual(
  718. response.body, b'<a href="http://example.com">http://example.com</a>'
  719. )
  720. def test_uimodule_resources(self):
  721. response = self.fetch("/uimodule_resources")
  722. self.assertEqual(
  723. response.body,
  724. b"""\
  725. <html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
  726. <style type="text/css">
  727. .entry { margin-bottom: 1em; }
  728. </style>
  729. <meta>
  730. </head><body>
  731. <div class="entry">...</div>
  732. <div class="entry">...</div>
  733. <script src="/common.js" type="text/javascript"></script>
  734. <script type="text/javascript">
  735. //<![CDATA[
  736. js_embed()
  737. //]]>
  738. </script>
  739. <script src="/analytics.js"/>
  740. </body></html>""", # noqa: E501
  741. )
  742. def test_optional_path(self):
  743. self.assertEqual(self.fetch_json("/optional_path/foo"), {u"path": u"foo"})
  744. self.assertEqual(self.fetch_json("/optional_path/"), {u"path": None})
  745. def test_multi_header(self):
  746. response = self.fetch("/multi_header")
  747. self.assertEqual(response.headers["x-overwrite"], "2")
  748. self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])
  749. def test_redirect(self):
  750. response = self.fetch("/redirect?permanent=1", follow_redirects=False)
  751. self.assertEqual(response.code, 301)
  752. response = self.fetch("/redirect?permanent=0", follow_redirects=False)
  753. self.assertEqual(response.code, 302)
  754. response = self.fetch("/redirect?status=307", follow_redirects=False)
  755. self.assertEqual(response.code, 307)
  756. def test_web_redirect(self):
  757. response = self.fetch("/web_redirect_permanent", follow_redirects=False)
  758. self.assertEqual(response.code, 301)
  759. self.assertEqual(response.headers["Location"], "/web_redirect_newpath")
  760. response = self.fetch("/web_redirect", follow_redirects=False)
  761. self.assertEqual(response.code, 302)
  762. self.assertEqual(response.headers["Location"], "/web_redirect_newpath")
  763. def test_web_redirect_double_slash(self):
  764. response = self.fetch("//web_redirect_double_slash", follow_redirects=False)
  765. self.assertEqual(response.code, 301)
  766. self.assertEqual(response.headers["Location"], "/web_redirect_newpath")
  767. def test_header_injection(self):
  768. response = self.fetch("/header_injection")
  769. self.assertEqual(response.body, b"ok")
  770. def test_get_argument(self):
  771. response = self.fetch("/get_argument?foo=bar")
  772. self.assertEqual(response.body, b"bar")
  773. response = self.fetch("/get_argument?foo=")
  774. self.assertEqual(response.body, b"")
  775. response = self.fetch("/get_argument")
  776. self.assertEqual(response.body, b"default")
  777. # Test merging of query and body arguments.
  778. # In singular form, body arguments take precedence over query arguments.
  779. body = urllib.parse.urlencode(dict(foo="hello"))
  780. response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
  781. self.assertEqual(response.body, b"hello")
  782. # In plural methods they are merged.
  783. response = self.fetch("/get_arguments?foo=bar", method="POST", body=body)
  784. self.assertEqual(
  785. json_decode(response.body),
  786. dict(default=["bar", "hello"], query=["bar"], body=["hello"]),
  787. )
  788. def test_get_query_arguments(self):
  789. # send as a post so we can ensure the separation between query
  790. # string and body arguments.
  791. body = urllib.parse.urlencode(dict(foo="hello"))
  792. response = self.fetch(
  793. "/get_argument?source=query&foo=bar", method="POST", body=body
  794. )
  795. self.assertEqual(response.body, b"bar")
  796. response = self.fetch(
  797. "/get_argument?source=query&foo=", method="POST", body=body
  798. )
  799. self.assertEqual(response.body, b"")
  800. response = self.fetch("/get_argument?source=query", method="POST", body=body)
  801. self.assertEqual(response.body, b"default")
  802. def test_get_body_arguments(self):
  803. body = urllib.parse.urlencode(dict(foo="bar"))
  804. response = self.fetch(
  805. "/get_argument?source=body&foo=hello", method="POST", body=body
  806. )
  807. self.assertEqual(response.body, b"bar")
  808. body = urllib.parse.urlencode(dict(foo=""))
  809. response = self.fetch(
  810. "/get_argument?source=body&foo=hello", method="POST", body=body
  811. )
  812. self.assertEqual(response.body, b"")
  813. body = urllib.parse.urlencode(dict())
  814. response = self.fetch(
  815. "/get_argument?source=body&foo=hello", method="POST", body=body
  816. )
  817. self.assertEqual(response.body, b"default")
  818. def test_no_gzip(self):
  819. response = self.fetch("/get_argument")
  820. self.assertNotIn("Accept-Encoding", response.headers.get("Vary", ""))
  821. self.assertNotIn("gzip", response.headers.get("Content-Encoding", ""))
  class NonWSGIWebTests(WebTestCase):
      """Tests for the handler served at ``/empty_flush``."""

      def get_handlers(self):
          routes = [("/empty_flush", EmptyFlushCallbackHandler)]
          return routes

      def test_empty_flush(self):
          # The handler writes "o" and finishes with "k" around its flushes.
          body = self.fetch("/empty_flush").body
          self.assertEqual(body, b"ok")
  class ErrorResponseTest(WebTestCase):
      """Tests for error responses: default page, write_error override, and a
      write_error that itself raises."""

      def get_handlers(self):
          class DefaultHandler(RequestHandler):
              # Raises HTTPError with the requested status, or fails with
              # ZeroDivisionError when no status is given.
              def get(self):
                  if self.get_argument("status", None):
                      raise HTTPError(int(self.get_argument("status")))
                  1 / 0

          class WriteErrorHandler(RequestHandler):
              # Same triggers as DefaultHandler, but uses send_error and
              # renders errors as plain text through write_error.
              def get(self):
                  if self.get_argument("status", None):
                      self.send_error(int(self.get_argument("status")))
                  else:
                      1 / 0

              def write_error(self, status_code, **kwargs):
                  self.set_header("Content-Type", "text/plain")
                  if "exc_info" in kwargs:
                      self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
                  else:
                      self.write("Status: %d" % status_code)

          class FailedWriteErrorHandler(RequestHandler):
              # Both the handler and its write_error raise.
              def get(self):
                  1 / 0

              def write_error(self, status_code, **kwargs):
                  raise Exception("exception in write_error")

          return [
              url("/default", DefaultHandler),
              url("/write_error", WriteErrorHandler),
              url("/failed_write_error", FailedWriteErrorHandler),
          ]

      def test_default(self):
          with ExpectLog(app_log, "Uncaught exception"):
              response = self.fetch("/default")
              self.assertEqual(response.code, 500)
              # assertIn reports both operands on failure, unlike
              # assertTrue(x in y).
              self.assertIn(b"500: Internal Server Error", response.body)

              response = self.fetch("/default?status=503")
              self.assertEqual(response.code, 503)
              self.assertIn(b"503: Service Unavailable", response.body)

              response = self.fetch("/default?status=435")
              self.assertEqual(response.code, 435)
              self.assertIn(b"435: Unknown", response.body)

      def test_write_error(self):
          with ExpectLog(app_log, "Uncaught exception"):
              response = self.fetch("/write_error")
              self.assertEqual(response.code, 500)
              self.assertEqual(b"Exception: ZeroDivisionError", response.body)

              response = self.fetch("/write_error?status=503")
              self.assertEqual(response.code, 503)
              self.assertEqual(b"Status: 503", response.body)

      def test_failed_write_error(self):
          with ExpectLog(app_log, "Uncaught exception"):
              response = self.fetch("/failed_write_error")
              self.assertEqual(response.code, 500)
              self.assertEqual(b"", response.body)
  class StaticFileTest(WebTestCase):
      """Tests for static file serving and ``static_url`` generation."""

      # The expected MD5 hash of robots.txt, used in tests that call
      # StaticFileHandler.get_version
      robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
      static_dir = os.path.join(os.path.dirname(__file__), "static")

      def get_handlers(self):
          class StaticUrlHandler(RequestHandler):
              def get(self, path):
                  with_v = int(self.get_argument("include_version", 1))
                  self.write(self.static_url(path, include_version=with_v))

          class AbsoluteStaticUrlHandler(StaticUrlHandler):
              include_host = True

          class OverrideStaticUrlHandler(RequestHandler):
              def get(self, path):
                  # The argument arrives as the string "0" or "1".  Convert
                  # through int first: bool("0") is True, so a plain bool()
                  # would never exercise the include_host=False case.
                  do_include = bool(int(self.get_argument("include_host")))
                  # Set the handler-level default to the opposite value so
                  # the explicit keyword below has to override it.
                  self.include_host = not do_include

                  regular_url = self.static_url(path)
                  override_url = self.static_url(path, include_host=do_include)
                  if override_url == regular_url:
                      return self.write(str(False))

                  # Only the URL that includes the host may start with the
                  # request's protocol prefix.
                  protocol = self.request.protocol + "://"
                  protocol_length = len(protocol)
                  check_regular = regular_url.find(protocol, 0, protocol_length)
                  check_override = override_url.find(protocol, 0, protocol_length)

                  if do_include:
                      result = check_override == 0 and check_regular == -1
                  else:
                      result = check_override == -1 and check_regular == 0
                  self.write(str(result))

          return [
              ("/static_url/(.*)", StaticUrlHandler),
              ("/abs_static_url/(.*)", AbsoluteStaticUrlHandler),
              ("/override_static_url/(.*)", OverrideStaticUrlHandler),
              ("/root_static/(.*)", StaticFileHandler, dict(path="/")),
          ]

      def get_app_kwargs(self):
          return dict(static_path=relpath("static"))

      def _read_robots_txt(self):
          """Return the raw bytes of ``static/robots.txt``.

          The file is read in binary mode so that comparisons against a
          response body do not depend on the locale's default encoding or
          on newline translation.
          """
          with open(os.path.join(self.static_dir, "robots.txt"), "rb") as f:
              return f.read()

      def test_static_files(self):
          response = self.fetch("/robots.txt")
          self.assertIn(b"Disallow: /", response.body)

          response = self.fetch("/static/robots.txt")
          self.assertIn(b"Disallow: /", response.body)
          self.assertEqual(response.headers.get("Content-Type"), "text/plain")

      def test_static_compressed_files(self):
          response = self.fetch("/static/sample.xml.gz")
          self.assertEqual(response.headers.get("Content-Type"), "application/gzip")
          response = self.fetch("/static/sample.xml.bz2")
          self.assertEqual(
              response.headers.get("Content-Type"), "application/octet-stream"
          )
          # make sure the uncompressed file still has the correct type
          response = self.fetch("/static/sample.xml")
          self.assertIn(
              response.headers.get("Content-Type"), {"text/xml", "application/xml"}
          )

      def test_static_url(self):
          response = self.fetch("/static_url/robots.txt")
          self.assertEqual(response.body, b"/static/robots.txt?v=" + self.robots_txt_hash)

      def test_absolute_static_url(self):
          response = self.fetch("/abs_static_url/robots.txt")
          self.assertEqual(
              response.body,
              (utf8(self.get_url("/")) + b"static/robots.txt?v=" + self.robots_txt_hash),
          )

      def test_relative_version_exclusion(self):
          response = self.fetch("/static_url/robots.txt?include_version=0")
          self.assertEqual(response.body, b"/static/robots.txt")

      def test_absolute_version_exclusion(self):
          response = self.fetch("/abs_static_url/robots.txt?include_version=0")
          self.assertEqual(response.body, utf8(self.get_url("/") + "static/robots.txt"))

      def test_include_host_override(self):
          self._trigger_include_host_check(False)
          self._trigger_include_host_check(True)

      def _trigger_include_host_check(self, include_host):
          """Request the override handler with ``include_host`` sent as 0 or 1."""
          path = "/override_static_url/robots.txt?include_host=%s"
          response = self.fetch(path % int(include_host))
          self.assertEqual(response.body, utf8(str(True)))

      def get_and_head(self, *args, **kwargs):
          """Performs a GET and HEAD request and returns the GET response.

          Fails if any ``Content-*`` headers returned by the two requests
          differ.
          """
          head_response = self.fetch(*args, method="HEAD", **kwargs)
          get_response = self.fetch(*args, method="GET", **kwargs)
          content_headers = {
              h
              for h in itertools.chain(head_response.headers, get_response.headers)
              if h.startswith("Content-")
          }
          for h in content_headers:
              head_value = head_response.headers.get(h)
              get_value = get_response.headers.get(h)
              self.assertEqual(
                  head_value,
                  get_value,
                  # The values are passed in the order the message names
                  # them: GET first, then HEAD.
                  "%s differs between GET (%s) and HEAD (%s)"
                  % (h, get_value, head_value),
              )
          return get_response

      def test_static_304_if_modified_since(self):
          response1 = self.get_and_head("/static/robots.txt")
          response2 = self.get_and_head(
              "/static/robots.txt",
              headers={"If-Modified-Since": response1.headers["Last-Modified"]},
          )
          self.assertEqual(response2.code, 304)
          self.assertNotIn("Content-Length", response2.headers)
          self.assertNotIn("Last-Modified", response2.headers)

      def test_static_304_if_none_match(self):
          response1 = self.get_and_head("/static/robots.txt")
          response2 = self.get_and_head(
              "/static/robots.txt", headers={"If-None-Match": response1.headers["Etag"]}
          )
          self.assertEqual(response2.code, 304)

      def test_static_304_etag_modified_bug(self):
          response1 = self.get_and_head("/static/robots.txt")
          response2 = self.get_and_head(
              "/static/robots.txt",
              headers={
                  "If-None-Match": '"MISMATCH"',
                  "If-Modified-Since": response1.headers["Last-Modified"],
              },
          )
          self.assertEqual(response2.code, 200)

      def test_static_if_modified_since_pre_epoch(self):
          # On windows, the functions that work with time_t do not accept
          # negative values, and at least one client (processing.js) seems
          # to use if-modified-since 1/1/1960 as a cache-busting technique.
          response = self.get_and_head(
              "/static/robots.txt",
              headers={"If-Modified-Since": "Fri, 01 Jan 1960 00:00:00 GMT"},
          )
          self.assertEqual(response.code, 200)

      def test_static_if_modified_since_time_zone(self):
          # Instead of the value from Last-Modified, make requests with times
          # chosen just before and after the known modification time
          # of the file to ensure that the right time zone is being used
          # when parsing If-Modified-Since.
          stat = os.stat(relpath("static/robots.txt"))

          response = self.get_and_head(
              "/static/robots.txt",
              headers={"If-Modified-Since": format_timestamp(stat.st_mtime - 1)},
          )
          self.assertEqual(response.code, 200)
          response = self.get_and_head(
              "/static/robots.txt",
              headers={"If-Modified-Since": format_timestamp(stat.st_mtime + 1)},
          )
          self.assertEqual(response.code, 304)

      def test_static_etag(self):
          response = self.get_and_head("/static/robots.txt")
          self.assertEqual(
              utf8(response.headers.get("Etag")), b'"' + self.robots_txt_hash + b'"'
          )

      def test_static_with_range(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=0-9"}
          )
          self.assertEqual(response.code, 206)
          self.assertEqual(response.body, b"User-agent")
          self.assertEqual(
              utf8(response.headers.get("Etag")), b'"' + self.robots_txt_hash + b'"'
          )
          self.assertEqual(response.headers.get("Content-Length"), "10")
          self.assertEqual(response.headers.get("Content-Range"), "bytes 0-9/26")

      def test_static_with_range_full_file(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=0-"}
          )
          # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
          # to ``Range: bytes=0-`` :(
          self.assertEqual(response.code, 200)
          self.assertEqual(response.body, self._read_robots_txt())
          self.assertEqual(response.headers.get("Content-Length"), "26")
          self.assertEqual(response.headers.get("Content-Range"), None)

      def test_static_with_range_full_past_end(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=0-10000000"}
          )
          self.assertEqual(response.code, 200)
          self.assertEqual(response.body, self._read_robots_txt())
          self.assertEqual(response.headers.get("Content-Length"), "26")
          self.assertEqual(response.headers.get("Content-Range"), None)

      def test_static_with_range_partial_past_end(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=1-10000000"}
          )
          self.assertEqual(response.code, 206)
          self.assertEqual(response.body, self._read_robots_txt()[1:])
          self.assertEqual(response.headers.get("Content-Length"), "25")
          self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")

      def test_static_with_range_end_edge(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=22-"}
          )
          self.assertEqual(response.body, b": /\n")
          self.assertEqual(response.headers.get("Content-Length"), "4")
          self.assertEqual(response.headers.get("Content-Range"), "bytes 22-25/26")

      def test_static_with_range_neg_end(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=-4"}
          )
          self.assertEqual(response.body, b": /\n")
          self.assertEqual(response.headers.get("Content-Length"), "4")
          self.assertEqual(response.headers.get("Content-Range"), "bytes 22-25/26")

      def test_static_with_range_neg_past_start(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=-1000000"}
          )
          self.assertEqual(response.code, 200)
          self.assertEqual(response.body, self._read_robots_txt())
          self.assertEqual(response.headers.get("Content-Length"), "26")
          self.assertEqual(response.headers.get("Content-Range"), None)

      def test_static_invalid_range(self):
          response = self.get_and_head("/static/robots.txt", headers={"Range": "asdf"})
          self.assertEqual(response.code, 200)

      def test_static_unsatisfiable_range_zero_suffix(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=-0"}
          )
          self.assertEqual(response.headers.get("Content-Range"), "bytes */26")
          self.assertEqual(response.code, 416)

      def test_static_unsatisfiable_range_invalid_start(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=26"}
          )
          self.assertEqual(response.code, 416)
          self.assertEqual(response.headers.get("Content-Range"), "bytes */26")

      def test_static_unsatisfiable_range_end_less_than_start(self):
          response = self.get_and_head(
              "/static/robots.txt", headers={"Range": "bytes=10-3"}
          )
          self.assertEqual(response.code, 416)
          self.assertEqual(response.headers.get("Content-Range"), "bytes */26")

      def test_static_head(self):
          response = self.fetch("/static/robots.txt", method="HEAD")
          self.assertEqual(response.code, 200)
          # No body was returned, but we did get the right content length.
          self.assertEqual(response.body, b"")
          self.assertEqual(response.headers["Content-Length"], "26")
          self.assertEqual(
              utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
          )

      def test_static_head_range(self):
          response = self.fetch(
              "/static/robots.txt", method="HEAD", headers={"Range": "bytes=1-4"}
          )
          self.assertEqual(response.code, 206)
          self.assertEqual(response.body, b"")
          self.assertEqual(response.headers["Content-Length"], "4")
          self.assertEqual(
              utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
          )

      def test_static_range_if_none_match(self):
          response = self.get_and_head(
              "/static/robots.txt",
              headers={
                  "Range": "bytes=1-4",
                  "If-None-Match": b'"' + self.robots_txt_hash + b'"',
              },
          )
          self.assertEqual(response.code, 304)
          self.assertEqual(response.body, b"")
          self.assertNotIn("Content-Length", response.headers)
          self.assertEqual(
              utf8(response.headers["Etag"]), b'"' + self.robots_txt_hash + b'"'
          )

      def test_static_404(self):
          response = self.get_and_head("/static/blarg")
          self.assertEqual(response.code, 404)

      def test_path_traversal_protection(self):
          # curl_httpclient processes ".." on the client side, so we
          # must test this with simple_httpclient.
          self.http_client.close()
          self.http_client = SimpleAsyncHTTPClient()
          with ExpectLog(gen_log, ".*not in root static directory"):
              response = self.get_and_head("/static/../static_foo.txt")
          # Attempted path traversal should result in 403, not 200
          # (which means the check failed and the file was served)
          # or 404 (which means that the file didn't exist and
          # is probably a packaging error).
          self.assertEqual(response.code, 403)

      @unittest.skipIf(os.name != "posix", "non-posix OS")
      def test_root_static_path(self):
          # Sometimes people set the StaticFileHandler's path to '/'
          # to disable Tornado's path validation (in conjunction with
          # their own validation in get_absolute_path). Make sure
          # that the stricter validation in 4.2.1 doesn't break them.
          path = os.path.join(
              os.path.dirname(os.path.abspath(__file__)), "static/robots.txt"
          )
          response = self.get_and_head("/root_static" + urllib.parse.quote(path))
          self.assertEqual(response.code, 200)
  class StaticDefaultFilenameTest(WebTestCase):
      """Static serving with ``default_filename`` set to ``index.html``."""

      def get_app_kwargs(self):
          handler_args = dict(default_filename="index.html")
          return dict(static_path=relpath("static"), static_handler_args=handler_args)

      def get_handlers(self):
          # Only the application's built-in static route is exercised.
          return []

      def test_static_default_filename(self):
          # A directory URL with a trailing slash is answered directly.
          resp = self.fetch("/static/dir/", follow_redirects=False)
          self.assertEqual(resp.code, 200)
          self.assertEqual(b"this is the index\n", resp.body)

      def test_static_default_redirect(self):
          # Without the trailing slash the client is redirected to add it.
          resp = self.fetch("/static/dir", follow_redirects=False)
          self.assertEqual(resp.code, 301)
          location = resp.headers["Location"]
          self.assertTrue(location.endswith("/static/dir/"))
  class StaticFileWithPathTest(WebTestCase):
      """Serve files through a StaticFileHandler mounted on its own route."""

      def get_app_kwargs(self):
          handler_args = dict(default_filename="index.html")
          return dict(static_path=relpath("static"), static_handler_args=handler_args)

      def get_handlers(self):
          handler_kwargs = {"path": relpath("templates/")}
          return [("/foo/(.*)", StaticFileHandler, handler_kwargs)]

      def test_serve(self):
          body = self.fetch("/foo/utf8.html").body
          self.assertEqual(body, b"H\xc3\xa9llo\n")
  1206. class CustomStaticFileTest(WebTestCase):
  1207. def get_handlers(self):
  1208. class MyStaticFileHandler(StaticFileHandler):
  1209. @classmethod
  1210. def make_static_url(cls, settings, path):
  1211. version_hash = cls.get_version(settings, path)
  1212. extension_index = path.rindex(".")
  1213. before_version = path[:extension_index]
  1214. after_version = path[(extension_index + 1) :]
  1215. return "/static/%s.%s.%s" % (
  1216. before_version,
  1217. version_hash,
  1218. after_version,
  1219. )
  1220. def parse_url_path(self, url_path):
  1221. extension_index = url_path.rindex(".")
  1222. version_index = url_path.rindex(".", 0, extension_index)
  1223. return "%s%s" % (url_path[:version_index], url_path[extension_index:])
  1224. @classmethod
  1225. def get_absolute_path(cls, settings, path):
  1226. return "CustomStaticFileTest:" + path
  1227. def validate_absolute_path(self, root, absolute_path):
  1228. return absolute_path
  1229. @classmethod
  1230. def get_content(self, path, start=None, end=None):
  1231. assert start is None and end is None
  1232. if path == "CustomStaticFileTest:foo.txt":
  1233. return b"bar"
  1234. raise Exception("unexpected path %r" % path)
  1235. def get_content_size(self):
  1236. if self.absolute_path == "CustomStaticFileTest:foo.txt":
  1237. return 3
  1238. raise Exception("unexpected path %r" % self.absolute_path)
  1239. def get_modified_time(self):
  1240. return None
  1241. @classmethod
  1242. def get_version(cls, settings, path):
  1243. return "42"
  1244. class StaticUrlHandler(RequestHandler):
  1245. def get(self, path):
  1246. self.write(self.static_url(path))
  1247. self.static_handler_class = MyStaticFileHandler
  1248. return [("/static_url/(.*)", StaticUrlHandler)]
  1249. def get_app_kwargs(self):
  1250. return dict(static_path="dummy", static_handler_class=self.static_handler_class)
  1251. def test_serve(self):
  1252. response = self.fetch("/static/foo.42.txt")
  1253. self.assertEqual(response.body, b"bar")
  1254. def test_static_url(self):
  1255. with ExpectLog(gen_log, "Could not open static file", required=False):
  1256. response = self.fetch("/static_url/foo.txt")
  1257. self.assertEqual(response.body, b"/static/foo.42.txt")
  1258. class HostMatchingTest(WebTestCase):
  1259. class Handler(RequestHandler):
  1260. def initialize(self, reply):
  1261. self.reply = reply
  1262. def get(self):
  1263. self.write(self.reply)
  1264. def get_handlers(self):
  1265. return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]
  1266. def test_host_matching(self):
  1267. self.app.add_handlers(
  1268. "www.example.com", [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})]
  1269. )
  1270. self.app.add_handlers(
  1271. r"www\.example\.com", [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})]
  1272. )
  1273. self.app.add_handlers(
  1274. "www.example.com", [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})]
  1275. )
  1276. self.app.add_handlers(
  1277. "www.e.*e.com", [("/baz", HostMatchingTest.Handler, {"reply": "[3]"})]
  1278. )
  1279. response = self.fetch("/foo")
  1280. self.assertEqual(response.body, b"wildcard")
  1281. response = self.fetch("/bar")
  1282. self.assertEqual(response.code, 404)
  1283. response = self.fetch("/baz")
  1284. self.assertEqual(response.code, 404)
  1285. response = self.fetch("/foo", headers={"Host": "www.example.com"})
  1286. self.assertEqual(response.body, b"[0]")
  1287. response = self.fetch("/bar", headers={"Host": "www.example.com"})
  1288. self.assertEqual(response.body, b"[1]")
  1289. response = self.fetch("/baz", headers={"Host": "www.example.com"})
  1290. self.assertEqual(response.body, b"[2]")
  1291. response = self.fetch("/baz", headers={"Host": "www.exe.com"})
  1292. self.assertEqual(response.body, b"[3]")
  1293. class DefaultHostMatchingTest(WebTestCase):
  1294. def get_handlers(self):
  1295. return []
  1296. def get_app_kwargs(self):
  1297. return {"default_host": "www.example.com"}
  1298. def test_default_host_matching(self):
  1299. self.app.add_handlers(
  1300. "www.example.com", [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})]
  1301. )
  1302. self.app.add_handlers(
  1303. r"www\.example\.com", [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})]
  1304. )
  1305. self.app.add_handlers(
  1306. "www.test.com", [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})]
  1307. )
  1308. response = self.fetch("/foo")
  1309. self.assertEqual(response.body, b"[0]")
  1310. response = self.fetch("/bar")
  1311. self.assertEqual(response.body, b"[1]")
  1312. response = self.fetch("/baz")
  1313. self.assertEqual(response.code, 404)
  1314. response = self.fetch("/foo", headers={"X-Real-Ip": "127.0.0.1"})
  1315. self.assertEqual(response.code, 404)
  1316. self.app.default_host = "www.test.com"
  1317. response = self.fetch("/baz")
  1318. self.assertEqual(response.body, b"[2]")
  1319. class NamedURLSpecGroupsTest(WebTestCase):
  1320. def get_handlers(self):
  1321. class EchoHandler(RequestHandler):
  1322. def get(self, path):
  1323. self.write(path)
  1324. return [
  1325. ("/str/(?P<path>.*)", EchoHandler),
  1326. (u"/unicode/(?P<path>.*)", EchoHandler),
  1327. ]
  1328. def test_named_urlspec_groups(self):
  1329. response = self.fetch("/str/foo")
  1330. self.assertEqual(response.body, b"foo")
  1331. response = self.fetch("/unicode/bar")
  1332. self.assertEqual(response.body, b"bar")
  1333. class ClearHeaderTest(SimpleHandlerTestCase):
  1334. class Handler(RequestHandler):
  1335. def get(self):
  1336. self.set_header("h1", "foo")
  1337. self.set_header("h2", "bar")
  1338. self.clear_header("h1")
  1339. self.clear_header("nonexistent")
  1340. def test_clear_header(self):
  1341. response = self.fetch("/")
  1342. self.assertTrue("h1" not in response.headers)
  1343. self.assertEqual(response.headers["h2"], "bar")
  1344. class Header204Test(SimpleHandlerTestCase):
  1345. class Handler(RequestHandler):
  1346. def get(self):
  1347. self.set_status(204)
  1348. self.finish()
  1349. def test_204_headers(self):
  1350. response = self.fetch("/")
  1351. self.assertEqual(response.code, 204)
  1352. self.assertNotIn("Content-Length", response.headers)
  1353. self.assertNotIn("Transfer-Encoding", response.headers)
  1354. class Header304Test(SimpleHandlerTestCase):
  1355. class Handler(RequestHandler):
  1356. def get(self):
  1357. self.set_header("Content-Language", "en_US")
  1358. self.write("hello")
  1359. def test_304_headers(self):
  1360. response1 = self.fetch("/")
  1361. self.assertEqual(response1.headers["Content-Length"], "5")
  1362. self.assertEqual(response1.headers["Content-Language"], "en_US")
  1363. response2 = self.fetch(
  1364. "/", headers={"If-None-Match": response1.headers["Etag"]}
  1365. )
  1366. self.assertEqual(response2.code, 304)
  1367. self.assertTrue("Content-Length" not in response2.headers)
  1368. self.assertTrue("Content-Language" not in response2.headers)
  1369. # Not an entity header, but should not be added to 304s by chunking
  1370. self.assertTrue("Transfer-Encoding" not in response2.headers)
  1371. class StatusReasonTest(SimpleHandlerTestCase):
  1372. class Handler(RequestHandler):
  1373. def get(self):
  1374. reason = self.request.arguments.get("reason", [])
  1375. self.set_status(
  1376. int(self.get_argument("code")), reason=reason[0] if reason else None
  1377. )
  1378. def get_http_client(self):
  1379. # simple_httpclient only: curl doesn't expose the reason string
  1380. return SimpleAsyncHTTPClient()
  1381. def test_status(self):
  1382. response = self.fetch("/?code=304")
  1383. self.assertEqual(response.code, 304)
  1384. self.assertEqual(response.reason, "Not Modified")
  1385. response = self.fetch("/?code=304&reason=Foo")
  1386. self.assertEqual(response.code, 304)
  1387. self.assertEqual(response.reason, "Foo")
  1388. response = self.fetch("/?code=682&reason=Bar")
  1389. self.assertEqual(response.code, 682)
  1390. self.assertEqual(response.reason, "Bar")
  1391. response = self.fetch("/?code=682")
  1392. self.assertEqual(response.code, 682)
  1393. self.assertEqual(response.reason, "Unknown")
  1394. class DateHeaderTest(SimpleHandlerTestCase):
  1395. class Handler(RequestHandler):
  1396. def get(self):
  1397. self.write("hello")
  1398. def test_date_header(self):
  1399. response = self.fetch("/")
  1400. parsed = email.utils.parsedate(response.headers["Date"])
  1401. assert parsed is not None
  1402. header_date = datetime.datetime(*parsed[:6])
  1403. self.assertTrue(
  1404. header_date - datetime.datetime.utcnow() < datetime.timedelta(seconds=2)
  1405. )
  1406. class RaiseWithReasonTest(SimpleHandlerTestCase):
  1407. class Handler(RequestHandler):
  1408. def get(self):
  1409. raise HTTPError(682, reason="Foo")
  1410. def get_http_client(self):
  1411. # simple_httpclient only: curl doesn't expose the reason string
  1412. return SimpleAsyncHTTPClient()
  1413. def test_raise_with_reason(self):
  1414. response = self.fetch("/")
  1415. self.assertEqual(response.code, 682)
  1416. self.assertEqual(response.reason, "Foo")
  1417. self.assertIn(b"682: Foo", response.body)
  1418. def test_httperror_str(self):
  1419. self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")
  1420. def test_httperror_str_from_httputil(self):
  1421. self.assertEqual(str(HTTPError(682)), "HTTP 682: Unknown")
  1422. class ErrorHandlerXSRFTest(WebTestCase):
  1423. def get_handlers(self):
  1424. # note that if the handlers list is empty we get the default_host
  1425. # redirect fallback instead of a 404, so test with both an
  1426. # explicitly defined error handler and an implicit 404.
  1427. return [("/error", ErrorHandler, dict(status_code=417))]
  1428. def get_app_kwargs(self):
  1429. return dict(xsrf_cookies=True)
  1430. def test_error_xsrf(self):
  1431. response = self.fetch("/error", method="POST", body="")
  1432. self.assertEqual(response.code, 417)
  1433. def test_404_xsrf(self):
  1434. response = self.fetch("/404", method="POST", body="")
  1435. self.assertEqual(response.code, 404)
  1436. class GzipTestCase(SimpleHandlerTestCase):
  1437. class Handler(RequestHandler):
  1438. def get(self):
  1439. for v in self.get_arguments("vary"):
  1440. self.add_header("Vary", v)
  1441. # Must write at least MIN_LENGTH bytes to activate compression.
  1442. self.write("hello world" + ("!" * GZipContentEncoding.MIN_LENGTH))
  1443. def get_app_kwargs(self):
  1444. return dict(
  1445. gzip=True, static_path=os.path.join(os.path.dirname(__file__), "static")
  1446. )
  1447. def assert_compressed(self, response):
  1448. # simple_httpclient renames the content-encoding header;
  1449. # curl_httpclient doesn't.
  1450. self.assertEqual(
  1451. response.headers.get(
  1452. "Content-Encoding", response.headers.get("X-Consumed-Content-Encoding")
  1453. ),
  1454. "gzip",
  1455. )
  1456. def test_gzip(self):
  1457. response = self.fetch("/")
  1458. self.assert_compressed(response)
  1459. self.assertEqual(response.headers["Vary"], "Accept-Encoding")
  1460. def test_gzip_static(self):
  1461. # The streaming responses in StaticFileHandler have subtle
  1462. # interactions with the gzip output so test this case separately.
  1463. response = self.fetch("/robots.txt")
  1464. self.assert_compressed(response)
  1465. self.assertEqual(response.headers["Vary"], "Accept-Encoding")
  1466. def test_gzip_not_requested(self):
  1467. response = self.fetch("/", use_gzip=False)
  1468. self.assertNotIn("Content-Encoding", response.headers)
  1469. self.assertEqual(response.headers["Vary"], "Accept-Encoding")
  1470. def test_vary_already_present(self):
  1471. response = self.fetch("/?vary=Accept-Language")
  1472. self.assert_compressed(response)
  1473. self.assertEqual(
  1474. [s.strip() for s in response.headers["Vary"].split(",")],
  1475. ["Accept-Language", "Accept-Encoding"],
  1476. )
  1477. def test_vary_already_present_multiple(self):
  1478. # Regression test for https://github.com/tornadoweb/tornado/issues/1670
  1479. response = self.fetch("/?vary=Accept-Language&vary=Cookie")
  1480. self.assert_compressed(response)
  1481. self.assertEqual(
  1482. [s.strip() for s in response.headers["Vary"].split(",")],
  1483. ["Accept-Language", "Cookie", "Accept-Encoding"],
  1484. )
  1485. class PathArgsInPrepareTest(WebTestCase):
  1486. class Handler(RequestHandler):
  1487. def prepare(self):
  1488. self.write(dict(args=self.path_args, kwargs=self.path_kwargs))
  1489. def get(self, path):
  1490. assert path == "foo"
  1491. self.finish()
  1492. def get_handlers(self):
  1493. return [("/pos/(.*)", self.Handler), ("/kw/(?P<path>.*)", self.Handler)]
  1494. def test_pos(self):
  1495. response = self.fetch("/pos/foo")
  1496. response.rethrow()
  1497. data = json_decode(response.body)
  1498. self.assertEqual(data, {"args": ["foo"], "kwargs": {}})
  1499. def test_kw(self):
  1500. response = self.fetch("/kw/foo")
  1501. response.rethrow()
  1502. data = json_decode(response.body)
  1503. self.assertEqual(data, {"args": [], "kwargs": {"path": "foo"}})
  1504. class ClearAllCookiesTest(SimpleHandlerTestCase):
  1505. class Handler(RequestHandler):
  1506. def get(self):
  1507. self.clear_all_cookies()
  1508. self.write("ok")
  1509. def test_clear_all_cookies(self):
  1510. response = self.fetch("/", headers={"Cookie": "foo=bar; baz=xyzzy"})
  1511. set_cookies = sorted(response.headers.get_list("Set-Cookie"))
  1512. # Python 3.5 sends 'baz="";'; older versions use 'baz=;'
  1513. self.assertTrue(
  1514. set_cookies[0].startswith("baz=;") or set_cookies[0].startswith('baz="";')
  1515. )
  1516. self.assertTrue(
  1517. set_cookies[1].startswith("foo=;") or set_cookies[1].startswith('foo="";')
  1518. )
  1519. class PermissionError(Exception):
  1520. pass
  1521. class ExceptionHandlerTest(SimpleHandlerTestCase):
  1522. class Handler(RequestHandler):
  1523. def get(self):
  1524. exc = self.get_argument("exc")
  1525. if exc == "http":
  1526. raise HTTPError(410, "no longer here")
  1527. elif exc == "zero":
  1528. 1 / 0
  1529. elif exc == "permission":
  1530. raise PermissionError("not allowed")
  1531. def write_error(self, status_code, **kwargs):
  1532. if "exc_info" in kwargs:
  1533. typ, value, tb = kwargs["exc_info"]
  1534. if isinstance(value, PermissionError):
  1535. self.set_status(403)
  1536. self.write("PermissionError")
  1537. return
  1538. RequestHandler.write_error(self, status_code, **kwargs)
  1539. def log_exception(self, typ, value, tb):
  1540. if isinstance(value, PermissionError):
  1541. app_log.warning("custom logging for PermissionError: %s", value.args[0])
  1542. else:
  1543. RequestHandler.log_exception(self, typ, value, tb)
  1544. def test_http_error(self):
  1545. # HTTPErrors are logged as warnings with no stack trace.
  1546. # TODO: extend ExpectLog to test this more precisely
  1547. with ExpectLog(gen_log, ".*no longer here"):
  1548. response = self.fetch("/?exc=http")
  1549. self.assertEqual(response.code, 410)
  1550. def test_unknown_error(self):
  1551. # Unknown errors are logged as errors with a stack trace.
  1552. with ExpectLog(app_log, "Uncaught exception"):
  1553. response = self.fetch("/?exc=zero")
  1554. self.assertEqual(response.code, 500)
  1555. def test_known_error(self):
  1556. # log_exception can override logging behavior, and write_error
  1557. # can override the response.
  1558. with ExpectLog(app_log, "custom logging for PermissionError: not allowed"):
  1559. response = self.fetch("/?exc=permission")
  1560. self.assertEqual(response.code, 403)
  1561. class BuggyLoggingTest(SimpleHandlerTestCase):
  1562. class Handler(RequestHandler):
  1563. def get(self):
  1564. 1 / 0
  1565. def log_exception(self, typ, value, tb):
  1566. 1 / 0
  1567. def test_buggy_log_exception(self):
  1568. # Something gets logged even though the application's
  1569. # logger is broken.
  1570. with ExpectLog(app_log, ".*"):
  1571. self.fetch("/")
  1572. class UIMethodUIModuleTest(SimpleHandlerTestCase):
  1573. """Test that UI methods and modules are created correctly and
  1574. associated with the handler.
  1575. """
  1576. class Handler(RequestHandler):
  1577. def get(self):
  1578. self.render("foo.html")
  1579. def value(self):
  1580. return self.get_argument("value")
  1581. def get_app_kwargs(self):
  1582. def my_ui_method(handler, x):
  1583. return "In my_ui_method(%s) with handler value %s." % (x, handler.value())
  1584. class MyModule(UIModule):
  1585. def render(self, x):
  1586. return "In MyModule(%s) with handler value %s." % (
  1587. x,
  1588. self.handler.value(),
  1589. )
  1590. loader = DictLoader(
  1591. {"foo.html": "{{ my_ui_method(42) }} {% module MyModule(123) %}"}
  1592. )
  1593. return dict(
  1594. template_loader=loader,
  1595. ui_methods={"my_ui_method": my_ui_method},
  1596. ui_modules={"MyModule": MyModule},
  1597. )
  1598. def tearDown(self):
  1599. super(UIMethodUIModuleTest, self).tearDown()
  1600. # TODO: fix template loader caching so this isn't necessary.
  1601. RequestHandler._template_loaders.clear()
  1602. def test_ui_method(self):
  1603. response = self.fetch("/?value=asdf")
  1604. self.assertEqual(
  1605. response.body,
  1606. b"In my_ui_method(42) with handler value asdf. "
  1607. b"In MyModule(123) with handler value asdf.",
  1608. )
  1609. class GetArgumentErrorTest(SimpleHandlerTestCase):
  1610. class Handler(RequestHandler):
  1611. def get(self):
  1612. try:
  1613. self.get_argument("foo")
  1614. self.write({})
  1615. except MissingArgumentError as e:
  1616. self.write({"arg_name": e.arg_name, "log_message": e.log_message})
  1617. def test_catch_error(self):
  1618. response = self.fetch("/")
  1619. self.assertEqual(
  1620. json_decode(response.body),
  1621. {"arg_name": "foo", "log_message": "Missing argument foo"},
  1622. )
  1623. class SetLazyPropertiesTest(SimpleHandlerTestCase):
  1624. class Handler(RequestHandler):
  1625. def prepare(self):
  1626. self.current_user = "Ben"
  1627. self.locale = locale.get("en_US")
  1628. def get_user_locale(self):
  1629. raise NotImplementedError()
  1630. def get_current_user(self):
  1631. raise NotImplementedError()
  1632. def get(self):
  1633. self.write("Hello %s (%s)" % (self.current_user, self.locale.code))
  1634. def test_set_properties(self):
  1635. # Ensure that current_user can be assigned to normally for apps
  1636. # that want to forgo the lazy get_current_user property
  1637. response = self.fetch("/")
  1638. self.assertEqual(response.body, b"Hello Ben (en_US)")
  1639. class GetCurrentUserTest(WebTestCase):
  1640. def get_app_kwargs(self):
  1641. class WithoutUserModule(UIModule):
  1642. def render(self):
  1643. return ""
  1644. class WithUserModule(UIModule):
  1645. def render(self):
  1646. return str(self.current_user)
  1647. loader = DictLoader(
  1648. {
  1649. "without_user.html": "",
  1650. "with_user.html": "{{ current_user }}",
  1651. "without_user_module.html": "{% module WithoutUserModule() %}",
  1652. "with_user_module.html": "{% module WithUserModule() %}",
  1653. }
  1654. )
  1655. return dict(
  1656. template_loader=loader,
  1657. ui_modules={
  1658. "WithUserModule": WithUserModule,
  1659. "WithoutUserModule": WithoutUserModule,
  1660. },
  1661. )
  1662. def tearDown(self):
  1663. super(GetCurrentUserTest, self).tearDown()
  1664. RequestHandler._template_loaders.clear()
  1665. def get_handlers(self):
  1666. class CurrentUserHandler(RequestHandler):
  1667. def prepare(self):
  1668. self.has_loaded_current_user = False
  1669. def get_current_user(self):
  1670. self.has_loaded_current_user = True
  1671. return ""
  1672. class WithoutUserHandler(CurrentUserHandler):
  1673. def get(self):
  1674. self.render_string("without_user.html")
  1675. self.finish(str(self.has_loaded_current_user))
  1676. class WithUserHandler(CurrentUserHandler):
  1677. def get(self):
  1678. self.render_string("with_user.html")
  1679. self.finish(str(self.has_loaded_current_user))
  1680. class CurrentUserModuleHandler(CurrentUserHandler):
  1681. def get_template_namespace(self):
  1682. # If RequestHandler.get_template_namespace is called, then
  1683. # get_current_user is evaluated. Until #820 is fixed, this
  1684. # is a small hack to circumvent the issue.
  1685. return self.ui
  1686. class WithoutUserModuleHandler(CurrentUserModuleHandler):
  1687. def get(self):
  1688. self.render_string("without_user_module.html")
  1689. self.finish(str(self.has_loaded_current_user))
  1690. class WithUserModuleHandler(CurrentUserModuleHandler):
  1691. def get(self):
  1692. self.render_string("with_user_module.html")
  1693. self.finish(str(self.has_loaded_current_user))
  1694. return [
  1695. ("/without_user", WithoutUserHandler),
  1696. ("/with_user", WithUserHandler),
  1697. ("/without_user_module", WithoutUserModuleHandler),
  1698. ("/with_user_module", WithUserModuleHandler),
  1699. ]
  1700. @unittest.skip("needs fix")
  1701. def test_get_current_user_is_lazy(self):
  1702. # TODO: Make this test pass. See #820.
  1703. response = self.fetch("/without_user")
  1704. self.assertEqual(response.body, b"False")
  1705. def test_get_current_user_works(self):
  1706. response = self.fetch("/with_user")
  1707. self.assertEqual(response.body, b"True")
  1708. def test_get_current_user_from_ui_module_is_lazy(self):
  1709. response = self.fetch("/without_user_module")
  1710. self.assertEqual(response.body, b"False")
  1711. def test_get_current_user_from_ui_module_works(self):
  1712. response = self.fetch("/with_user_module")
  1713. self.assertEqual(response.body, b"True")
  1714. class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
  1715. class Handler(RequestHandler):
  1716. pass
  1717. def test_unimplemented_standard_methods(self):
  1718. for method in ["HEAD", "GET", "DELETE", "OPTIONS"]:
  1719. response = self.fetch("/", method=method)
  1720. self.assertEqual(response.code, 405)
  1721. for method in ["POST", "PUT"]:
  1722. response = self.fetch("/", method=method, body=b"")
  1723. self.assertEqual(response.code, 405)
  1724. class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
  1725. class Handler(RequestHandler):
  1726. def other(self):
  1727. # Even though this method exists, it won't get called automatically
  1728. # because it is not in SUPPORTED_METHODS.
  1729. self.write("other")
  1730. def test_unimplemented_patch(self):
  1731. # PATCH is recently standardized; Tornado supports it by default
  1732. # but wsgiref.validate doesn't like it.
  1733. response = self.fetch("/", method="PATCH", body=b"")
  1734. self.assertEqual(response.code, 405)
  1735. def test_unimplemented_other(self):
  1736. response = self.fetch("/", method="OTHER", allow_nonstandard_methods=True)
  1737. self.assertEqual(response.code, 405)
  1738. class AllHTTPMethodsTest(SimpleHandlerTestCase):
  1739. class Handler(RequestHandler):
  1740. def method(self):
  1741. self.write(self.request.method)
  1742. get = delete = options = post = put = method # type: ignore
  1743. def test_standard_methods(self):
  1744. response = self.fetch("/", method="HEAD")
  1745. self.assertEqual(response.body, b"")
  1746. for method in ["GET", "DELETE", "OPTIONS"]:
  1747. response = self.fetch("/", method=method)
  1748. self.assertEqual(response.body, utf8(method))
  1749. for method in ["POST", "PUT"]:
  1750. response = self.fetch("/", method=method, body=b"")
  1751. self.assertEqual(response.body, utf8(method))
  1752. class PatchMethodTest(SimpleHandlerTestCase):
  1753. class Handler(RequestHandler):
  1754. SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ( # type: ignore
  1755. "OTHER",
  1756. )
  1757. def patch(self):
  1758. self.write("patch")
  1759. def other(self):
  1760. self.write("other")
  1761. def test_patch(self):
  1762. response = self.fetch("/", method="PATCH", body=b"")
  1763. self.assertEqual(response.body, b"patch")
  1764. def test_other(self):
  1765. response = self.fetch("/", method="OTHER", allow_nonstandard_methods=True)
  1766. self.assertEqual(response.body, b"other")
  1767. class FinishInPrepareTest(SimpleHandlerTestCase):
  1768. class Handler(RequestHandler):
  1769. def prepare(self):
  1770. self.finish("done")
  1771. def get(self):
  1772. # It's difficult to assert for certain that a method did not
  1773. # or will not be called in an asynchronous context, but this
  1774. # will be logged noisily if it is reached.
  1775. raise Exception("should not reach this method")
  1776. def test_finish_in_prepare(self):
  1777. response = self.fetch("/")
  1778. self.assertEqual(response.body, b"done")
  1779. class Default404Test(WebTestCase):
  1780. def get_handlers(self):
  1781. # If there are no handlers at all a default redirect handler gets added.
  1782. return [("/foo", RequestHandler)]
  1783. def test_404(self):
  1784. response = self.fetch("/")
  1785. self.assertEqual(response.code, 404)
  1786. self.assertEqual(
  1787. response.body,
  1788. b"<html><title>404: Not Found</title>"
  1789. b"<body>404: Not Found</body></html>",
  1790. )
  1791. class Custom404Test(WebTestCase):
  1792. def get_handlers(self):
  1793. return [("/foo", RequestHandler)]
  1794. def get_app_kwargs(self):
  1795. class Custom404Handler(RequestHandler):
  1796. def get(self):
  1797. self.set_status(404)
  1798. self.write("custom 404 response")
  1799. return dict(default_handler_class=Custom404Handler)
  1800. def test_404(self):
  1801. response = self.fetch("/")
  1802. self.assertEqual(response.code, 404)
  1803. self.assertEqual(response.body, b"custom 404 response")
  1804. class DefaultHandlerArgumentsTest(WebTestCase):
  1805. def get_handlers(self):
  1806. return [("/foo", RequestHandler)]
  1807. def get_app_kwargs(self):
  1808. return dict(
  1809. default_handler_class=ErrorHandler,
  1810. default_handler_args=dict(status_code=403),
  1811. )
  1812. def test_403(self):
  1813. response = self.fetch("/")
  1814. self.assertEqual(response.code, 403)
  1815. class HandlerByNameTest(WebTestCase):
  1816. def get_handlers(self):
  1817. # All three are equivalent.
  1818. return [
  1819. ("/hello1", HelloHandler),
  1820. ("/hello2", "tornado.test.web_test.HelloHandler"),
  1821. url("/hello3", "tornado.test.web_test.HelloHandler"),
  1822. ]
  1823. def test_handler_by_name(self):
  1824. resp = self.fetch("/hello1")
  1825. self.assertEqual(resp.body, b"hello")
  1826. resp = self.fetch("/hello2")
  1827. self.assertEqual(resp.body, b"hello")
  1828. resp = self.fetch("/hello3")
  1829. self.assertEqual(resp.body, b"hello")
  1830. class StreamingRequestBodyTest(WebTestCase):
  1831. def get_handlers(self):
  1832. @stream_request_body
  1833. class StreamingBodyHandler(RequestHandler):
  1834. def initialize(self, test):
  1835. self.test = test
  1836. def prepare(self):
  1837. self.test.prepared.set_result(None)
  1838. def data_received(self, data):
  1839. self.test.data.set_result(data)
  1840. def get(self):
  1841. self.test.finished.set_result(None)
  1842. self.write({})
  1843. @stream_request_body
  1844. class EarlyReturnHandler(RequestHandler):
  1845. def prepare(self):
  1846. # If we finish the response in prepare, it won't continue to
  1847. # the (non-existent) data_received.
  1848. raise HTTPError(401)
  1849. @stream_request_body
  1850. class CloseDetectionHandler(RequestHandler):
  1851. def initialize(self, test):
  1852. self.test = test
  1853. def on_connection_close(self):
  1854. super(CloseDetectionHandler, self).on_connection_close()
  1855. self.test.close_future.set_result(None)
  1856. return [
  1857. ("/stream_body", StreamingBodyHandler, dict(test=self)),
  1858. ("/early_return", EarlyReturnHandler),
  1859. ("/close_detection", CloseDetectionHandler, dict(test=self)),
  1860. ]
  1861. def connect(self, url, connection_close):
  1862. # Use a raw connection so we can control the sending of data.
  1863. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  1864. s.connect(("127.0.0.1", self.get_http_port()))
  1865. stream = IOStream(s)
  1866. stream.write(b"GET " + url + b" HTTP/1.1\r\n")
  1867. if connection_close:
  1868. stream.write(b"Connection: close\r\n")
  1869. stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
  1870. return stream
  1871. @gen_test
  1872. def test_streaming_body(self):
  1873. self.prepared = Future() # type: Future[None]
  1874. self.data = Future() # type: Future[bytes]
  1875. self.finished = Future() # type: Future[None]
  1876. stream = self.connect(b"/stream_body", connection_close=True)
  1877. yield self.prepared
  1878. stream.write(b"4\r\nasdf\r\n")
  1879. # Ensure the first chunk is received before we send the second.
  1880. data = yield self.data
  1881. self.assertEqual(data, b"asdf")
  1882. self.data = Future()
  1883. stream.write(b"4\r\nqwer\r\n")
  1884. data = yield self.data
  1885. self.assertEquals(data, b"qwer")
  1886. stream.write(b"0\r\n\r\n")
  1887. yield self.finished
  1888. data = yield stream.read_until_close()
  1889. # This would ideally use an HTTP1Connection to read the response.
  1890. self.assertTrue(data.endswith(b"{}"))
  1891. stream.close()
  1892. @gen_test
  1893. def test_early_return(self):
  1894. stream = self.connect(b"/early_return", connection_close=False)
  1895. data = yield stream.read_until_close()
  1896. self.assertTrue(data.startswith(b"HTTP/1.1 401"))
  1897. @gen_test
  1898. def test_early_return_with_data(self):
  1899. stream = self.connect(b"/early_return", connection_close=False)
  1900. stream.write(b"4\r\nasdf\r\n")
  1901. data = yield stream.read_until_close()
  1902. self.assertTrue(data.startswith(b"HTTP/1.1 401"))
  1903. @gen_test
  1904. def test_close_during_upload(self):
  1905. self.close_future = Future() # type: Future[None]
  1906. stream = self.connect(b"/close_detection", connection_close=False)
  1907. stream.close()
  1908. yield self.close_future
  1909. # Each method in this handler returns a yieldable object and yields to the
  1910. # IOLoop so the future is not immediately ready. Ensure that the
  1911. # yieldables are respected and no method is called before the previous
  1912. # one has completed.
  1913. @stream_request_body
  1914. class BaseFlowControlHandler(RequestHandler):
  1915. def initialize(self, test):
  1916. self.test = test
  1917. self.method = None
  1918. self.methods = [] # type: typing.List[str]
  1919. @contextlib.contextmanager
  1920. def in_method(self, method):
  1921. if self.method is not None:
  1922. self.test.fail("entered method %s while in %s" % (method, self.method))
  1923. self.method = method
  1924. self.methods.append(method)
  1925. try:
  1926. yield
  1927. finally:
  1928. self.method = None
  1929. @gen.coroutine
  1930. def prepare(self):
  1931. # Note that asynchronous prepare() does not block data_received,
  1932. # so we don't use in_method here.
  1933. self.methods.append("prepare")
  1934. yield gen.moment
  1935. @gen.coroutine
  1936. def post(self):
  1937. with self.in_method("post"):
  1938. yield gen.moment
  1939. self.write(dict(methods=self.methods))
  1940. class BaseStreamingRequestFlowControlTest(object):
  1941. def get_httpserver_options(self):
  1942. # Use a small chunk size so flow control is relevant even though
  1943. # all the data arrives at once.
  1944. return dict(chunk_size=10, decompress_request=True)
  1945. def get_http_client(self):
  1946. # simple_httpclient only: curl doesn't support body_producer.
  1947. return SimpleAsyncHTTPClient()
  1948. # Test all the slightly different code paths for fixed, chunked, etc bodies.
  1949. def test_flow_control_fixed_body(self):
  1950. response = self.fetch("/", body="abcdefghijklmnopqrstuvwxyz", method="POST")
  1951. response.rethrow()
  1952. self.assertEqual(
  1953. json_decode(response.body),
  1954. dict(
  1955. methods=[
  1956. "prepare",
  1957. "data_received",
  1958. "data_received",
  1959. "data_received",
  1960. "post",
  1961. ]
  1962. ),
  1963. )
  1964. def test_flow_control_chunked_body(self):
  1965. chunks = [b"abcd", b"efgh", b"ijkl"]
  1966. @gen.coroutine
  1967. def body_producer(write):
  1968. for i in chunks:
  1969. yield write(i)
  1970. response = self.fetch("/", body_producer=body_producer, method="POST")
  1971. response.rethrow()
  1972. self.assertEqual(
  1973. json_decode(response.body),
  1974. dict(
  1975. methods=[
  1976. "prepare",
  1977. "data_received",
  1978. "data_received",
  1979. "data_received",
  1980. "post",
  1981. ]
  1982. ),
  1983. )
  1984. def test_flow_control_compressed_body(self):
  1985. bytesio = BytesIO()
  1986. gzip_file = gzip.GzipFile(mode="w", fileobj=bytesio)
  1987. gzip_file.write(b"abcdefghijklmnopqrstuvwxyz")
  1988. gzip_file.close()
  1989. compressed_body = bytesio.getvalue()
  1990. response = self.fetch(
  1991. "/",
  1992. body=compressed_body,
  1993. method="POST",
  1994. headers={"Content-Encoding": "gzip"},
  1995. )
  1996. response.rethrow()
  1997. self.assertEqual(
  1998. json_decode(response.body),
  1999. dict(
  2000. methods=[
  2001. "prepare",
  2002. "data_received",
  2003. "data_received",
  2004. "data_received",
  2005. "post",
  2006. ]
  2007. ),
  2008. )
  2009. class DecoratedStreamingRequestFlowControlTest(
  2010. BaseStreamingRequestFlowControlTest, WebTestCase
  2011. ):
  2012. def get_handlers(self):
  2013. class DecoratedFlowControlHandler(BaseFlowControlHandler):
  2014. @gen.coroutine
  2015. def data_received(self, data):
  2016. with self.in_method("data_received"):
  2017. yield gen.moment
  2018. return [("/", DecoratedFlowControlHandler, dict(test=self))]
  2019. class NativeStreamingRequestFlowControlTest(
  2020. BaseStreamingRequestFlowControlTest, WebTestCase
  2021. ):
  2022. def get_handlers(self):
  2023. class NativeFlowControlHandler(BaseFlowControlHandler):
  2024. async def data_received(self, data):
  2025. with self.in_method("data_received"):
  2026. import asyncio
  2027. await asyncio.sleep(0)
  2028. return [("/", NativeFlowControlHandler, dict(test=self))]
  2029. class IncorrectContentLengthTest(SimpleHandlerTestCase):
  2030. def get_handlers(self):
  2031. test = self
  2032. self.server_error = None
  2033. # Manually set a content-length that doesn't match the actual content.
  2034. class TooHigh(RequestHandler):
  2035. def get(self):
  2036. self.set_header("Content-Length", "42")
  2037. try:
  2038. self.finish("ok")
  2039. except Exception as e:
  2040. test.server_error = e
  2041. raise
  2042. class TooLow(RequestHandler):
  2043. def get(self):
  2044. self.set_header("Content-Length", "2")
  2045. try:
  2046. self.finish("hello")
  2047. except Exception as e:
  2048. test.server_error = e
  2049. raise
  2050. return [("/high", TooHigh), ("/low", TooLow)]
  2051. def test_content_length_too_high(self):
  2052. # When the content-length is too high, the connection is simply
  2053. # closed without completing the response. An error is logged on
  2054. # the server.
  2055. with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
  2056. with ExpectLog(
  2057. gen_log,
  2058. "(Cannot send error response after headers written"
  2059. "|Failed to flush partial response)",
  2060. ):
  2061. with self.assertRaises(HTTPClientError):
  2062. self.fetch("/high", raise_error=True)
  2063. self.assertEqual(
  2064. str(self.server_error), "Tried to write 40 bytes less than Content-Length"
  2065. )
  2066. def test_content_length_too_low(self):
  2067. # When the content-length is too low, the connection is closed
  2068. # without writing the last chunk, so the client never sees the request
  2069. # complete (which would be a framing error).
  2070. with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
  2071. with ExpectLog(
  2072. gen_log,
  2073. "(Cannot send error response after headers written"
  2074. "|Failed to flush partial response)",
  2075. ):
  2076. with self.assertRaises(HTTPClientError):
  2077. self.fetch("/low", raise_error=True)
  2078. self.assertEqual(
  2079. str(self.server_error), "Tried to write more data than Content-Length"
  2080. )
  2081. class ClientCloseTest(SimpleHandlerTestCase):
  2082. class Handler(RequestHandler):
  2083. def get(self):
  2084. if self.request.version.startswith("HTTP/1"):
  2085. # Simulate a connection closed by the client during
  2086. # request processing. The client will see an error, but the
  2087. # server should respond gracefully (without logging errors
  2088. # because we were unable to write out as many bytes as
  2089. # Content-Length said we would)
  2090. self.request.connection.stream.close()
  2091. self.write("hello")
  2092. else:
  2093. # TODO: add a HTTP2-compatible version of this test.
  2094. self.write("requires HTTP/1.x")
  2095. def test_client_close(self):
  2096. with self.assertRaises((HTTPClientError, unittest.SkipTest)):
  2097. response = self.fetch("/", raise_error=True)
  2098. if response.body == b"requires HTTP/1.x":
  2099. self.skipTest("requires HTTP/1.x")
  2100. self.assertEqual(response.code, 599)
  2101. class SignedValueTest(unittest.TestCase):
  2102. SECRET = "It's a secret to everybody"
  2103. SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
  2104. def past(self):
  2105. return self.present() - 86400 * 32
  2106. def present(self):
  2107. return 1300000000
  2108. def test_known_values(self):
  2109. signed_v1 = create_signed_value(
  2110. SignedValueTest.SECRET, "key", "value", version=1, clock=self.present
  2111. )
  2112. self.assertEqual(
  2113. signed_v1, b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f"
  2114. )
  2115. signed_v2 = create_signed_value(
  2116. SignedValueTest.SECRET, "key", "value", version=2, clock=self.present
  2117. )
  2118. self.assertEqual(
  2119. signed_v2,
  2120. b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
  2121. b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152",
  2122. )
  2123. signed_default = create_signed_value(
  2124. SignedValueTest.SECRET, "key", "value", clock=self.present
  2125. )
  2126. self.assertEqual(signed_default, signed_v2)
  2127. decoded_v1 = decode_signed_value(
  2128. SignedValueTest.SECRET, "key", signed_v1, min_version=1, clock=self.present
  2129. )
  2130. self.assertEqual(decoded_v1, b"value")
  2131. decoded_v2 = decode_signed_value(
  2132. SignedValueTest.SECRET, "key", signed_v2, min_version=2, clock=self.present
  2133. )
  2134. self.assertEqual(decoded_v2, b"value")
  2135. def test_name_swap(self):
  2136. signed1 = create_signed_value(
  2137. SignedValueTest.SECRET, "key1", "value", clock=self.present
  2138. )
  2139. signed2 = create_signed_value(
  2140. SignedValueTest.SECRET, "key2", "value", clock=self.present
  2141. )
  2142. # Try decoding each string with the other's "name"
  2143. decoded1 = decode_signed_value(
  2144. SignedValueTest.SECRET, "key2", signed1, clock=self.present
  2145. )
  2146. self.assertIs(decoded1, None)
  2147. decoded2 = decode_signed_value(
  2148. SignedValueTest.SECRET, "key1", signed2, clock=self.present
  2149. )
  2150. self.assertIs(decoded2, None)
  2151. def test_expired(self):
  2152. signed = create_signed_value(
  2153. SignedValueTest.SECRET, "key1", "value", clock=self.past
  2154. )
  2155. decoded_past = decode_signed_value(
  2156. SignedValueTest.SECRET, "key1", signed, clock=self.past
  2157. )
  2158. self.assertEqual(decoded_past, b"value")
  2159. decoded_present = decode_signed_value(
  2160. SignedValueTest.SECRET, "key1", signed, clock=self.present
  2161. )
  2162. self.assertIs(decoded_present, None)
  2163. def test_payload_tampering(self):
  2164. # These cookies are variants of the one in test_known_values.
  2165. sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
  2166. def validate(prefix):
  2167. return b"value" == decode_signed_value(
  2168. SignedValueTest.SECRET, "key", prefix + sig, clock=self.present
  2169. )
  2170. self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
  2171. # Change key version
  2172. self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
  2173. # length mismatch (field too short)
  2174. self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
  2175. # length mismatch (field too long)
  2176. self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
  2177. def test_signature_tampering(self):
  2178. prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
  2179. def validate(sig):
  2180. return b"value" == decode_signed_value(
  2181. SignedValueTest.SECRET, "key", prefix + sig, clock=self.present
  2182. )
  2183. self.assertTrue(
  2184. validate("3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
  2185. )
  2186. # All zeros
  2187. self.assertFalse(validate("0" * 32))
  2188. # Change one character
  2189. self.assertFalse(
  2190. validate("4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
  2191. )
  2192. # Change another character
  2193. self.assertFalse(
  2194. validate("3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153")
  2195. )
  2196. # Truncate
  2197. self.assertFalse(
  2198. validate("3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15")
  2199. )
  2200. # Lengthen
  2201. self.assertFalse(
  2202. validate(
  2203. "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"
  2204. )
  2205. )
  2206. def test_non_ascii(self):
  2207. value = b"\xe9"
  2208. signed = create_signed_value(
  2209. SignedValueTest.SECRET, "key", value, clock=self.present
  2210. )
  2211. decoded = decode_signed_value(
  2212. SignedValueTest.SECRET, "key", signed, clock=self.present
  2213. )
  2214. self.assertEqual(value, decoded)
  2215. def test_key_versioning_read_write_default_key(self):
  2216. value = b"\xe9"
  2217. signed = create_signed_value(
  2218. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=0
  2219. )
  2220. decoded = decode_signed_value(
  2221. SignedValueTest.SECRET_DICT, "key", signed, clock=self.present
  2222. )
  2223. self.assertEqual(value, decoded)
  2224. def test_key_versioning_read_write_non_default_key(self):
  2225. value = b"\xe9"
  2226. signed = create_signed_value(
  2227. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=1
  2228. )
  2229. decoded = decode_signed_value(
  2230. SignedValueTest.SECRET_DICT, "key", signed, clock=self.present
  2231. )
  2232. self.assertEqual(value, decoded)
  2233. def test_key_versioning_invalid_key(self):
  2234. value = b"\xe9"
  2235. signed = create_signed_value(
  2236. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=0
  2237. )
  2238. newkeys = SignedValueTest.SECRET_DICT.copy()
  2239. newkeys.pop(0)
  2240. decoded = decode_signed_value(newkeys, "key", signed, clock=self.present)
  2241. self.assertEqual(None, decoded)
  2242. def test_key_version_retrieval(self):
  2243. value = b"\xe9"
  2244. signed = create_signed_value(
  2245. SignedValueTest.SECRET_DICT, "key", value, clock=self.present, key_version=1
  2246. )
  2247. key_version = get_signature_key_version(signed)
  2248. self.assertEqual(1, key_version)
  2249. class XSRFTest(SimpleHandlerTestCase):
  2250. class Handler(RequestHandler):
  2251. def get(self):
  2252. version = int(self.get_argument("version", "2"))
  2253. # This would be a bad idea in a real app, but in this test
  2254. # it's fine.
  2255. self.settings["xsrf_cookie_version"] = version
  2256. self.write(self.xsrf_token)
  2257. def post(self):
  2258. self.write("ok")
  2259. def get_app_kwargs(self):
  2260. return dict(xsrf_cookies=True)
  2261. def setUp(self):
  2262. super(XSRFTest, self).setUp()
  2263. self.xsrf_token = self.get_token()
  2264. def get_token(self, old_token=None, version=None):
  2265. if old_token is not None:
  2266. headers = self.cookie_headers(old_token)
  2267. else:
  2268. headers = None
  2269. response = self.fetch(
  2270. "/" if version is None else ("/?version=%d" % version), headers=headers
  2271. )
  2272. response.rethrow()
  2273. return native_str(response.body)
  2274. def cookie_headers(self, token=None):
  2275. if token is None:
  2276. token = self.xsrf_token
  2277. return {"Cookie": "_xsrf=" + token}
  2278. def test_xsrf_fail_no_token(self):
  2279. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2280. response = self.fetch("/", method="POST", body=b"")
  2281. self.assertEqual(response.code, 403)
  2282. def test_xsrf_fail_body_no_cookie(self):
  2283. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2284. response = self.fetch(
  2285. "/",
  2286. method="POST",
  2287. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2288. )
  2289. self.assertEqual(response.code, 403)
  2290. def test_xsrf_fail_argument_invalid_format(self):
  2291. with ExpectLog(gen_log, ".*'_xsrf' argument has invalid format"):
  2292. response = self.fetch(
  2293. "/",
  2294. method="POST",
  2295. headers=self.cookie_headers(),
  2296. body=urllib.parse.urlencode(dict(_xsrf="3|")),
  2297. )
  2298. self.assertEqual(response.code, 403)
  2299. def test_xsrf_fail_cookie_invalid_format(self):
  2300. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2301. response = self.fetch(
  2302. "/",
  2303. method="POST",
  2304. headers=self.cookie_headers(token="3|"),
  2305. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2306. )
  2307. self.assertEqual(response.code, 403)
  2308. def test_xsrf_fail_cookie_no_body(self):
  2309. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2310. response = self.fetch(
  2311. "/", method="POST", body=b"", headers=self.cookie_headers()
  2312. )
  2313. self.assertEqual(response.code, 403)
  2314. def test_xsrf_success_short_token(self):
  2315. response = self.fetch(
  2316. "/",
  2317. method="POST",
  2318. body=urllib.parse.urlencode(dict(_xsrf="deadbeef")),
  2319. headers=self.cookie_headers(token="deadbeef"),
  2320. )
  2321. self.assertEqual(response.code, 200)
  2322. def test_xsrf_success_non_hex_token(self):
  2323. response = self.fetch(
  2324. "/",
  2325. method="POST",
  2326. body=urllib.parse.urlencode(dict(_xsrf="xoxo")),
  2327. headers=self.cookie_headers(token="xoxo"),
  2328. )
  2329. self.assertEqual(response.code, 200)
  2330. def test_xsrf_success_post_body(self):
  2331. response = self.fetch(
  2332. "/",
  2333. method="POST",
  2334. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2335. headers=self.cookie_headers(),
  2336. )
  2337. self.assertEqual(response.code, 200)
  2338. def test_xsrf_success_query_string(self):
  2339. response = self.fetch(
  2340. "/?" + urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2341. method="POST",
  2342. body=b"",
  2343. headers=self.cookie_headers(),
  2344. )
  2345. self.assertEqual(response.code, 200)
  2346. def test_xsrf_success_header(self):
  2347. response = self.fetch(
  2348. "/",
  2349. method="POST",
  2350. body=b"",
  2351. headers=dict(
  2352. {"X-Xsrftoken": self.xsrf_token}, # type: ignore
  2353. **self.cookie_headers()
  2354. ),
  2355. )
  2356. self.assertEqual(response.code, 200)
  2357. def test_distinct_tokens(self):
  2358. # Every request gets a distinct token.
  2359. NUM_TOKENS = 10
  2360. tokens = set()
  2361. for i in range(NUM_TOKENS):
  2362. tokens.add(self.get_token())
  2363. self.assertEqual(len(tokens), NUM_TOKENS)
  2364. def test_cross_user(self):
  2365. token2 = self.get_token()
  2366. # Each token can be used to authenticate its own request.
  2367. for token in (self.xsrf_token, token2):
  2368. response = self.fetch(
  2369. "/",
  2370. method="POST",
  2371. body=urllib.parse.urlencode(dict(_xsrf=token)),
  2372. headers=self.cookie_headers(token),
  2373. )
  2374. self.assertEqual(response.code, 200)
  2375. # Sending one in the cookie and the other in the body is not allowed.
  2376. for cookie_token, body_token in (
  2377. (self.xsrf_token, token2),
  2378. (token2, self.xsrf_token),
  2379. ):
  2380. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2381. response = self.fetch(
  2382. "/",
  2383. method="POST",
  2384. body=urllib.parse.urlencode(dict(_xsrf=body_token)),
  2385. headers=self.cookie_headers(cookie_token),
  2386. )
  2387. self.assertEqual(response.code, 403)
  2388. def test_refresh_token(self):
  2389. token = self.xsrf_token
  2390. tokens_seen = set([token])
  2391. # A user's token is stable over time. Refreshing the page in one tab
  2392. # might update the cookie while an older tab still has the old cookie
  2393. # in its DOM. Simulate this scenario by passing a constant token
  2394. # in the body and re-querying for the token.
  2395. for i in range(5):
  2396. token = self.get_token(token)
  2397. # Tokens are encoded uniquely each time
  2398. tokens_seen.add(token)
  2399. response = self.fetch(
  2400. "/",
  2401. method="POST",
  2402. body=urllib.parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2403. headers=self.cookie_headers(token),
  2404. )
  2405. self.assertEqual(response.code, 200)
  2406. self.assertEqual(len(tokens_seen), 6)
  2407. def test_versioning(self):
  2408. # Version 1 still produces distinct tokens per request.
  2409. self.assertNotEqual(self.get_token(version=1), self.get_token(version=1))
  2410. # Refreshed v1 tokens are all identical.
  2411. v1_token = self.get_token(version=1)
  2412. for i in range(5):
  2413. self.assertEqual(self.get_token(v1_token, version=1), v1_token)
  2414. # Upgrade to a v2 version of the same token
  2415. v2_token = self.get_token(v1_token)
  2416. self.assertNotEqual(v1_token, v2_token)
  2417. # Each v1 token can map to many v2 tokens.
  2418. self.assertNotEqual(v2_token, self.get_token(v1_token))
  2419. # The tokens are cross-compatible.
  2420. for cookie_token, body_token in ((v1_token, v2_token), (v2_token, v1_token)):
  2421. response = self.fetch(
  2422. "/",
  2423. method="POST",
  2424. body=urllib.parse.urlencode(dict(_xsrf=body_token)),
  2425. headers=self.cookie_headers(cookie_token),
  2426. )
  2427. self.assertEqual(response.code, 200)
  2428. class XSRFCookieKwargsTest(SimpleHandlerTestCase):
  2429. class Handler(RequestHandler):
  2430. def get(self):
  2431. self.write(self.xsrf_token)
  2432. def get_app_kwargs(self):
  2433. return dict(
  2434. xsrf_cookies=True, xsrf_cookie_kwargs=dict(httponly=True, expires_days=2)
  2435. )
  2436. def test_xsrf_httponly(self):
  2437. response = self.fetch("/")
  2438. self.assertIn("httponly;", response.headers["Set-Cookie"].lower())
  2439. self.assertIn("expires=", response.headers["Set-Cookie"].lower())
  2440. header = response.headers.get("Set-Cookie")
  2441. match = re.match(".*; expires=(?P<expires>.+);.*", header)
  2442. assert match is not None
  2443. expires = datetime.datetime.utcnow() + datetime.timedelta(days=2)
  2444. parsed = email.utils.parsedate(match.groupdict()["expires"])
  2445. assert parsed is not None
  2446. header_expires = datetime.datetime(*parsed[:6])
  2447. self.assertTrue(abs((expires - header_expires).total_seconds()) < 10)
  2448. class FinishExceptionTest(SimpleHandlerTestCase):
  2449. class Handler(RequestHandler):
  2450. def get(self):
  2451. self.set_status(401)
  2452. self.set_header("WWW-Authenticate", 'Basic realm="something"')
  2453. if self.get_argument("finish_value", ""):
  2454. raise Finish("authentication required")
  2455. else:
  2456. self.write("authentication required")
  2457. raise Finish()
  2458. def test_finish_exception(self):
  2459. for u in ["/", "/?finish_value=1"]:
  2460. response = self.fetch(u)
  2461. self.assertEqual(response.code, 401)
  2462. self.assertEqual(
  2463. 'Basic realm="something"', response.headers.get("WWW-Authenticate")
  2464. )
  2465. self.assertEqual(b"authentication required", response.body)
  2466. class DecoratorTest(WebTestCase):
  2467. def get_handlers(self):
  2468. class RemoveSlashHandler(RequestHandler):
  2469. @removeslash
  2470. def get(self):
  2471. pass
  2472. class AddSlashHandler(RequestHandler):
  2473. @addslash
  2474. def get(self):
  2475. pass
  2476. return [("/removeslash/", RemoveSlashHandler), ("/addslash", AddSlashHandler)]
  2477. def test_removeslash(self):
  2478. response = self.fetch("/removeslash/", follow_redirects=False)
  2479. self.assertEqual(response.code, 301)
  2480. self.assertEqual(response.headers["Location"], "/removeslash")
  2481. response = self.fetch("/removeslash/?foo=bar", follow_redirects=False)
  2482. self.assertEqual(response.code, 301)
  2483. self.assertEqual(response.headers["Location"], "/removeslash?foo=bar")
  2484. def test_addslash(self):
  2485. response = self.fetch("/addslash", follow_redirects=False)
  2486. self.assertEqual(response.code, 301)
  2487. self.assertEqual(response.headers["Location"], "/addslash/")
  2488. response = self.fetch("/addslash?foo=bar", follow_redirects=False)
  2489. self.assertEqual(response.code, 301)
  2490. self.assertEqual(response.headers["Location"], "/addslash/?foo=bar")
  2491. class CacheTest(WebTestCase):
  2492. def get_handlers(self):
  2493. class EtagHandler(RequestHandler):
  2494. def get(self, computed_etag):
  2495. self.write(computed_etag)
  2496. def compute_etag(self):
  2497. return self._write_buffer[0]
  2498. return [("/etag/(.*)", EtagHandler)]
  2499. def test_wildcard_etag(self):
  2500. computed_etag = '"xyzzy"'
  2501. etags = "*"
  2502. self._test_etag(computed_etag, etags, 304)
  2503. def test_strong_etag_match(self):
  2504. computed_etag = '"xyzzy"'
  2505. etags = '"xyzzy"'
  2506. self._test_etag(computed_etag, etags, 304)
  2507. def test_multiple_strong_etag_match(self):
  2508. computed_etag = '"xyzzy1"'
  2509. etags = '"xyzzy1", "xyzzy2"'
  2510. self._test_etag(computed_etag, etags, 304)
  2511. def test_strong_etag_not_match(self):
  2512. computed_etag = '"xyzzy"'
  2513. etags = '"xyzzy1"'
  2514. self._test_etag(computed_etag, etags, 200)
  2515. def test_multiple_strong_etag_not_match(self):
  2516. computed_etag = '"xyzzy"'
  2517. etags = '"xyzzy1", "xyzzy2"'
  2518. self._test_etag(computed_etag, etags, 200)
  2519. def test_weak_etag_match(self):
  2520. computed_etag = '"xyzzy1"'
  2521. etags = 'W/"xyzzy1"'
  2522. self._test_etag(computed_etag, etags, 304)
  2523. def test_multiple_weak_etag_match(self):
  2524. computed_etag = '"xyzzy2"'
  2525. etags = 'W/"xyzzy1", W/"xyzzy2"'
  2526. self._test_etag(computed_etag, etags, 304)
  2527. def test_weak_etag_not_match(self):
  2528. computed_etag = '"xyzzy2"'
  2529. etags = 'W/"xyzzy1"'
  2530. self._test_etag(computed_etag, etags, 200)
  2531. def test_multiple_weak_etag_not_match(self):
  2532. computed_etag = '"xyzzy3"'
  2533. etags = 'W/"xyzzy1", W/"xyzzy2"'
  2534. self._test_etag(computed_etag, etags, 200)
  2535. def _test_etag(self, computed_etag, etags, status_code):
  2536. response = self.fetch(
  2537. "/etag/" + computed_etag, headers={"If-None-Match": etags}
  2538. )
  2539. self.assertEqual(response.code, status_code)
  2540. class RequestSummaryTest(SimpleHandlerTestCase):
  2541. class Handler(RequestHandler):
  2542. def get(self):
  2543. # remote_ip is optional, although it's set by
  2544. # both HTTPServer and WSGIAdapter.
  2545. # Clobber it to make sure it doesn't break logging.
  2546. self.request.remote_ip = None
  2547. self.finish(self._request_summary())
  2548. def test_missing_remote_ip(self):
  2549. resp = self.fetch("/")
  2550. self.assertEqual(resp.body, b"GET / (None)")
  2551. class HTTPErrorTest(unittest.TestCase):
  2552. def test_copy(self):
  2553. e = HTTPError(403, reason="Go away")
  2554. e2 = copy.copy(e)
  2555. self.assertIsNot(e, e2)
  2556. self.assertEqual(e.status_code, e2.status_code)
  2557. self.assertEqual(e.reason, e2.reason)
  2558. class ApplicationTest(AsyncTestCase):
  2559. def test_listen(self):
  2560. app = Application([])
  2561. server = app.listen(0, address="127.0.0.1")
  2562. server.stop()
  2563. class URLSpecReverseTest(unittest.TestCase):
  2564. def test_reverse(self):
  2565. self.assertEqual("/favicon.ico", url(r"/favicon\.ico", None).reverse())
  2566. self.assertEqual("/favicon.ico", url(r"^/favicon\.ico$", None).reverse())
  2567. def test_non_reversible(self):
  2568. # URLSpecs are non-reversible if they include non-constant
  2569. # regex features outside capturing groups. Currently, this is
  2570. # only strictly enforced for backslash-escaped character
  2571. # classes.
  2572. paths = [r"^/api/v\d+/foo/(\w+)$"]
  2573. for path in paths:
  2574. # A URLSpec can still be created even if it cannot be reversed.
  2575. url_spec = url(path, None)
  2576. try:
  2577. result = url_spec.reverse()
  2578. self.fail(
  2579. "did not get expected exception when reversing %s. "
  2580. "result: %s" % (path, result)
  2581. )
  2582. except ValueError:
  2583. pass
  2584. def test_reverse_arguments(self):
  2585. self.assertEqual(
  2586. "/api/v1/foo/bar", url(r"^/api/v1/foo/(\w+)$", None).reverse("bar")
  2587. )
  2588. class RedirectHandlerTest(WebTestCase):
  2589. def get_handlers(self):
  2590. return [
  2591. ("/src", WebRedirectHandler, {"url": "/dst"}),
  2592. ("/src2", WebRedirectHandler, {"url": "/dst2?foo=bar"}),
  2593. (r"/(.*?)/(.*?)/(.*)", WebRedirectHandler, {"url": "/{1}/{0}/{2}"}),
  2594. ]
  2595. def test_basic_redirect(self):
  2596. response = self.fetch("/src", follow_redirects=False)
  2597. self.assertEqual(response.code, 301)
  2598. self.assertEqual(response.headers["Location"], "/dst")
  2599. def test_redirect_with_argument(self):
  2600. response = self.fetch("/src?foo=bar", follow_redirects=False)
  2601. self.assertEqual(response.code, 301)
  2602. self.assertEqual(response.headers["Location"], "/dst?foo=bar")
  2603. def test_redirect_with_appending_argument(self):
  2604. response = self.fetch("/src2?foo2=bar2", follow_redirects=False)
  2605. self.assertEqual(response.code, 301)
  2606. self.assertEqual(response.headers["Location"], "/dst2?foo=bar&foo2=bar2")
  2607. def test_redirect_pattern(self):
  2608. response = self.fetch("/a/b/c", follow_redirects=False)
  2609. self.assertEqual(response.code, 301)
  2610. self.assertEqual(response.headers["Location"], "/b/a/c")