process_executor.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181
  1. ###############################################################################
  2. # Re-implementation of the ProcessPoolExecutor more robust to faults
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from concurrent/futures/process_pool_executor.py (17/02/2017)
  7. # * Backport for python2.7/3.3,
  8. # * Add an extra management thread to detect executor_manager_thread failures,
  9. # * Improve the shutdown process to avoid deadlocks,
  10. # * Add timeout for workers,
  11. # * More robust pickling process.
  12. #
  13. # Copyright 2009 Brian Quinlan. All Rights Reserved.
  14. # Licensed to PSF under a Contributor Agreement.
  15. """Implements ProcessPoolExecutor.
  16. The following diagram and text describe the data-flow through the system:
  17. |======================= In-process =====================|== Out-of-process ==|
  18. +----------+     +----------+       +--------+     +-----------+    +---------+
  19. |          |  => | Work Ids |       |        |     | Call Q    |    | Process |
  20. |          |     +----------+       |        |     +-----------+    |  Pool   |
  21. |          |     | ...      |       |        |     | ...       |    +---------+
  22. |          |     | 6        |    => |        |  => | 5, call() | => |         |
  23. |          |     | 7        |       |        |     | ...       |    |         |
  24. | Process  |     | ...      |       | Local  |     +-----------+    | Process |
  25. |  Pool    |     +----------+       | Worker |                      |  #1..n  |
  26. | Executor |                        | Thread |                      |         |
  27. |          |     +------------+     |        |     +-----------+    |         |
  28. |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
  29. |          |     +------------+     |        |     +-----------+    |         |
  30. |          |     | 6: call()  |     |        |     | ...       |    |         |
  31. |          |     |    future  |     |        |     | 4, result |    |         |
  32. |          |     | ...        |     |        |     | 3, except |    |         |
  33. +----------+     +------------+     +--------+     +-----------+    +---------+
  34. Executor.submit() called:
  35. - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
  36. - adds the id of the _WorkItem to the "Work Ids" queue
  37. Local worker thread:
  38. - reads work ids from the "Work Ids" queue and looks up the corresponding
  39. WorkItem from the "Work Items" dict: if the work item has been cancelled then
  40. it is simply removed from the dict, otherwise it is repackaged as a
  41. _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  42. until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  43. calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
  44. - reads _ResultItems from "Result Q", updates the future stored in the
  45. "Work Items" dict and deletes the dict entry
  46. Process #1..n:
  47. - reads _CallItems from "Call Q", executes the calls, and puts the resulting
  48. _ResultItems in "Result Q"
  49. """
  50. __author__ = 'Thomas Moreau (thomas.moreau.2010@gmail.com)'
  51. import os
  52. import gc
  53. import sys
  54. import struct
  55. import weakref
  56. import warnings
  57. import itertools
  58. import traceback
  59. import threading
  60. from time import time
  61. import multiprocessing as mp
  62. from functools import partial
  63. from pickle import PicklingError
  64. from . import _base
  65. from .backend import get_context
  66. from .backend.compat import queue
  67. from .backend.compat import wait
  68. from .backend.compat import set_cause
  69. from .backend.context import cpu_count
  70. from .backend.queues import Queue, SimpleQueue
  71. from .backend.reduction import set_loky_pickler, get_loky_pickler_name
  72. from .backend.utils import recursive_terminate, get_exitcodes_terminated_worker
  73. try:
  74. from concurrent.futures.process import BrokenProcessPool as _BPPException
  75. except ImportError:
  76. _BPPException = RuntimeError
  77. # Compatibility for python2.7
  78. if sys.version_info[0] == 2:
  79. ProcessLookupError = OSError
  80. # Mechanism to prevent infinite process spawning. When a worker of a
  81. # ProcessPoolExecutor nested in MAX_DEPTH Executor tries to create a new
  82. # Executor, a LokyRecursionError is raised
  83. MAX_DEPTH = int(os.environ.get("LOKY_MAX_DEPTH", 10))
  84. _CURRENT_DEPTH = 0
  85. # Minimum time interval between two consecutive memory leak protection checks.
  86. _MEMORY_LEAK_CHECK_DELAY = 1.
  87. # Number of bytes of memory usage allowed over the reference process size.
  88. _MAX_MEMORY_LEAK_SIZE = int(3e8)
  89. try:
  90. from psutil import Process
  91. _USE_PSUTIL = True
  92. def _get_memory_usage(pid, force_gc=False):
  93. if force_gc:
  94. gc.collect()
  95. return Process(pid).memory_info().rss
  96. except ImportError:
  97. _USE_PSUTIL = False
  98. class _ThreadWakeup:
  99. def __init__(self):
  100. self._closed = False
  101. self._reader, self._writer = mp.Pipe(duplex=False)
  102. def close(self):
  103. if not self._closed:
  104. self._closed = True
  105. self._writer.close()
  106. self._reader.close()
  107. def wakeup(self):
  108. if not self._closed:
  109. if sys.platform == "win32" and sys.version_info[:2] < (3, 4):
  110. # Compat for python2.7 on windows, where poll return false for
  111. # b"" messages. Use the slightly larger message b"0".
  112. self._writer.send_bytes(b"0")
  113. else:
  114. self._writer.send_bytes(b"")
  115. def clear(self):
  116. if not self._closed:
  117. while self._reader.poll():
  118. self._reader.recv_bytes()
  119. class _ExecutorFlags(object):
  120. """necessary references to maintain executor states without preventing gc
  121. It permits to keep the information needed by executor_manager_thread
  122. and crash_detection_thread to maintain the pool without preventing the
  123. garbage collection of unreferenced executors.
  124. """
  125. def __init__(self, shutdown_lock):
  126. self.shutdown = False
  127. self.broken = None
  128. self.kill_workers = False
  129. self.shutdown_lock = shutdown_lock
  130. def flag_as_shutting_down(self, kill_workers=None):
  131. with self.shutdown_lock:
  132. self.shutdown = True
  133. if kill_workers is not None:
  134. self.kill_workers = kill_workers
  135. def flag_as_broken(self, broken):
  136. with self.shutdown_lock:
  137. self.shutdown = True
  138. self.broken = broken
  139. # Prior to 3.9, executor_manager_thread is created as daemon thread. This means
  140. # that it is not joined automatically when the interpreter is shutting down.
  141. # To work around this problem, an exit handler is installed to tell the
  142. # thread to exit when the interpreter is shutting down and then waits until
  143. # it finishes. The thread needs to be daemonized because the atexit hooks are
  144. # called after all non daemonized threads are joined.
  145. #
  146. # Starting 3.9, there exists a specific atexit hook to be called before joining
  147. # the threads so the executor_manager_thread does not need to be daemonized
  148. # anymore.
  149. #
  150. # The atexit hooks are registered when starting the first ProcessPoolExecutor
  151. # to avoid import having an effect on the interpreter.
  152. _threads_wakeups = weakref.WeakKeyDictionary()
  153. _global_shutdown = False
  154. def _python_exit():
  155. global _global_shutdown
  156. _global_shutdown = True
  157. items = list(_threads_wakeups.items())
  158. mp.util.debug("Interpreter shutting down. Waking up "
  159. "executor_manager_thread {}".format(items))
  160. for _, (shutdown_lock, thread_wakeup) in items:
  161. with shutdown_lock:
  162. thread_wakeup.wakeup()
  163. for thread, _ in items:
  164. thread.join()
  165. # With the fork context, _thread_wakeups is propagated to children.
  166. # Clear it after fork to avoid some situation that can cause some
  167. # freeze when joining the workers.
  168. mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear())
  169. # Module variable to register the at_exit call
  170. process_pool_executor_at_exit = None
  171. # Controls how many more calls than processes will be queued in the call queue.
  172. # A smaller number will mean that processes spend more time idle waiting for
  173. # work while a larger number will make Future.cancel() succeed less frequently
  174. # (Futures in the call queue cannot be cancelled).
  175. EXTRA_QUEUED_CALLS = 1
  176. class _RemoteTraceback(Exception):
  177. """Embed stringification of remote traceback in local traceback
  178. """
  179. def __init__(self, tb=None):
  180. self.tb = '\n"""\n{}"""'.format(tb)
  181. def __str__(self):
  182. return self.tb
  183. class _ExceptionWithTraceback(BaseException):
  184. def __init__(self, exc):
  185. tb = getattr(exc, "__traceback__", None)
  186. if tb is None:
  187. _, _, tb = sys.exc_info()
  188. tb = traceback.format_exception(type(exc), exc, tb)
  189. tb = ''.join(tb)
  190. self.exc = exc
  191. self.tb = tb
  192. def __reduce__(self):
  193. return _rebuild_exc, (self.exc, self.tb)
  194. def _rebuild_exc(exc, tb):
  195. exc = set_cause(exc, _RemoteTraceback(tb))
  196. return exc
  197. class _WorkItem(object):
  198. __slots__ = ["future", "fn", "args", "kwargs"]
  199. def __init__(self, future, fn, args, kwargs):
  200. self.future = future
  201. self.fn = fn
  202. self.args = args
  203. self.kwargs = kwargs
  204. class _ResultItem(object):
  205. def __init__(self, work_id, exception=None, result=None):
  206. self.work_id = work_id
  207. self.exception = exception
  208. self.result = result
  209. class _CallItem(object):
  210. def __init__(self, work_id, fn, args, kwargs):
  211. self.work_id = work_id
  212. self.fn = fn
  213. self.args = args
  214. self.kwargs = kwargs
  215. # Store the current loky_pickler so it is correctly set in the worker
  216. self.loky_pickler = get_loky_pickler_name()
  217. def __call__(self):
  218. set_loky_pickler(self.loky_pickler)
  219. return self.fn(*self.args, **self.kwargs)
  220. def __repr__(self):
  221. return "CallItem({}, {}, {}, {})".format(
  222. self.work_id, self.fn, self.args, self.kwargs)
  223. class _SafeQueue(Queue):
  224. """Safe Queue set exception to the future object linked to a job"""
  225. def __init__(self, max_size=0, ctx=None, pending_work_items=None,
  226. running_work_items=None, thread_wakeup=None, reducers=None):
  227. self.thread_wakeup = thread_wakeup
  228. self.pending_work_items = pending_work_items
  229. self.running_work_items = running_work_items
  230. super(_SafeQueue, self).__init__(max_size, reducers=reducers, ctx=ctx)
  231. def _on_queue_feeder_error(self, e, obj):
  232. if isinstance(obj, _CallItem):
  233. # format traceback only works on python3
  234. if isinstance(e, struct.error):
  235. raised_error = RuntimeError(
  236. "The task could not be sent to the workers as it is too "
  237. "large for `send_bytes`.")
  238. else:
  239. raised_error = PicklingError(
  240. "Could not pickle the task to send it to the workers.")
  241. tb = traceback.format_exception(
  242. type(e), e, getattr(e, "__traceback__", None))
  243. raised_error = set_cause(raised_error,
  244. _RemoteTraceback(''.join(tb)))
  245. work_item = self.pending_work_items.pop(obj.work_id, None)
  246. self.running_work_items.remove(obj.work_id)
  247. # work_item can be None if another process terminated. In this
  248. # case, the executor_manager_thread fails all work_items with
  249. # BrokenProcessPool
  250. if work_item is not None:
  251. work_item.future.set_exception(raised_error)
  252. del work_item
  253. self.thread_wakeup.wakeup()
  254. else:
  255. super(_SafeQueue, self)._on_queue_feeder_error(e, obj)
  256. def _get_chunks(chunksize, *iterables):
  257. """Iterates over zip()ed iterables in chunks. """
  258. if sys.version_info < (3, 3):
  259. it = itertools.izip(*iterables)
  260. else:
  261. it = zip(*iterables)
  262. while True:
  263. chunk = tuple(itertools.islice(it, chunksize))
  264. if not chunk:
  265. return
  266. yield chunk
  267. def _process_chunk(fn, chunk):
  268. """Processes a chunk of an iterable passed to map.
  269. Runs the function passed to map() on a chunk of the
  270. iterable passed to map.
  271. This function is run in a separate process.
  272. """
  273. return [fn(*args) for args in chunk]
  274. def _sendback_result(result_queue, work_id, result=None, exception=None):
  275. """Safely send back the given result or exception"""
  276. try:
  277. result_queue.put(_ResultItem(work_id, result=result,
  278. exception=exception))
  279. except BaseException as e:
  280. exc = _ExceptionWithTraceback(e)
  281. result_queue.put(_ResultItem(work_id, exception=exc))
  282. def _process_worker(call_queue, result_queue, initializer, initargs,
  283. processes_management_lock, timeout, worker_exit_lock,
  284. current_depth):
  285. """Evaluates calls from call_queue and places the results in result_queue.
  286. This worker is run in a separate process.
  287. Args:
  288. call_queue: A ctx.Queue of _CallItems that will be read and
  289. evaluated by the worker.
  290. result_queue: A ctx.Queue of _ResultItems that will written
  291. to by the worker.
  292. initializer: A callable initializer, or None
  293. initargs: A tuple of args for the initializer
  294. process_management_lock: A ctx.Lock avoiding worker timeout while some
  295. workers are being spawned.
  296. timeout: maximum time to wait for a new item in the call_queue. If that
  297. time is expired, the worker will shutdown.
  298. worker_exit_lock: Lock to avoid flagging the executor as broken on
  299. workers timeout.
  300. current_depth: Nested parallelism level, to avoid infinite spawning.
  301. """
  302. if initializer is not None:
  303. try:
  304. initializer(*initargs)
  305. except BaseException:
  306. _base.LOGGER.critical('Exception in initializer:', exc_info=True)
  307. # The parent will notice that the process stopped and
  308. # mark the pool broken
  309. return
  310. # set the global _CURRENT_DEPTH mechanism to limit recursive call
  311. global _CURRENT_DEPTH
  312. _CURRENT_DEPTH = current_depth
  313. _process_reference_size = None
  314. _last_memory_leak_check = None
  315. pid = os.getpid()
  316. mp.util.debug('Worker started with timeout=%s' % timeout)
  317. while True:
  318. try:
  319. call_item = call_queue.get(block=True, timeout=timeout)
  320. if call_item is None:
  321. mp.util.info("Shutting down worker on sentinel")
  322. except queue.Empty:
  323. mp.util.info("Shutting down worker after timeout %0.3fs"
  324. % timeout)
  325. if processes_management_lock.acquire(block=False):
  326. processes_management_lock.release()
  327. call_item = None
  328. else:
  329. mp.util.info("Could not acquire processes_management_lock")
  330. continue
  331. except BaseException:
  332. previous_tb = traceback.format_exc()
  333. try:
  334. result_queue.put(_RemoteTraceback(previous_tb))
  335. except BaseException:
  336. # If we cannot format correctly the exception, at least print
  337. # the traceback.
  338. print(previous_tb)
  339. sys.exit(1)
  340. if call_item is None:
  341. # Notify queue management thread about clean worker shutdown
  342. result_queue.put(pid)
  343. with worker_exit_lock:
  344. return
  345. try:
  346. r = call_item()
  347. except BaseException as e:
  348. exc = _ExceptionWithTraceback(e)
  349. result_queue.put(_ResultItem(call_item.work_id, exception=exc))
  350. else:
  351. _sendback_result(result_queue, call_item.work_id, result=r)
  352. del r
  353. # Free the resource as soon as possible, to avoid holding onto
  354. # open files or shared memory that is not needed anymore
  355. del call_item
  356. if _USE_PSUTIL:
  357. if _process_reference_size is None:
  358. # Make reference measurement after the first call
  359. _process_reference_size = _get_memory_usage(pid, force_gc=True)
  360. _last_memory_leak_check = time()
  361. continue
  362. if time() - _last_memory_leak_check > _MEMORY_LEAK_CHECK_DELAY:
  363. mem_usage = _get_memory_usage(pid)
  364. _last_memory_leak_check = time()
  365. if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
  366. # Memory usage stays within bounds: everything is fine.
  367. continue
  368. # Check again memory usage; this time take the measurement
  369. # after a forced garbage collection to break any reference
  370. # cycles.
  371. mem_usage = _get_memory_usage(pid, force_gc=True)
  372. _last_memory_leak_check = time()
  373. if mem_usage - _process_reference_size < _MAX_MEMORY_LEAK_SIZE:
  374. # The GC managed to free the memory: everything is fine.
  375. continue
  376. # The process is leaking memory: let the master process
  377. # know that we need to start a new worker.
  378. mp.util.info("Memory leak detected: shutting down worker")
  379. result_queue.put(pid)
  380. with worker_exit_lock:
  381. return
  382. else:
  383. # if psutil is not installed, trigger gc.collect events
  384. # regularly to limit potential memory leaks due to reference cycles
  385. if ((_last_memory_leak_check is None) or
  386. (time() - _last_memory_leak_check >
  387. _MEMORY_LEAK_CHECK_DELAY)):
  388. gc.collect()
  389. _last_memory_leak_check = time()
  390. class _ExecutorManagerThread(threading.Thread):
  391. """Manages the communication between this process and the worker processes.
  392. The manager is run in a local thread.
  393. Args:
  394. executor: A reference to the ProcessPoolExecutor that owns
  395. this thread. A weakref will be owned by the manager as well as
  396. references to internal objects used to introspect the state of
  397. the executor.
  398. """
  399. def __init__(self, executor):
  400. # Store references to necessary internals of the executor.
  401. # A _ThreadWakeup to allow waking up the executor_manager_thread from
  402. # the main Thread and avoid deadlocks caused by permanently
  403. # locked queues.
  404. self.thread_wakeup = executor._executor_manager_thread_wakeup
  405. self.shutdown_lock = executor._shutdown_lock
  406. # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
  407. # to determine if the ProcessPoolExecutor has been garbage collected
  408. # and that the manager can exit.
  409. # When the executor gets garbage collected, the weakref callback
  410. # will wake up the queue management thread so that it can terminate
  411. # if there is no pending work item.
  412. def weakref_cb(_,
  413. thread_wakeup=self.thread_wakeup,
  414. shutdown_lock=self.shutdown_lock):
  415. mp.util.debug('Executor collected: triggering callback for'
  416. ' QueueManager wakeup')
  417. with shutdown_lock:
  418. thread_wakeup.wakeup()
  419. self.executor_reference = weakref.ref(executor, weakref_cb)
  420. # The flags of the executor
  421. self.executor_flags = executor._flags
  422. # A list of the ctx.Process instances used as workers.
  423. self.processes = executor._processes
  424. # A ctx.Queue that will be filled with _CallItems derived from
  425. # _WorkItems for processing by the process workers.
  426. self.call_queue = executor._call_queue
  427. # A ctx.SimpleQueue of _ResultItems generated by the process workers.
  428. self.result_queue = executor._result_queue
  429. # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
  430. self.work_ids_queue = executor._work_ids
  431. # A dict mapping work ids to _WorkItems e.g.
  432. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
  433. self.pending_work_items = executor._pending_work_items
  434. # A list of the work_ids that are currently running
  435. self.running_work_items = executor._running_work_items
  436. # A lock to avoid concurrent shutdown of workers on timeout and spawn
  437. # of new processes or shut down
  438. self.processes_management_lock = executor._processes_management_lock
  439. super(_ExecutorManagerThread, self).__init__()
  440. if sys.version_info < (3, 9):
  441. self.daemon = True
  442. def run(self):
  443. # Main loop for the executor manager thread.
  444. while True:
  445. self.add_call_item_to_queue()
  446. result_item, is_broken, bpe = self.wait_result_broken_or_wakeup()
  447. if is_broken:
  448. self.terminate_broken(bpe)
  449. return
  450. if result_item is not None:
  451. self.process_result_item(result_item)
  452. # Delete reference to result_item to avoid keeping references
  453. # while waiting on new results.
  454. del result_item
  455. if self.is_shutting_down():
  456. self.flag_executor_shutting_down()
  457. # Since no new work items can be added, it is safe to shutdown
  458. # this thread if there are no pending work items.
  459. if not self.pending_work_items:
  460. self.join_executor_internals()
  461. return
  462. def add_call_item_to_queue(self):
  463. # Fills call_queue with _WorkItems from pending_work_items.
  464. # This function never blocks.
  465. while True:
  466. if self.call_queue.full():
  467. return
  468. try:
  469. work_id = self.work_ids_queue.get(block=False)
  470. except queue.Empty:
  471. return
  472. else:
  473. work_item = self.pending_work_items[work_id]
  474. if work_item.future.set_running_or_notify_cancel():
  475. self.running_work_items += [work_id]
  476. self.call_queue.put(_CallItem(work_id,
  477. work_item.fn,
  478. work_item.args,
  479. work_item.kwargs),
  480. block=True)
  481. else:
  482. del self.pending_work_items[work_id]
  483. continue
  484. def wait_result_broken_or_wakeup(self):
  485. # Wait for a result to be ready in the result_queue while checking
  486. # that all worker processes are still running, or for a wake up
  487. # signal send. The wake up signals come either from new tasks being
  488. # submitted, from the executor being shutdown/gc-ed, or from the
  489. # shutdown of the python interpreter.
  490. result_reader = self.result_queue._reader
  491. wakeup_reader = self.thread_wakeup._reader
  492. readers = [result_reader, wakeup_reader]
  493. worker_sentinels = [p.sentinel for p in self.processes.values()]
  494. ready = wait(readers + worker_sentinels)
  495. bpe = None
  496. is_broken = True
  497. result_item = None
  498. if result_reader in ready:
  499. try:
  500. result_item = result_reader.recv()
  501. if isinstance(result_item, _RemoteTraceback):
  502. bpe = BrokenProcessPool(
  503. "A task has failed to un-serialize. Please ensure that"
  504. " the arguments of the function are all picklable."
  505. )
  506. set_cause(bpe, result_item)
  507. else:
  508. is_broken = False
  509. except BaseException as e:
  510. bpe = BrokenProcessPool(
  511. "A result has failed to un-serialize. Please ensure that "
  512. "the objects returned by the function are always "
  513. "picklable."
  514. )
  515. tb = traceback.format_exception(
  516. type(e), e, getattr(e, "__traceback__", None))
  517. set_cause(bpe, _RemoteTraceback(''.join(tb)))
  518. elif wakeup_reader in ready:
  519. # This is simply a wake-up event that might either trigger putting
  520. # more tasks in the queue or trigger the clean up of resources.
  521. is_broken = False
  522. else:
  523. # A worker has terminated and we don't know why, set the state of
  524. # the executor as broken
  525. exit_codes = ''
  526. if sys.platform != "win32":
  527. # In Windows, introspecting terminated workers exitcodes seems
  528. # unstable, therefore they are not appended in the exception
  529. # message.
  530. exit_codes = "\nThe exit codes of the workers are {}".format(
  531. get_exitcodes_terminated_worker(self.processes))
  532. bpe = TerminatedWorkerError(
  533. "A worker process managed by the executor was unexpectedly "
  534. "terminated. This could be caused by a segmentation fault "
  535. "while calling the function or by an excessive memory usage "
  536. "causing the Operating System to kill the worker.\n"
  537. "{}".format(exit_codes)
  538. )
  539. self.thread_wakeup.clear()
  540. return result_item, is_broken, bpe
  541. def process_result_item(self, result_item):
  542. # Process the received a result_item. This can be either the PID of a
  543. # worker that exited gracefully or a _ResultItem
  544. if isinstance(result_item, int):
  545. # Clean shutdown of a worker using its PID, either on request
  546. # by the executor.shutdown method or by the timeout of the worker
  547. # itself: we should not mark the executor as broken.
  548. with self.processes_management_lock:
  549. p = self.processes.pop(result_item, None)
  550. # p can be None is the executor is concurrently shutting down.
  551. if p is not None:
  552. p._worker_exit_lock.release()
  553. p.join()
  554. del p
  555. # Make sure the executor have the right number of worker, even if a
  556. # worker timeout while some jobs were submitted. If some work is
  557. # pending or there is less processes than running items, we need to
  558. # start a new Process and raise a warning.
  559. n_pending = len(self.pending_work_items)
  560. n_running = len(self.running_work_items)
  561. if (n_pending - n_running > 0 or n_running > len(self.processes)):
  562. executor = self.executor_reference()
  563. if (executor is not None
  564. and len(self.processes) < executor._max_workers):
  565. warnings.warn(
  566. "A worker stopped while some jobs were given to the "
  567. "executor. This can be caused by a too short worker "
  568. "timeout or by a memory leak.", UserWarning
  569. )
  570. executor._adjust_process_count()
  571. executor = None
  572. else:
  573. # Received a _ResultItem so mark the future as completed.
  574. work_item = self.pending_work_items.pop(result_item.work_id, None)
  575. # work_item can be None if another process terminated (see above)
  576. if work_item is not None:
  577. if result_item.exception:
  578. work_item.future.set_exception(result_item.exception)
  579. else:
  580. work_item.future.set_result(result_item.result)
  581. self.running_work_items.remove(result_item.work_id)
  582. def is_shutting_down(self):
  583. # Check whether we should start shutting down the executor.
  584. executor = self.executor_reference()
  585. # No more work items can be added if:
  586. # - The interpreter is shutting down OR
  587. # - The executor that owns this thread is not broken AND
  588. # * The executor that owns this worker has been collected OR
  589. # * The executor that owns this worker has been shutdown.
  590. # If the executor is broken, it should be detected in the next loop.
  591. return (_global_shutdown or
  592. ((executor is None or self.executor_flags.shutdown)
  593. and not self.executor_flags.broken))
  594. def terminate_broken(self, bpe):
  595. # Terminate the executor because it is in a broken state. The bpe
  596. # argument can be used to display more information on the error that
  597. # lead the executor into becoming broken.
  598. # Mark the process pool broken so that submits fail right now.
  599. self.executor_flags.flag_as_broken(bpe)
  600. # Mark pending tasks as failed.
  601. for work_id, work_item in self.pending_work_items.items():
  602. work_item.future.set_exception(bpe)
  603. # Delete references to object. See issue16284
  604. del work_item
  605. self.pending_work_items.clear()
  606. # Terminate remaining workers forcibly: the queues or their
  607. # locks may be in a dirty state and block forever.
  608. self.kill_workers()
  609. # clean up resources
  610. self.join_executor_internals()
  611. def flag_executor_shutting_down(self):
  612. # Flag the executor as shutting down and cancel remaining tasks if
  613. # requested as early as possible if it is not gc-ed yet.
  614. self.executor_flags.flag_as_shutting_down()
  615. # Cancel pending work items if requested.
  616. if self.executor_flags.kill_workers:
  617. while self.pending_work_items:
  618. _, work_item = self.pending_work_items.popitem()
  619. work_item.future.set_exception(ShutdownExecutorError(
  620. "The Executor was shutdown with `kill_workers=True` "
  621. "before this job could complete."))
  622. del work_item
  623. # Kill the remaining worker forcibly to no waste time joining them
  624. self.kill_workers()
  625. def kill_workers(self):
  626. # Terminate the remaining workers using SIGKILL. This function also
  627. # terminates descendant workers of the children in case there is some
  628. # nested parallelism.
  629. while self.processes:
  630. _, p = self.processes.popitem()
  631. mp.util.debug('terminate process {}'.format(p.name))
  632. try:
  633. recursive_terminate(p)
  634. except ProcessLookupError: # pragma: no cover
  635. pass
  636. def shutdown_workers(self):
  637. # shutdown all workers in self.processes
  638. # Create a list to avoid RuntimeError due to concurrent modification of
  639. # processes. nb_children_alive is thus an upper bound. Also release the
  640. # processes' _worker_exit_lock to accelerate the shutdown procedure, as
  641. # there is no need for hand-shake here.
  642. with self.processes_management_lock:
  643. n_children_to_stop = 0
  644. for p in list(self.processes.values()):
  645. p._worker_exit_lock.release()
  646. n_children_to_stop += 1
  647. # Send the right number of sentinels, to make sure all children are
  648. # properly terminated. Do it with a mechanism that avoid hanging on
  649. # Full queue when all workers have already been shutdown.
  650. n_sentinels_sent = 0
  651. while (n_sentinels_sent < n_children_to_stop
  652. and self.get_n_children_alive() > 0):
  653. for i in range(n_children_to_stop - n_sentinels_sent):
  654. try:
  655. self.call_queue.put_nowait(None)
  656. n_sentinels_sent += 1
  657. except queue.Full:
  658. break
  659. def join_executor_internals(self):
  660. self.shutdown_workers()
  661. # Release the queue's resources as soon as possible. Flag the feeder
  662. # thread for clean exit to avoid having the crash detection thread flag
  663. # the Executor as broken during the shutdown. This is safe as either:
  664. # * We don't need to communicate with the workers anymore
  665. # * There is nothing left in the Queue buffer except None sentinels
  666. mp.util.debug("closing call_queue")
  667. self.call_queue.close()
  668. self.call_queue.join_thread()
  669. # Closing result_queue
  670. mp.util.debug("closing result_queue")
  671. self.result_queue.close()
  672. mp.util.debug("closing thread_wakeup")
  673. with self.shutdown_lock:
  674. self.thread_wakeup.close()
  675. # If .join() is not called on the created processes then
  676. # some ctx.Queue methods may deadlock on Mac OS X.
  677. mp.util.debug("joining processes")
  678. for p in self.processes.values():
  679. p.join()
  680. mp.util.debug("executor management thread clean shutdown of worker "
  681. "processes: {}".format(list(self.processes)))
  682. def get_n_children_alive(self):
  683. # This is an upper bound on the number of children alive.
  684. with self.processes_management_lock:
  685. return sum(p.is_alive() for p in list(self.processes.values()))
  686. _system_limits_checked = False
  687. _system_limited = None
  688. def _check_system_limits():
  689. global _system_limits_checked, _system_limited
  690. if _system_limits_checked:
  691. if _system_limited:
  692. raise NotImplementedError(_system_limited)
  693. _system_limits_checked = True
  694. try:
  695. nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
  696. except (AttributeError, ValueError):
  697. # sysconf not available or setting not available
  698. return
  699. if nsems_max == -1:
  700. # undetermined limit, assume that limit is determined
  701. # by available memory only
  702. return
  703. if nsems_max >= 256:
  704. # minimum number of semaphores available
  705. # according to POSIX
  706. return
  707. _system_limited = ("system provides too few semaphores (%d available, "
  708. "256 necessary)" % nsems_max)
  709. raise NotImplementedError(_system_limited)
  710. def _chain_from_iterable_of_lists(iterable):
  711. """
  712. Specialized implementation of itertools.chain.from_iterable.
  713. Each item in *iterable* should be a list. This function is
  714. careful not to keep references to yielded objects.
  715. """
  716. for element in iterable:
  717. element.reverse()
  718. while element:
  719. yield element.pop()
  720. def _check_max_depth(context):
  721. # Limit the maxmal recursion level
  722. global _CURRENT_DEPTH
  723. if context.get_start_method() == "fork" and _CURRENT_DEPTH > 0:
  724. raise LokyRecursionError(
  725. "Could not spawn extra nested processes at depth superior to "
  726. "MAX_DEPTH=1. It is not possible to increase this limit when "
  727. "using the 'fork' start method.")
  728. if 0 < MAX_DEPTH and _CURRENT_DEPTH + 1 > MAX_DEPTH:
  729. raise LokyRecursionError(
  730. "Could not spawn extra nested processes at depth superior to "
  731. "MAX_DEPTH={}. If this is intendend, you can change this limit "
  732. "with the LOKY_MAX_DEPTH environment variable.".format(MAX_DEPTH))
  733. class LokyRecursionError(RuntimeError):
  734. """Raised when a process try to spawn too many levels of nested processes.
  735. """
  736. class BrokenProcessPool(_BPPException):
  737. """
  738. Raised when the executor is broken while a future was in the running state.
  739. The cause can an error raised when unpickling the task in the worker
  740. process or when unpickling the result value in the parent process. It can
  741. also be caused by a worker process being terminated unexpectedly.
  742. """
  743. class TerminatedWorkerError(BrokenProcessPool):
  744. """
  745. Raised when a process in a ProcessPoolExecutor terminated abruptly
  746. while a future was in the running state.
  747. """
# Backward-compatibility alias for code written against loky 1.1.4 and
# earlier. Do not use in new code; refer to BrokenProcessPool directly.
BrokenExecutor = BrokenProcessPool
  751. class ShutdownExecutorError(RuntimeError):
  752. """
  753. Raised when a ProcessPoolExecutor is shutdown while a future was in the
  754. running or pending state.
  755. """
  756. class ProcessPoolExecutor(_base.Executor):
  757. _at_exit = None
  758. def __init__(self, max_workers=None, job_reducers=None,
  759. result_reducers=None, timeout=None, context=None,
  760. initializer=None, initargs=(), env=None):
  761. """Initializes a new ProcessPoolExecutor instance.
  762. Args:
  763. max_workers: int, optional (default: cpu_count())
  764. The maximum number of processes that can be used to execute the
  765. given calls. If None or not given then as many worker processes
  766. will be created as the number of CPUs the current process
  767. can use.
  768. job_reducers, result_reducers: dict(type: reducer_func)
  769. Custom reducer for pickling the jobs and the results from the
  770. Executor. If only `job_reducers` is provided, `result_reducer`
  771. will use the same reducers
  772. timeout: int, optional (default: None)
  773. Idle workers exit after timeout seconds. If a new job is
  774. submitted after the timeout, the executor will start enough
  775. new Python processes to make sure the pool of workers is full.
  776. context: A multiprocessing context to launch the workers. This
  777. object should provide SimpleQueue, Queue and Process.
  778. initializer: An callable used to initialize worker processes.
  779. initargs: A tuple of arguments to pass to the initializer.
  780. env: A dict of environment variable to overwrite in the child
  781. process. The environment variables are set before any module is
  782. loaded. Note that this only works with the loky context and it
  783. is unreliable under windows with Python < 3.6.
  784. """
  785. _check_system_limits()
  786. if max_workers is None:
  787. self._max_workers = cpu_count()
  788. else:
  789. if max_workers <= 0:
  790. raise ValueError("max_workers must be greater than 0")
  791. self._max_workers = max_workers
  792. if context is None:
  793. context = get_context()
  794. self._context = context
  795. self._env = env
  796. if initializer is not None and not callable(initializer):
  797. raise TypeError("initializer must be a callable")
  798. self._initializer = initializer
  799. self._initargs = initargs
  800. _check_max_depth(self._context)
  801. if result_reducers is None:
  802. result_reducers = job_reducers
  803. # Timeout
  804. self._timeout = timeout
  805. # Management thread
  806. self._executor_manager_thread = None
  807. # Map of pids to processes
  808. self._processes = {}
  809. # Internal variables of the ProcessPoolExecutor
  810. self._processes = {}
  811. self._queue_count = 0
  812. self._pending_work_items = {}
  813. self._running_work_items = []
  814. self._work_ids = queue.Queue()
  815. self._processes_management_lock = self._context.Lock()
  816. self._executor_manager_thread = None
  817. self._shutdown_lock = threading.Lock()
  818. # _ThreadWakeup is a communication channel used to interrupt the wait
  819. # of the main loop of executor_manager_thread from another thread (e.g.
  820. # when calling executor.submit or executor.shutdown). We do not use the
  821. # _result_queue to send wakeup signals to the executor_manager_thread
  822. # as it could result in a deadlock if a worker process dies with the
  823. # _result_queue write lock still acquired.
  824. #
  825. # _shutdown_lock must be locked to access _ThreadWakeup.wakeup.
  826. self._executor_manager_thread_wakeup = _ThreadWakeup()
  827. # Flag to hold the state of the Executor. This permits to introspect
  828. # the Executor state even once it has been garbage collected.
  829. self._flags = _ExecutorFlags(self._shutdown_lock)
  830. # Finally setup the queues for interprocess communication
  831. self._setup_queues(job_reducers, result_reducers)
  832. mp.util.debug('ProcessPoolExecutor is setup')
  833. def _setup_queues(self, job_reducers, result_reducers, queue_size=None):
  834. # Make the call queue slightly larger than the number of processes to
  835. # prevent the worker processes from idling. But don't make it too big
  836. # because futures in the call queue cannot be cancelled.
  837. if queue_size is None:
  838. queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS
  839. self._call_queue = _SafeQueue(
  840. max_size=queue_size, pending_work_items=self._pending_work_items,
  841. running_work_items=self._running_work_items,
  842. thread_wakeup=self._executor_manager_thread_wakeup,
  843. reducers=job_reducers, ctx=self._context)
  844. # Killed worker processes can produce spurious "broken pipe"
  845. # tracebacks in the queue's own worker thread. But we detect killed
  846. # processes anyway, so silence the tracebacks.
  847. self._call_queue._ignore_epipe = True
  848. self._result_queue = SimpleQueue(reducers=result_reducers,
  849. ctx=self._context)
  850. def _start_executor_manager_thread(self):
  851. if self._executor_manager_thread is None:
  852. mp.util.debug('_start_executor_manager_thread called')
  853. # When the executor gets garbarge collected, the weakref callback
  854. # will wake up the queue management thread so that it can terminate
  855. # if there is no pending work item.
  856. def weakref_cb(
  857. _, thread_wakeup=self._executor_manager_thread_wakeup,
  858. shutdown_lock=self._shutdown_lock):
  859. mp.util.debug('Executor collected: triggering callback for'
  860. ' QueueManager wakeup')
  861. with self._shutdown_lock:
  862. thread_wakeup.wakeup()
  863. # Start the processes so that their sentinels are known.
  864. self._executor_manager_thread = _ExecutorManagerThread(self)
  865. self._executor_manager_thread.start()
  866. # register this executor in a mechanism that ensures it will wakeup
  867. # when the interpreter is exiting.
  868. _threads_wakeups[self._executor_manager_thread] = \
  869. (self._shutdown_lock,
  870. self._executor_manager_thread_wakeup)
  871. global process_pool_executor_at_exit
  872. if process_pool_executor_at_exit is None:
  873. # Ensure that the _python_exit function will be called before
  874. # the multiprocessing.Queue._close finalizers which have an
  875. # exitpriority of 10.
  876. if sys.version_info < (3, 9):
  877. process_pool_executor_at_exit = mp.util.Finalize(
  878. None, _python_exit, exitpriority=20)
  879. else:
  880. process_pool_executor_at_exit = threading._register_atexit(
  881. _python_exit)
  882. def _adjust_process_count(self):
  883. for _ in range(len(self._processes), self._max_workers):
  884. worker_exit_lock = self._context.BoundedSemaphore(1)
  885. args = (self._call_queue, self._result_queue, self._initializer,
  886. self._initargs, self._processes_management_lock,
  887. self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1)
  888. worker_exit_lock.acquire()
  889. try:
  890. # Try to spawn the process with some environment variable to
  891. # overwrite but it only works with the loky context for now.
  892. p = self._context.Process(target=_process_worker, args=args,
  893. env=self._env)
  894. except TypeError:
  895. p = self._context.Process(target=_process_worker, args=args)
  896. p._worker_exit_lock = worker_exit_lock
  897. p.start()
  898. self._processes[p.pid] = p
  899. mp.util.debug('Adjust process count : {}'.format(self._processes))
  900. def _ensure_executor_running(self):
  901. """ensures all workers and management thread are running
  902. """
  903. with self._processes_management_lock:
  904. if len(self._processes) != self._max_workers:
  905. self._adjust_process_count()
  906. self._start_executor_manager_thread()
  907. def submit(self, fn, *args, **kwargs):
  908. with self._flags.shutdown_lock:
  909. if self._flags.broken is not None:
  910. raise self._flags.broken
  911. if self._flags.shutdown:
  912. raise ShutdownExecutorError(
  913. 'cannot schedule new futures after shutdown')
  914. # Cannot submit a new calls once the interpreter is shutting down.
  915. # This check avoids spawning new processes at exit.
  916. if _global_shutdown:
  917. raise RuntimeError('cannot schedule new futures after '
  918. 'interpreter shutdown')
  919. f = _base.Future()
  920. w = _WorkItem(f, fn, args, kwargs)
  921. self._pending_work_items[self._queue_count] = w
  922. self._work_ids.put(self._queue_count)
  923. self._queue_count += 1
  924. # Wake up queue management thread
  925. self._executor_manager_thread_wakeup.wakeup()
  926. self._ensure_executor_running()
  927. return f
  928. submit.__doc__ = _base.Executor.submit.__doc__
  929. def map(self, fn, *iterables, **kwargs):
  930. """Returns an iterator equivalent to map(fn, iter).
  931. Args:
  932. fn: A callable that will take as many arguments as there are
  933. passed iterables.
  934. timeout: The maximum number of seconds to wait. If None, then there
  935. is no limit on the wait time.
  936. chunksize: If greater than one, the iterables will be chopped into
  937. chunks of size chunksize and submitted to the process pool.
  938. If set to one, the items in the list will be sent one at a
  939. time.
  940. Returns:
  941. An iterator equivalent to: map(func, *iterables) but the calls may
  942. be evaluated out-of-order.
  943. Raises:
  944. TimeoutError: If the entire result iterator could not be generated
  945. before the given timeout.
  946. Exception: If fn(*args) raises for any values.
  947. """
  948. timeout = kwargs.get('timeout', None)
  949. chunksize = kwargs.get('chunksize', 1)
  950. if chunksize < 1:
  951. raise ValueError("chunksize must be >= 1.")
  952. results = super(ProcessPoolExecutor, self).map(
  953. partial(_process_chunk, fn), _get_chunks(chunksize, *iterables),
  954. timeout=timeout)
  955. return _chain_from_iterable_of_lists(results)
  956. def shutdown(self, wait=True, kill_workers=False):
  957. mp.util.debug('shutting down executor %s' % self)
  958. self._flags.flag_as_shutting_down(kill_workers)
  959. executor_manager_thread = self._executor_manager_thread
  960. executor_manager_thread_wakeup = self._executor_manager_thread_wakeup
  961. if executor_manager_thread_wakeup is not None:
  962. # Wake up queue management thread
  963. with self._shutdown_lock:
  964. self._executor_manager_thread_wakeup.wakeup()
  965. if executor_manager_thread is not None and wait:
  966. executor_manager_thread.join()
  967. # To reduce the risk of opening too many files, remove references to
  968. # objects that use file descriptors.
  969. self._executor_manager_thread = None
  970. self._executor_manager_thread_wakeup = None
  971. self._call_queue = None
  972. self._result_queue = None
  973. self._processes_management_lock = None
  974. shutdown.__doc__ = _base.Executor.shutdown.__doc__