| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- # coding: utf-8
- from hashlib import md5
- import unittest
- from tornado.escape import utf8
- from tornado.testing import AsyncHTTPTestCase
- from tornado.test import httpclient_test
- from tornado.web import Application, RequestHandler
- try:
- import pycurl # type: ignore
- except ImportError:
- pycurl = None
- if pycurl is not None:
- from tornado.curl_httpclient import CurlAsyncHTTPClient
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
    """Run the shared ``HTTPClientCommonTestCase`` tests with the curl client.

    The whole case is skipped when ``pycurl`` could not be imported.
    """

    def get_http_client(self):
        """Return a ``CurlAsyncHTTPClient`` whose defaults disable IPv6."""
        client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
        # make sure AsyncHTTPClient magic doesn't give us the wrong class
        # (assertIsInstance reports the offending object if this ever fails)
        self.assertIsInstance(client, CurlAsyncHTTPClient)
        return client
class DigestAuthHandler(RequestHandler):
    """Minimal HTTP Digest authentication endpoint for the client tests.

    A request without an ``Authorization`` header gets a 401 challenge.
    A request with one has its digest parameters checked against the fixed
    realm/nonce/opaque values and the configured credentials; the response
    body is ``ok`` when the digest matches and ``fail`` otherwise.
    """

    def initialize(self, username, password):
        """Remember the credentials this handler accepts."""
        self.username, self.password = username, password

    def get(self):
        """Send the digest challenge, or verify the supplied digest."""
        realm, opaque = "test", "asdf"
        # Real implementations would use a random nonce.
        nonce = "1234"

        authorization = self.request.headers.get("Authorization", None)
        if authorization is None:
            # No credentials yet: reply with the challenge and stop.
            self.set_status(401)
            challenge = 'Digest realm="%s", nonce="%s", opaque="%s"' % (
                realm,
                nonce,
                opaque,
            )
            self.set_header("WWW-Authenticate", challenge)
            return

        scheme, raw_params = authorization.split(" ", 1)
        assert scheme == "Digest"

        # Break the comma-separated key=value list apart, dropping the
        # surrounding double quotes from quoted values.
        fields = {}
        for item in raw_params.split(","):
            name, value = item.strip().split("=", 1)
            is_quoted = value[0] == '"' and value[-1] == '"'
            fields[name] = value[1:-1] if is_quoted else value

        # The client has to echo the challenge values and identify the
        # expected user and request path.
        expectations = (
            ("realm", realm),
            ("opaque", opaque),
            ("nonce", nonce),
            ("username", self.username),
            ("uri", self.request.path),
        )
        for name, expected in expectations:
            assert fields[name] == expected

        def md5_hex(text):
            # Hex MD5 of the UTF-8 encoded text.
            return md5(utf8(text)).hexdigest()

        # digest = MD5(h1:nonce:h2), where h1 hashes user:realm:password
        # and h2 hashes method:path.
        user_hash = md5_hex("%s:%s:%s" % (self.username, realm, self.password))
        request_hash = md5_hex(
            "%s:%s" % (self.request.method, self.request.path)
        )
        expected_digest = md5_hex("%s:%s:%s" % (user_hash, nonce, request_hash))

        self.write("ok" if expected_digest == fields["response"] else "fail")
class CustomReasonHandler(RequestHandler):
    """Answer GET with status 200 and a non-standard reason phrase."""

    def get(self):
        """Set status 200 with the reason ``Custom reason``."""
        self.set_status(200, reason="Custom reason")
class CustomFailReasonHandler(RequestHandler):
    """Answer GET with status 400 and a non-standard reason phrase."""

    def get(self):
        """Set status 400 with the reason ``Custom reason``."""
        self.set_status(400, reason="Custom reason")
@unittest.skipIf(pycurl is None, "pycurl module not present")
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
    """Curl client tests covering digest auth and custom reason phrases.

    The whole case is skipped when ``pycurl`` could not be imported.
    """

    def setUp(self):
        """Run the base setup, then point ``http_client`` at a curl client."""
        super(CurlHTTPClientTestCase, self).setUp()
        self.http_client = self.create_client()

    def create_client(self, **kwargs):
        """Build a forced-instance ``CurlAsyncHTTPClient`` with IPv6 disabled.

        Extra keyword arguments are passed through to the constructor.
        """
        client_defaults = dict(allow_ipv6=False)
        return CurlAsyncHTTPClient(
            force_instance=True, defaults=client_defaults, **kwargs
        )

    def get_app(self):
        """Return the application serving the digest and reason endpoints."""
        ascii_credentials = {"username": "foo", "password": "bar"}
        non_ascii_credentials = {"username": "foo", "password": "barユ£"}
        routes = [
            ("/digest", DigestAuthHandler, ascii_credentials),
            ("/digest_non_ascii", DigestAuthHandler, non_ascii_credentials),
            ("/custom_reason", CustomReasonHandler),
            ("/custom_fail_reason", CustomFailReasonHandler),
        ]
        return Application(routes)

    def test_digest_auth(self):
        # Matching ASCII credentials must be accepted by /digest.
        credentials = dict(
            auth_mode="digest", auth_username="foo", auth_password="bar"
        )
        result = self.fetch("/digest", **credentials)
        self.assertEqual(result.body, b"ok")

    def test_digest_auth_non_ascii(self):
        # Same check, with a password containing non-ASCII characters.
        credentials = dict(
            auth_mode="digest",
            auth_username="foo",
            auth_password="barユ£",
        )
        result = self.fetch("/digest_non_ascii", **credentials)
        self.assertEqual(result.body, b"ok")

    def test_custom_reason(self):
        # The server's custom reason phrase must surface on the response.
        result = self.fetch("/custom_reason")
        self.assertEqual(result.reason, "Custom reason")

    def test_fail_custom_reason(self):
        # On a 400 the custom reason must appear in the error's message.
        result = self.fetch("/custom_fail_reason")
        self.assertEqual(str(result.error), "HTTP 400: Custom reason")
|