curl_httpclient_test.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. # coding: utf-8
  2. from hashlib import md5
  3. import unittest
  4. from tornado.escape import utf8
  5. from tornado.testing import AsyncHTTPTestCase
  6. from tornado.test import httpclient_test
  7. from tornado.web import Application, RequestHandler
  8. try:
  9. import pycurl # type: ignore
  10. except ImportError:
  11. pycurl = None
  12. if pycurl is not None:
  13. from tornado.curl_httpclient import CurlAsyncHTTPClient
  14. @unittest.skipIf(pycurl is None, "pycurl module not present")
  15. class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
  16. def get_http_client(self):
  17. client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
  18. # make sure AsyncHTTPClient magic doesn't give us the wrong class
  19. self.assertTrue(isinstance(client, CurlAsyncHTTPClient))
  20. return client
  21. class DigestAuthHandler(RequestHandler):
  22. def initialize(self, username, password):
  23. self.username = username
  24. self.password = password
  25. def get(self):
  26. realm = "test"
  27. opaque = "asdf"
  28. # Real implementations would use a random nonce.
  29. nonce = "1234"
  30. auth_header = self.request.headers.get("Authorization", None)
  31. if auth_header is not None:
  32. auth_mode, params = auth_header.split(" ", 1)
  33. assert auth_mode == "Digest"
  34. param_dict = {}
  35. for pair in params.split(","):
  36. k, v = pair.strip().split("=", 1)
  37. if v[0] == '"' and v[-1] == '"':
  38. v = v[1:-1]
  39. param_dict[k] = v
  40. assert param_dict["realm"] == realm
  41. assert param_dict["opaque"] == opaque
  42. assert param_dict["nonce"] == nonce
  43. assert param_dict["username"] == self.username
  44. assert param_dict["uri"] == self.request.path
  45. h1 = md5(
  46. utf8("%s:%s:%s" % (self.username, realm, self.password))
  47. ).hexdigest()
  48. h2 = md5(
  49. utf8("%s:%s" % (self.request.method, self.request.path))
  50. ).hexdigest()
  51. digest = md5(utf8("%s:%s:%s" % (h1, nonce, h2))).hexdigest()
  52. if digest == param_dict["response"]:
  53. self.write("ok")
  54. else:
  55. self.write("fail")
  56. else:
  57. self.set_status(401)
  58. self.set_header(
  59. "WWW-Authenticate",
  60. 'Digest realm="%s", nonce="%s", opaque="%s"' % (realm, nonce, opaque),
  61. )
  62. class CustomReasonHandler(RequestHandler):
  63. def get(self):
  64. self.set_status(200, "Custom reason")
  65. class CustomFailReasonHandler(RequestHandler):
  66. def get(self):
  67. self.set_status(400, "Custom reason")
  68. @unittest.skipIf(pycurl is None, "pycurl module not present")
  69. class CurlHTTPClientTestCase(AsyncHTTPTestCase):
  70. def setUp(self):
  71. super(CurlHTTPClientTestCase, self).setUp()
  72. self.http_client = self.create_client()
  73. def get_app(self):
  74. return Application(
  75. [
  76. ("/digest", DigestAuthHandler, {"username": "foo", "password": "bar"}),
  77. (
  78. "/digest_non_ascii",
  79. DigestAuthHandler,
  80. {"username": "foo", "password": "barユ£"},
  81. ),
  82. ("/custom_reason", CustomReasonHandler),
  83. ("/custom_fail_reason", CustomFailReasonHandler),
  84. ]
  85. )
  86. def create_client(self, **kwargs):
  87. return CurlAsyncHTTPClient(
  88. force_instance=True, defaults=dict(allow_ipv6=False), **kwargs
  89. )
  90. def test_digest_auth(self):
  91. response = self.fetch(
  92. "/digest", auth_mode="digest", auth_username="foo", auth_password="bar"
  93. )
  94. self.assertEqual(response.body, b"ok")
  95. def test_custom_reason(self):
  96. response = self.fetch("/custom_reason")
  97. self.assertEqual(response.reason, "Custom reason")
  98. def test_fail_custom_reason(self):
  99. response = self.fetch("/custom_fail_reason")
  100. self.assertEqual(str(response.error), "HTTP 400: Custom reason")
  101. def test_digest_auth_non_ascii(self):
  102. response = self.fetch(
  103. "/digest_non_ascii",
  104. auth_mode="digest",
  105. auth_username="foo",
  106. auth_password="barユ£",
  107. )
  108. self.assertEqual(response.body, b"ok")