process.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. #
  2. # Copyright 2011 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Utilities for working with multiple processes, including both forking
  16. the server into multiple processes and managing subprocesses.
  17. """
  18. import errno
  19. import os
  20. import multiprocessing
  21. import signal
  22. import subprocess
  23. import sys
  24. import time
  25. from binascii import hexlify
  26. from tornado.concurrent import (
  27. Future,
  28. future_set_result_unless_cancelled,
  29. future_set_exception_unless_cancelled,
  30. )
  31. from tornado import ioloop
  32. from tornado.iostream import PipeIOStream
  33. from tornado.log import gen_log
  34. from tornado.platform.auto import set_close_exec
  35. from tornado.util import errno_from_exception
  36. import typing
  37. from typing import Tuple, Optional, Any, Callable
  38. if typing.TYPE_CHECKING:
  39. from typing import List # noqa: F401
# Re-export this exception for convenience, so that code catching errors
# from `Subprocess.wait_for_exit` does not need to import ``subprocess``.
CalledProcessError = subprocess.CalledProcessError
  42. def cpu_count() -> int:
  43. """Returns the number of processors on this machine."""
  44. if multiprocessing is None:
  45. return 1
  46. try:
  47. return multiprocessing.cpu_count()
  48. except NotImplementedError:
  49. pass
  50. try:
  51. return os.sysconf("SC_NPROCESSORS_CONF")
  52. except (AttributeError, ValueError):
  53. pass
  54. gen_log.error("Could not detect number of processors; assuming 1")
  55. return 1
  56. def _reseed_random() -> None:
  57. if "random" not in sys.modules:
  58. return
  59. import random
  60. # If os.urandom is available, this method does the same thing as
  61. # random.seed (at least as of python 2.6). If os.urandom is not
  62. # available, we mix in the pid in addition to a timestamp.
  63. try:
  64. seed = int(hexlify(os.urandom(16)), 16)
  65. except NotImplementedError:
  66. seed = int(time.time() * 1000) ^ os.getpid()
  67. random.seed(seed)
  68. def _pipe_cloexec() -> Tuple[int, int]:
  69. r, w = os.pipe()
  70. set_close_exec(r)
  71. set_close_exec(w)
  72. return r, w
# Task id of the current process.  ``fork_processes`` sets this in each
# child it forks; it stays None in any process not created that way.
_task_id = None  # type: Optional[int]
  74. def fork_processes(num_processes: Optional[int], max_restarts: int = None) -> int:
  75. """Starts multiple worker processes.
  76. If ``num_processes`` is None or <= 0, we detect the number of cores
  77. available on this machine and fork that number of child
  78. processes. If ``num_processes`` is given and > 0, we fork that
  79. specific number of sub-processes.
  80. Since we use processes and not threads, there is no shared memory
  81. between any server code.
  82. Note that multiple processes are not compatible with the autoreload
  83. module (or the ``autoreload=True`` option to `tornado.web.Application`
  84. which defaults to True when ``debug=True``).
  85. When using multiple processes, no IOLoops can be created or
  86. referenced until after the call to ``fork_processes``.
  87. In each child process, ``fork_processes`` returns its *task id*, a
  88. number between 0 and ``num_processes``. Processes that exit
  89. abnormally (due to a signal or non-zero exit status) are restarted
  90. with the same id (up to ``max_restarts`` times). In the parent
  91. process, ``fork_processes`` returns None if all child processes
  92. have exited normally, but will otherwise only exit by throwing an
  93. exception.
  94. max_restarts defaults to 100.
  95. """
  96. if max_restarts is None:
  97. max_restarts = 100
  98. global _task_id
  99. assert _task_id is None
  100. if num_processes is None or num_processes <= 0:
  101. num_processes = cpu_count()
  102. gen_log.info("Starting %d processes", num_processes)
  103. children = {}
  104. def start_child(i: int) -> Optional[int]:
  105. pid = os.fork()
  106. if pid == 0:
  107. # child process
  108. _reseed_random()
  109. global _task_id
  110. _task_id = i
  111. return i
  112. else:
  113. children[pid] = i
  114. return None
  115. for i in range(num_processes):
  116. id = start_child(i)
  117. if id is not None:
  118. return id
  119. num_restarts = 0
  120. while children:
  121. try:
  122. pid, status = os.wait()
  123. except OSError as e:
  124. if errno_from_exception(e) == errno.EINTR:
  125. continue
  126. raise
  127. if pid not in children:
  128. continue
  129. id = children.pop(pid)
  130. if os.WIFSIGNALED(status):
  131. gen_log.warning(
  132. "child %d (pid %d) killed by signal %d, restarting",
  133. id,
  134. pid,
  135. os.WTERMSIG(status),
  136. )
  137. elif os.WEXITSTATUS(status) != 0:
  138. gen_log.warning(
  139. "child %d (pid %d) exited with status %d, restarting",
  140. id,
  141. pid,
  142. os.WEXITSTATUS(status),
  143. )
  144. else:
  145. gen_log.info("child %d (pid %d) exited normally", id, pid)
  146. continue
  147. num_restarts += 1
  148. if num_restarts > max_restarts:
  149. raise RuntimeError("Too many child restarts, giving up")
  150. new_id = start_child(id)
  151. if new_id is not None:
  152. return new_id
  153. # All child processes exited cleanly, so exit the master process
  154. # instead of just returning to right after the call to
  155. # fork_processes (which will probably just start up another IOLoop
  156. # unless the caller checks the return value).
  157. sys.exit(0)
  158. def task_id() -> Optional[int]:
  159. """Returns the current task id, if any.
  160. Returns None if this process was not created by `fork_processes`.
  161. """
  162. global _task_id
  163. return _task_id
  164. class Subprocess(object):
  165. """Wraps ``subprocess.Popen`` with IOStream support.
  166. The constructor is the same as ``subprocess.Popen`` with the following
  167. additions:
  168. * ``stdin``, ``stdout``, and ``stderr`` may have the value
  169. ``tornado.process.Subprocess.STREAM``, which will make the corresponding
  170. attribute of the resulting Subprocess a `.PipeIOStream`. If this option
  171. is used, the caller is responsible for closing the streams when done
  172. with them.
  173. The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
  174. ``wait_for_exit`` methods do not work on Windows. There is
  175. therefore no reason to use this class instead of
  176. ``subprocess.Popen`` on that platform.
  177. .. versionchanged:: 5.0
  178. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  179. """
  180. STREAM = object()
  181. _initialized = False
  182. _waiting = {} # type: ignore
  183. _old_sigchld = None
  184. def __init__(self, *args: Any, **kwargs: Any) -> None:
  185. self.io_loop = ioloop.IOLoop.current()
  186. # All FDs we create should be closed on error; those in to_close
  187. # should be closed in the parent process on success.
  188. pipe_fds = [] # type: List[int]
  189. to_close = [] # type: List[int]
  190. if kwargs.get("stdin") is Subprocess.STREAM:
  191. in_r, in_w = _pipe_cloexec()
  192. kwargs["stdin"] = in_r
  193. pipe_fds.extend((in_r, in_w))
  194. to_close.append(in_r)
  195. self.stdin = PipeIOStream(in_w)
  196. if kwargs.get("stdout") is Subprocess.STREAM:
  197. out_r, out_w = _pipe_cloexec()
  198. kwargs["stdout"] = out_w
  199. pipe_fds.extend((out_r, out_w))
  200. to_close.append(out_w)
  201. self.stdout = PipeIOStream(out_r)
  202. if kwargs.get("stderr") is Subprocess.STREAM:
  203. err_r, err_w = _pipe_cloexec()
  204. kwargs["stderr"] = err_w
  205. pipe_fds.extend((err_r, err_w))
  206. to_close.append(err_w)
  207. self.stderr = PipeIOStream(err_r)
  208. try:
  209. self.proc = subprocess.Popen(*args, **kwargs)
  210. except:
  211. for fd in pipe_fds:
  212. os.close(fd)
  213. raise
  214. for fd in to_close:
  215. os.close(fd)
  216. self.pid = self.proc.pid
  217. for attr in ["stdin", "stdout", "stderr"]:
  218. if not hasattr(self, attr): # don't clobber streams set above
  219. setattr(self, attr, getattr(self.proc, attr))
  220. self._exit_callback = None # type: Optional[Callable[[int], None]]
  221. self.returncode = None # type: Optional[int]
  222. def set_exit_callback(self, callback: Callable[[int], None]) -> None:
  223. """Runs ``callback`` when this process exits.
  224. The callback takes one argument, the return code of the process.
  225. This method uses a ``SIGCHLD`` handler, which is a global setting
  226. and may conflict if you have other libraries trying to handle the
  227. same signal. If you are using more than one ``IOLoop`` it may
  228. be necessary to call `Subprocess.initialize` first to designate
  229. one ``IOLoop`` to run the signal handlers.
  230. In many cases a close callback on the stdout or stderr streams
  231. can be used as an alternative to an exit callback if the
  232. signal handler is causing a problem.
  233. """
  234. self._exit_callback = callback
  235. Subprocess.initialize()
  236. Subprocess._waiting[self.pid] = self
  237. Subprocess._try_cleanup_process(self.pid)
  238. def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
  239. """Returns a `.Future` which resolves when the process exits.
  240. Usage::
  241. ret = yield proc.wait_for_exit()
  242. This is a coroutine-friendly alternative to `set_exit_callback`
  243. (and a replacement for the blocking `subprocess.Popen.wait`).
  244. By default, raises `subprocess.CalledProcessError` if the process
  245. has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
  246. to suppress this behavior and return the exit status without raising.
  247. .. versionadded:: 4.2
  248. """
  249. future = Future() # type: Future[int]
  250. def callback(ret: int) -> None:
  251. if ret != 0 and raise_error:
  252. # Unfortunately we don't have the original args any more.
  253. future_set_exception_unless_cancelled(
  254. future, CalledProcessError(ret, "unknown")
  255. )
  256. else:
  257. future_set_result_unless_cancelled(future, ret)
  258. self.set_exit_callback(callback)
  259. return future
  260. @classmethod
  261. def initialize(cls) -> None:
  262. """Initializes the ``SIGCHLD`` handler.
  263. The signal handler is run on an `.IOLoop` to avoid locking issues.
  264. Note that the `.IOLoop` used for signal handling need not be the
  265. same one used by individual Subprocess objects (as long as the
  266. ``IOLoops`` are each running in separate threads).
  267. .. versionchanged:: 5.0
  268. The ``io_loop`` argument (deprecated since version 4.1) has been
  269. removed.
  270. """
  271. if cls._initialized:
  272. return
  273. io_loop = ioloop.IOLoop.current()
  274. cls._old_sigchld = signal.signal(
  275. signal.SIGCHLD,
  276. lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup),
  277. )
  278. cls._initialized = True
  279. @classmethod
  280. def uninitialize(cls) -> None:
  281. """Removes the ``SIGCHLD`` handler."""
  282. if not cls._initialized:
  283. return
  284. signal.signal(signal.SIGCHLD, cls._old_sigchld)
  285. cls._initialized = False
  286. @classmethod
  287. def _cleanup(cls) -> None:
  288. for pid in list(cls._waiting.keys()): # make a copy
  289. cls._try_cleanup_process(pid)
  290. @classmethod
  291. def _try_cleanup_process(cls, pid: int) -> None:
  292. try:
  293. ret_pid, status = os.waitpid(pid, os.WNOHANG)
  294. except OSError as e:
  295. if errno_from_exception(e) == errno.ECHILD:
  296. return
  297. if ret_pid == 0:
  298. return
  299. assert ret_pid == pid
  300. subproc = cls._waiting.pop(pid)
  301. subproc.io_loop.add_callback_from_signal(subproc._set_returncode, status)
  302. def _set_returncode(self, status: int) -> None:
  303. if os.WIFSIGNALED(status):
  304. self.returncode = -os.WTERMSIG(status)
  305. else:
  306. assert os.WIFEXITED(status)
  307. self.returncode = os.WEXITSTATUS(status)
  308. # We've taken over wait() duty from the subprocess.Popen
  309. # object. If we don't inform it of the process's return code,
  310. # it will log a warning at destruction in python 3.6+.
  311. self.proc.returncode = self.returncode
  312. if self._exit_callback:
  313. callback = self._exit_callback
  314. self._exit_callback = None
  315. callback(self.returncode)