| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- #
- # Copyright 2012 Facebook
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- import contextlib
- import glob
- import logging
- import os
- import re
- import subprocess
- import sys
- import tempfile
- import unittest
- import warnings
- from tornado.escape import utf8
- from tornado.log import LogFormatter, define_logging_options, enable_pretty_logging
- from tornado.options import OptionParser
- from tornado.util import basestring_type
@contextlib.contextmanager
def ignore_bytes_warning():
    """Context manager that ignores ``BytesWarning`` while its body runs.

    The filter is installed inside ``warnings.catch_warnings()``, so the
    previous warning filters are back in place once the block exits.
    """
    filter_scope = warnings.catch_warnings()
    with filter_scope:
        warnings.simplefilter(action="ignore", category=BytesWarning)
        yield
class LogFormatterTest(unittest.TestCase):
    """Check what ``LogFormatter`` writes through a ``logging.FileHandler``.

    Every test logs a single record at ERROR level to a file in a fresh
    temporary directory and then compares the raw bytes found in that file.
    Subclasses can override ``make_handler`` to repeat the same tests with a
    differently configured handler.
    """

    # Matches the output of a single logging call (which may be multiple lines
    # if a traceback was included, so we use the DOTALL option)
    LINE_RE = re.compile(
        b"(?s)\x01\\[E [0-9]{6} [0-9]{2}:[0-9]{2}:[0-9]{2} log_test:[0-9]+\\]\x02 (.*)"
    )

    def setUp(self):
        """Create the formatter, a private logger and a file-backed handler."""
        self.formatter = LogFormatter(color=False)
        # Fake color support.  We can't guarantee anything about the $TERM
        # variable when the tests are run, so just patch in some values
        # for testing.  (testing with color off fails to expose some potential
        # encoding issues from the control characters)
        self.formatter._colors = {logging.ERROR: u"\u0001"}
        self.formatter._normal = u"\u0002"
        # construct a Logger directly to bypass getLogger's caching
        self.logger = logging.Logger("LogFormatterTest")
        self.logger.propagate = False
        self.tempdir = tempfile.mkdtemp()
        self.filename = os.path.join(self.tempdir, "log.out")
        self.handler = self.make_handler(self.filename)
        self.handler.setFormatter(self.formatter)
        self.logger.addHandler(self.handler)

    def tearDown(self):
        """Close the handler, then delete the log file and its directory."""
        self.handler.close()
        os.unlink(self.filename)
        os.rmdir(self.tempdir)

    def make_handler(self, filename):
        """Return the handler under test, writing to ``filename``."""
        # Base case: default setup without explicit encoding.
        # In python 2, supports arbitrary byte strings and unicode objects
        # that contain only ascii.  In python 3, supports ascii-only unicode
        # strings (but byte strings will be repr'd automatically).
        return logging.FileHandler(filename)

    def get_output(self):
        """Return the message part of the record written to the log file.

        The file is read as bytes, stripped, and matched against ``LINE_RE``;
        the captured group (everything after the prefix) is returned.

        Raises:
            Exception: if the file contents do not match ``LINE_RE``.
        """
        with open(self.filename, "rb") as f:
            line = f.read().strip()
            m = LogFormatterTest.LINE_RE.match(line)
            if m:
                return m.group(1)
            else:
                raise Exception("output didn't match regex: %r" % line)

    def _assert_regex(self, text, expected_regex):
        """Assert that ``expected_regex`` is found in ``text``.

        ``assertRegexpMatches`` is a deprecated alias that newer Python
        versions no longer provide, while older ones only know that name.
        Prefer ``assertRegex`` and fall back to the old spelling so the test
        runs on both.
        """
        check = getattr(self, "assertRegex", None)
        if check is None:
            check = self.assertRegexpMatches
        check(text, expected_regex)

    def test_basic_logging(self):
        self.logger.error("foo")
        self.assertEqual(self.get_output(), b"foo")

    def test_bytes_logging(self):
        with ignore_bytes_warning():
            # This will be "\xe9" on python 2 or "b'\xe9'" on python 3
            self.logger.error(b"\xe9")
            self.assertEqual(self.get_output(), utf8(repr(b"\xe9")))

    def test_utf8_logging(self):
        with ignore_bytes_warning():
            self.logger.error(u"\u00e9".encode("utf8"))
        if issubclass(bytes, basestring_type):
            # on python 2, utf8 byte strings (and by extension ascii byte
            # strings) are passed through as-is.
            self.assertEqual(self.get_output(), utf8(u"\u00e9"))
        else:
            # on python 3, byte strings always get repr'd even if
            # they're ascii-only, so this degenerates into another
            # copy of test_bytes_logging.
            self.assertEqual(self.get_output(), utf8(repr(utf8(u"\u00e9"))))

    def test_bytes_exception_logging(self):
        try:
            raise Exception(b"\xe9")
        except Exception:
            self.logger.exception("caught exception")
        # This will be "Exception: \xe9" on python 2 or
        # "Exception: b'\xe9'" on python 3.
        output = self.get_output()
        self._assert_regex(output, br"Exception.*\\xe9")
        # The traceback contains newlines, which should not have been escaped.
        self.assertNotIn(br"\n", output)
class UnicodeLogFormatterTest(LogFormatterTest):
    """Run the ``LogFormatterTest`` cases with a utf8-encoding file handler."""

    def make_handler(self, filename):
        """Return a ``logging.FileHandler`` for ``filename`` using utf8."""
        # Adding an explicit encoding configuration allows non-ascii unicode
        # strings in both python 2 and 3, without changing the behavior
        # for byte strings.
        utf8_handler = logging.FileHandler(filename, encoding="utf8")
        return utf8_handler

    def test_unicode_logging(self):
        """A non-ascii unicode message is written to the file as utf8 bytes."""
        message = u"\u00e9"
        self.logger.error(message)
        written = self.get_output()
        self.assertEqual(written, utf8(message))
class EnablePrettyLoggingTest(unittest.TestCase):
    """Check the handlers ``enable_pretty_logging`` installs on a logger.

    Each test configures a fresh ``OptionParser`` (with the logging options
    defined on it) and a private ``logging.Logger``, then calls
    ``enable_pretty_logging`` with both.
    """

    def setUp(self):
        """Build a fresh options object and a logger that does not propagate."""
        super(EnablePrettyLoggingTest, self).setUp()
        self.options = OptionParser()
        define_logging_options(self.options)
        self.logger = logging.Logger("tornado.test.log_test.EnablePrettyLoggingTest")
        self.logger.propagate = False

    def _assert_regex(self, text, expected_regex):
        """Assert that ``expected_regex`` is found in ``text``.

        ``assertRegexpMatches`` is a deprecated alias that newer Python
        versions no longer provide, while older ones only know that name.
        Prefer ``assertRegex`` and fall back to the old spelling so the test
        runs on both.
        """
        check = getattr(self, "assertRegex", None)
        if check is None:
            check = self.assertRegexpMatches
        check(text, expected_regex)

    def _close_handlers(self):
        """Flush and close every handler currently attached to the logger."""
        for handler in self.logger.handlers:
            handler.flush()
            handler.close()

    def _assert_single_hello_line(self, prefix):
        """Log "hello" and assert it landed in exactly one ``prefix*`` file."""
        self.logger.error("hello")
        self.logger.handlers[0].flush()
        filenames = glob.glob(prefix + "*")
        self.assertEqual(1, len(filenames))
        with open(filenames[0]) as f:
            self._assert_regex(f.read(), r"^\[E [^]]*\] hello$")

    def _remove_log_files(self, tmpdir, prefix):
        """Close the handlers, delete all ``prefix*`` files and ``tmpdir``."""
        # Handlers are closed first so no file is still open when unlinked.
        self._close_handlers()
        for filename in glob.glob(prefix + "*"):
            os.unlink(filename)
        os.rmdir(tmpdir)

    def test_log_file(self):
        """Setting ``log_file_prefix`` yields one handler writing one file."""
        tmpdir = tempfile.mkdtemp()
        prefix = os.path.join(tmpdir, "test_log")
        try:
            self.options.log_file_prefix = prefix
            enable_pretty_logging(options=self.options, logger=self.logger)
            self.assertEqual(1, len(self.logger.handlers))
            self._assert_single_hello_line(prefix)
        finally:
            self._remove_log_files(tmpdir, prefix)

    def test_log_file_with_timed_rotating(self):
        """Same as ``test_log_file`` but with ``log_rotate_mode = "time"``."""
        tmpdir = tempfile.mkdtemp()
        prefix = os.path.join(tmpdir, "test_log")
        try:
            self.options.log_file_prefix = prefix
            self.options.log_rotate_mode = "time"
            enable_pretty_logging(options=self.options, logger=self.logger)
            self._assert_single_hello_line(prefix)
        finally:
            self._remove_log_files(tmpdir, prefix)

    def test_wrong_rotate_mode_value(self):
        """An unknown ``log_rotate_mode`` makes the call raise ``ValueError``."""
        try:
            self.options.log_file_prefix = "some_path"
            self.options.log_rotate_mode = "wrong_mode"
            self.assertRaises(
                ValueError,
                enable_pretty_logging,
                options=self.options,
                logger=self.logger,
            )
        finally:
            self._close_handlers()
class LoggingOptionTest(unittest.TestCase):
    """Test the ability to enable and disable Tornado's logging hooks."""

    def logs_present(self, statement, args=None):
        """Run ``statement`` in a child interpreter and report if "hello" logged.

        The child program imports ``options`` and ``parse_command_line``,
        executes ``statement``, then calls ``logging.info("hello")``.
        ``args`` (a list, if given) is appended to the child's command line.
        The test fails if the child exits with a non-zero status.

        Returns:
            True if ``b"hello"`` appears in the child's combined
            stdout/stderr, False otherwise.
        """
        # Each test may manipulate and/or parse the options and then logs
        # a line at the 'info' level.  This level is ignored in the
        # logging module by default, but Tornado turns it on by default
        # so it is the easiest way to tell whether tornado's logging hooks
        # ran.
        import_stmt = "from tornado.options import options, parse_command_line"
        log_stmt = 'import logging; logging.info("hello")'
        source = ";".join((import_stmt, statement, log_stmt))
        command = [sys.executable, "-c", source] + (args or [])
        child = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        # stderr is merged into stdout above, so only the first item matters.
        output = child.communicate()[0]
        self.assertEqual(child.returncode, 0, "process failed: %r" % output)
        return b"hello" in output

    def test_default(self):
        logged = self.logs_present("pass")
        self.assertFalse(logged)

    def test_tornado_default(self):
        logged = self.logs_present("parse_command_line()")
        self.assertTrue(logged)

    def test_disable_command_line(self):
        logged = self.logs_present("parse_command_line()", ["--logging=none"])
        self.assertFalse(logged)

    def test_disable_command_line_case_insensitive(self):
        logged = self.logs_present("parse_command_line()", ["--logging=None"])
        self.assertFalse(logged)

    def test_disable_code_string(self):
        logged = self.logs_present('options.logging = "none"; parse_command_line()')
        self.assertFalse(logged)

    def test_disable_code_none(self):
        logged = self.logs_present("options.logging = None; parse_command_line()")
        self.assertFalse(logged)

    def test_disable_override(self):
        # command line trumps code defaults
        logged = self.logs_present(
            "options.logging = None; parse_command_line()", ["--logging=info"]
        )
        self.assertTrue(logged)
|