# log_test.py
#
# Copyright 2012 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
  15. import contextlib
  16. import glob
  17. import logging
  18. import os
  19. import re
  20. import subprocess
  21. import sys
  22. import tempfile
  23. import unittest
  24. import warnings
  25. from tornado.escape import utf8
  26. from tornado.log import LogFormatter, define_logging_options, enable_pretty_logging
  27. from tornado.options import OptionParser
  28. from tornado.util import basestring_type
  29. @contextlib.contextmanager
  30. def ignore_bytes_warning():
  31. with warnings.catch_warnings():
  32. warnings.simplefilter("ignore", category=BytesWarning)
  33. yield
  34. class LogFormatterTest(unittest.TestCase):
  35. # Matches the output of a single logging call (which may be multiple lines
  36. # if a traceback was included, so we use the DOTALL option)
  37. LINE_RE = re.compile(
  38. b"(?s)\x01\\[E [0-9]{6} [0-9]{2}:[0-9]{2}:[0-9]{2} log_test:[0-9]+\\]\x02 (.*)"
  39. )
  40. def setUp(self):
  41. self.formatter = LogFormatter(color=False)
  42. # Fake color support. We can't guarantee anything about the $TERM
  43. # variable when the tests are run, so just patch in some values
  44. # for testing. (testing with color off fails to expose some potential
  45. # encoding issues from the control characters)
  46. self.formatter._colors = {logging.ERROR: u"\u0001"}
  47. self.formatter._normal = u"\u0002"
  48. # construct a Logger directly to bypass getLogger's caching
  49. self.logger = logging.Logger("LogFormatterTest")
  50. self.logger.propagate = False
  51. self.tempdir = tempfile.mkdtemp()
  52. self.filename = os.path.join(self.tempdir, "log.out")
  53. self.handler = self.make_handler(self.filename)
  54. self.handler.setFormatter(self.formatter)
  55. self.logger.addHandler(self.handler)
  56. def tearDown(self):
  57. self.handler.close()
  58. os.unlink(self.filename)
  59. os.rmdir(self.tempdir)
  60. def make_handler(self, filename):
  61. # Base case: default setup without explicit encoding.
  62. # In python 2, supports arbitrary byte strings and unicode objects
  63. # that contain only ascii. In python 3, supports ascii-only unicode
  64. # strings (but byte strings will be repr'd automatically).
  65. return logging.FileHandler(filename)
  66. def get_output(self):
  67. with open(self.filename, "rb") as f:
  68. line = f.read().strip()
  69. m = LogFormatterTest.LINE_RE.match(line)
  70. if m:
  71. return m.group(1)
  72. else:
  73. raise Exception("output didn't match regex: %r" % line)
  74. def test_basic_logging(self):
  75. self.logger.error("foo")
  76. self.assertEqual(self.get_output(), b"foo")
  77. def test_bytes_logging(self):
  78. with ignore_bytes_warning():
  79. # This will be "\xe9" on python 2 or "b'\xe9'" on python 3
  80. self.logger.error(b"\xe9")
  81. self.assertEqual(self.get_output(), utf8(repr(b"\xe9")))
  82. def test_utf8_logging(self):
  83. with ignore_bytes_warning():
  84. self.logger.error(u"\u00e9".encode("utf8"))
  85. if issubclass(bytes, basestring_type):
  86. # on python 2, utf8 byte strings (and by extension ascii byte
  87. # strings) are passed through as-is.
  88. self.assertEqual(self.get_output(), utf8(u"\u00e9"))
  89. else:
  90. # on python 3, byte strings always get repr'd even if
  91. # they're ascii-only, so this degenerates into another
  92. # copy of test_bytes_logging.
  93. self.assertEqual(self.get_output(), utf8(repr(utf8(u"\u00e9"))))
  94. def test_bytes_exception_logging(self):
  95. try:
  96. raise Exception(b"\xe9")
  97. except Exception:
  98. self.logger.exception("caught exception")
  99. # This will be "Exception: \xe9" on python 2 or
  100. # "Exception: b'\xe9'" on python 3.
  101. output = self.get_output()
  102. self.assertRegexpMatches(output, br"Exception.*\\xe9")
  103. # The traceback contains newlines, which should not have been escaped.
  104. self.assertNotIn(br"\n", output)
  105. class UnicodeLogFormatterTest(LogFormatterTest):
  106. def make_handler(self, filename):
  107. # Adding an explicit encoding configuration allows non-ascii unicode
  108. # strings in both python 2 and 3, without changing the behavior
  109. # for byte strings.
  110. return logging.FileHandler(filename, encoding="utf8")
  111. def test_unicode_logging(self):
  112. self.logger.error(u"\u00e9")
  113. self.assertEqual(self.get_output(), utf8(u"\u00e9"))
  114. class EnablePrettyLoggingTest(unittest.TestCase):
  115. def setUp(self):
  116. super(EnablePrettyLoggingTest, self).setUp()
  117. self.options = OptionParser()
  118. define_logging_options(self.options)
  119. self.logger = logging.Logger("tornado.test.log_test.EnablePrettyLoggingTest")
  120. self.logger.propagate = False
  121. def test_log_file(self):
  122. tmpdir = tempfile.mkdtemp()
  123. try:
  124. self.options.log_file_prefix = tmpdir + "/test_log"
  125. enable_pretty_logging(options=self.options, logger=self.logger)
  126. self.assertEqual(1, len(self.logger.handlers))
  127. self.logger.error("hello")
  128. self.logger.handlers[0].flush()
  129. filenames = glob.glob(tmpdir + "/test_log*")
  130. self.assertEqual(1, len(filenames))
  131. with open(filenames[0]) as f:
  132. self.assertRegexpMatches(f.read(), r"^\[E [^]]*\] hello$")
  133. finally:
  134. for handler in self.logger.handlers:
  135. handler.flush()
  136. handler.close()
  137. for filename in glob.glob(tmpdir + "/test_log*"):
  138. os.unlink(filename)
  139. os.rmdir(tmpdir)
  140. def test_log_file_with_timed_rotating(self):
  141. tmpdir = tempfile.mkdtemp()
  142. try:
  143. self.options.log_file_prefix = tmpdir + "/test_log"
  144. self.options.log_rotate_mode = "time"
  145. enable_pretty_logging(options=self.options, logger=self.logger)
  146. self.logger.error("hello")
  147. self.logger.handlers[0].flush()
  148. filenames = glob.glob(tmpdir + "/test_log*")
  149. self.assertEqual(1, len(filenames))
  150. with open(filenames[0]) as f:
  151. self.assertRegexpMatches(f.read(), r"^\[E [^]]*\] hello$")
  152. finally:
  153. for handler in self.logger.handlers:
  154. handler.flush()
  155. handler.close()
  156. for filename in glob.glob(tmpdir + "/test_log*"):
  157. os.unlink(filename)
  158. os.rmdir(tmpdir)
  159. def test_wrong_rotate_mode_value(self):
  160. try:
  161. self.options.log_file_prefix = "some_path"
  162. self.options.log_rotate_mode = "wrong_mode"
  163. self.assertRaises(
  164. ValueError,
  165. enable_pretty_logging,
  166. options=self.options,
  167. logger=self.logger,
  168. )
  169. finally:
  170. for handler in self.logger.handlers:
  171. handler.flush()
  172. handler.close()
  173. class LoggingOptionTest(unittest.TestCase):
  174. """Test the ability to enable and disable Tornado's logging hooks."""
  175. def logs_present(self, statement, args=None):
  176. # Each test may manipulate and/or parse the options and then logs
  177. # a line at the 'info' level. This level is ignored in the
  178. # logging module by default, but Tornado turns it on by default
  179. # so it is the easiest way to tell whether tornado's logging hooks
  180. # ran.
  181. IMPORT = "from tornado.options import options, parse_command_line"
  182. LOG_INFO = 'import logging; logging.info("hello")'
  183. program = ";".join([IMPORT, statement, LOG_INFO])
  184. proc = subprocess.Popen(
  185. [sys.executable, "-c", program] + (args or []),
  186. stdout=subprocess.PIPE,
  187. stderr=subprocess.STDOUT,
  188. )
  189. stdout, stderr = proc.communicate()
  190. self.assertEqual(proc.returncode, 0, "process failed: %r" % stdout)
  191. return b"hello" in stdout
  192. def test_default(self):
  193. self.assertFalse(self.logs_present("pass"))
  194. def test_tornado_default(self):
  195. self.assertTrue(self.logs_present("parse_command_line()"))
  196. def test_disable_command_line(self):
  197. self.assertFalse(self.logs_present("parse_command_line()", ["--logging=none"]))
  198. def test_disable_command_line_case_insensitive(self):
  199. self.assertFalse(self.logs_present("parse_command_line()", ["--logging=None"]))
  200. def test_disable_code_string(self):
  201. self.assertFalse(
  202. self.logs_present('options.logging = "none"; parse_command_line()')
  203. )
  204. def test_disable_code_none(self):
  205. self.assertFalse(
  206. self.logs_present("options.logging = None; parse_command_line()")
  207. )
  208. def test_disable_override(self):
  209. # command line trumps code defaults
  210. self.assertTrue(
  211. self.logs_present(
  212. "options.logging = None; parse_command_line()", ["--logging=info"]
  213. )
  214. )