resource_tracker.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. ###############################################################################
  2. # Server process to keep track of unlinked resources, like folders and
  3. # semaphores and clean them.
  4. #
  5. # author: Thomas Moreau
  6. #
  7. # adapted from multiprocessing/semaphore_tracker.py (17/02/2017)
  8. # * include custom spawnv_passfds to start the process
  9. # * use custom unlink from our own SemLock implementation
  10. # * add some VERBOSE logging
  11. #
  12. #
# On Unix we run a server process which keeps track of unlinked
# resources. The server ignores SIGINT and SIGTERM and reads from a
# pipe. The resource_tracker implements a reference counting scheme: each time
# a Python process anticipates the shared usage of a resource by another
# process, it signals the resource_tracker of this shared usage, and in return,
# the resource_tracker increments the resource's reference count by 1.
# Similarly, when access to a resource is closed by a Python process, the
# process notifies the resource_tracker by asking it to decrement the
# resource's reference count by 1. When the reference count drops to 0, the
# resource_tracker attempts to clean up the underlying resource.
# Finally, every other process connected to the resource tracker has a copy of
# the writable end of the pipe used to communicate with it, so the resource
# tracker gets EOF when all other processes have exited. Then the
# resource_tracker process unlinks any remaining leaked resources (with
# reference count above 0).
# For semaphores, this is important because the system only supports a limited
# number of named semaphores, and they will not be automatically removed until
# the next reboot. Without this resource tracker process, "killall python"
# would probably leave unlinked semaphores.
# Note that this behavior differs from CPython's resource_tracker, which only
# implements a list of shared resources, and not a proper refcounting scheme.
# Also, CPython's resource tracker will only attempt to clean up those shared
# resources once all processes connected to the resource tracker have exited.
  36. import os
  37. import shutil
  38. import sys
  39. import signal
  40. import warnings
  41. import threading
  42. from . import spawn
  43. from multiprocessing import util
  44. if sys.platform == "win32":
  45. from .compat_win32 import _winapi
  46. from .reduction import duplicate
  47. import msvcrt
  48. try:
  49. from _multiprocessing import sem_unlink
  50. except ImportError:
  51. from .semlock import sem_unlink
  52. if sys.version_info < (3,):
  53. BrokenPipeError = OSError
  54. from os import fdopen as open
  55. __all__ = ['ensure_running', 'register', 'unregister']
  56. _HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
  57. _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)
  58. _CLEANUP_FUNCS = {
  59. 'folder': shutil.rmtree,
  60. 'file': os.unlink
  61. }
  62. if os.name == "posix":
  63. _CLEANUP_FUNCS['semlock'] = sem_unlink
  64. VERBOSE = False
  65. class ResourceTracker(object):
  66. def __init__(self):
  67. self._lock = threading.Lock()
  68. self._fd = None
  69. self._pid = None
  70. def getfd(self):
  71. self.ensure_running()
  72. return self._fd
  73. def ensure_running(self):
  74. '''Make sure that resource tracker process is running.
  75. This can be run from any process. Usually a child process will use
  76. the resource created by its parent.'''
  77. with self._lock:
  78. if self._fd is not None:
  79. # resource tracker was launched before, is it still running?
  80. if self._check_alive():
  81. # => still alive
  82. return
  83. # => dead, launch it again
  84. os.close(self._fd)
  85. if os.name == "posix":
  86. try:
  87. # At this point, the resource_tracker process has been
  88. # killed or crashed. Let's remove the process entry
  89. # from the process table to avoid zombie processes.
  90. os.waitpid(self._pid, 0)
  91. except OSError:
  92. # The process was terminated or is a child from an
  93. # ancestor of the current process.
  94. pass
  95. self._fd = None
  96. self._pid = None
  97. warnings.warn('resource_tracker: process died unexpectedly, '
  98. 'relaunching. Some folders/sempahores might '
  99. 'leak.')
  100. fds_to_pass = []
  101. try:
  102. fds_to_pass.append(sys.stderr.fileno())
  103. except Exception:
  104. pass
  105. r, w = os.pipe()
  106. if sys.platform == "win32":
  107. _r = duplicate(msvcrt.get_osfhandle(r), inheritable=True)
  108. os.close(r)
  109. r = _r
  110. cmd = 'from {} import main; main({}, {})'.format(
  111. main.__module__, r, VERBOSE)
  112. try:
  113. fds_to_pass.append(r)
  114. # process will out live us, so no need to wait on pid
  115. exe = spawn.get_executable()
  116. args = [exe] + util._args_from_interpreter_flags()
  117. # In python 3.3, there is a bug which put `-RRRRR..` instead of
  118. # `-R` in args. Replace it to get the correct flags.
  119. # See https://github.com/python/cpython/blob/3.3/Lib/subprocess.py#L488
  120. if sys.version_info[:2] <= (3, 3):
  121. import re
  122. for i in range(1, len(args)):
  123. args[i] = re.sub("-R+", "-R", args[i])
  124. args += ['-c', cmd]
  125. util.debug("launching resource tracker: {}".format(args))
  126. # bpo-33613: Register a signal mask that will block the
  127. # signals. This signal mask will be inherited by the child
  128. # that is going to be spawned and will protect the child from a
  129. # race condition that can make the child die before it
  130. # registers signal handlers for SIGINT and SIGTERM. The mask is
  131. # unregistered after spawning the child.
  132. try:
  133. if _HAVE_SIGMASK:
  134. signal.pthread_sigmask(signal.SIG_BLOCK,
  135. _IGNORED_SIGNALS)
  136. pid = spawnv_passfds(exe, args, fds_to_pass)
  137. finally:
  138. if _HAVE_SIGMASK:
  139. signal.pthread_sigmask(signal.SIG_UNBLOCK,
  140. _IGNORED_SIGNALS)
  141. except BaseException:
  142. os.close(w)
  143. raise
  144. else:
  145. self._fd = w
  146. self._pid = pid
  147. finally:
  148. if sys.platform == "win32":
  149. _winapi.CloseHandle(r)
  150. else:
  151. os.close(r)
  152. def _check_alive(self):
  153. '''Check for the existence of the resource tracker process.'''
  154. try:
  155. self._send('PROBE', '', '')
  156. except BrokenPipeError:
  157. return False
  158. else:
  159. return True
  160. def register(self, name, rtype):
  161. '''Register a named resource, and increment its refcount.'''
  162. self.ensure_running()
  163. self._send('REGISTER', name, rtype)
  164. def unregister(self, name, rtype):
  165. '''Unregister a named resource with resource tracker.'''
  166. self.ensure_running()
  167. self._send('UNREGISTER', name, rtype)
  168. def maybe_unlink(self, name, rtype):
  169. '''Decrement the refcount of a resource, and delete it if it hits 0'''
  170. self.ensure_running()
  171. self._send("MAYBE_UNLINK", name, rtype)
  172. def _send(self, cmd, name, rtype):
  173. msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
  174. if len(name) > 512:
  175. # posix guarantees that writes to a pipe of less than PIPE_BUF
  176. # bytes are atomic, and that PIPE_BUF >= 512
  177. raise ValueError('name too long')
  178. nbytes = os.write(self._fd, msg)
  179. assert nbytes == len(msg)
  180. _resource_tracker = ResourceTracker()
  181. ensure_running = _resource_tracker.ensure_running
  182. register = _resource_tracker.register
  183. maybe_unlink = _resource_tracker.maybe_unlink
  184. unregister = _resource_tracker.unregister
  185. getfd = _resource_tracker.getfd
  186. def main(fd, verbose=0):
  187. '''Run resource tracker.'''
  188. # protect the process from ^C and "killall python" etc
  189. if verbose:
  190. util.log_to_stderr(level=util.DEBUG)
  191. signal.signal(signal.SIGINT, signal.SIG_IGN)
  192. signal.signal(signal.SIGTERM, signal.SIG_IGN)
  193. if _HAVE_SIGMASK:
  194. signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
  195. for f in (sys.stdin, sys.stdout):
  196. try:
  197. f.close()
  198. except Exception:
  199. pass
  200. if verbose:
  201. util.debug("Main resource tracker is running")
  202. registry = {rtype: dict() for rtype in _CLEANUP_FUNCS.keys()}
  203. try:
  204. # keep track of registered/unregistered resources
  205. if sys.platform == "win32":
  206. fd = msvcrt.open_osfhandle(fd, os.O_RDONLY)
  207. with open(fd, 'rb') as f:
  208. while True:
  209. line = f.readline()
  210. if line == b'': # EOF
  211. break
  212. try:
  213. splitted = line.strip().decode('ascii').split(':')
  214. # name can potentially contain separator symbols (for
  215. # instance folders on Windows)
  216. cmd, name, rtype = (
  217. splitted[0], ':'.join(splitted[1:-1]), splitted[-1])
  218. if cmd == 'PROBE':
  219. continue
  220. if rtype not in _CLEANUP_FUNCS:
  221. raise ValueError(
  222. 'Cannot register {} for automatic cleanup: '
  223. 'unknown resource type ({}). Resource type should '
  224. 'be one of the following: {}'.format(
  225. name, rtype, list(_CLEANUP_FUNCS.keys())))
  226. if cmd == 'REGISTER':
  227. if name not in registry[rtype]:
  228. registry[rtype][name] = 1
  229. else:
  230. registry[rtype][name] += 1
  231. if verbose:
  232. util.debug(
  233. "[ResourceTracker] incremented refcount of {} "
  234. "{} (current {})".format(
  235. rtype, name, registry[rtype][name]))
  236. elif cmd == 'UNREGISTER':
  237. del registry[rtype][name]
  238. if verbose:
  239. util.debug(
  240. "[ResourceTracker] unregister {} {}: "
  241. "registry({})".format(name, rtype, len(registry)))
  242. elif cmd == 'MAYBE_UNLINK':
  243. registry[rtype][name] -= 1
  244. if verbose:
  245. util.debug(
  246. "[ResourceTracker] decremented refcount of {} "
  247. "{} (current {})".format(
  248. rtype, name, registry[rtype][name]))
  249. if registry[rtype][name] == 0:
  250. del registry[rtype][name]
  251. try:
  252. if verbose:
  253. util.debug(
  254. "[ResourceTracker] unlink {}"
  255. .format(name))
  256. _CLEANUP_FUNCS[rtype](name)
  257. except Exception as e:
  258. warnings.warn(
  259. 'resource_tracker: %s: %r' % (name, e))
  260. else:
  261. raise RuntimeError('unrecognized command %r' % cmd)
  262. except BaseException:
  263. try:
  264. sys.excepthook(*sys.exc_info())
  265. except BaseException:
  266. pass
  267. finally:
  268. # all processes have terminated; cleanup any remaining resources
  269. def _unlink_resources(rtype_registry, rtype):
  270. if rtype_registry:
  271. try:
  272. warnings.warn('resource_tracker: There appear to be %d '
  273. 'leaked %s objects to clean up at shutdown' %
  274. (len(rtype_registry), rtype))
  275. except Exception:
  276. pass
  277. for name in rtype_registry:
  278. # For some reason the process which created and registered this
  279. # resource has failed to unregister it. Presumably it has
  280. # died. We therefore clean it up.
  281. try:
  282. _CLEANUP_FUNCS[rtype](name)
  283. if verbose:
  284. util.debug("[ResourceTracker] unlink {}"
  285. .format(name))
  286. except Exception as e:
  287. warnings.warn('resource_tracker: %s: %r' % (name, e))
  288. for rtype, rtype_registry in registry.items():
  289. if rtype == "folder":
  290. continue
  291. else:
  292. _unlink_resources(rtype_registry, rtype)
  293. # The default cleanup routine for folders deletes everything inside
  294. # those folders recursively, which can include other resources tracked
  295. # by the resource tracker). To limit the risk of the resource tracker
  296. # attempting to delete twice a resource (once as part of a tracked
  297. # folder, and once as a resource), we delete the folders after all
  298. # other resource types.
  299. if "folder" in registry:
  300. _unlink_resources(registry["folder"], "folder")
  301. if verbose:
  302. util.debug("resource tracker shut down")
  303. #
  304. # Start a program with only specified fds kept open
  305. #
  306. def spawnv_passfds(path, args, passfds):
  307. passfds = sorted(passfds)
  308. if sys.platform != "win32":
  309. errpipe_read, errpipe_write = os.pipe()
  310. try:
  311. from .reduction import _mk_inheritable
  312. _pass = []
  313. for fd in passfds:
  314. _pass += [_mk_inheritable(fd)]
  315. from .fork_exec import fork_exec
  316. return fork_exec(args, _pass)
  317. finally:
  318. os.close(errpipe_read)
  319. os.close(errpipe_write)
  320. else:
  321. cmd = ' '.join('"%s"' % x for x in args)
  322. try:
  323. hp, ht, pid, tid = _winapi.CreateProcess(
  324. path, cmd, None, None, True, 0, None, None, None)
  325. _winapi.CloseHandle(ht)
  326. except BaseException:
  327. pass
  328. return pid