# reusable_executor.py (10.0 KB)
  1. ###############################################################################
  2. # Reusable ProcessPoolExecutor
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. import time
  7. import warnings
  8. import threading
  9. import multiprocessing as mp
  10. from .process_executor import ProcessPoolExecutor, EXTRA_QUEUED_CALLS
  11. from .backend.context import cpu_count
  12. from .backend import get_context
  13. __all__ = ['get_reusable_executor']
# Python 2 compat helper: the type of a native string literal, used to detect
# a context that was passed by name rather than as a context object.
STRING_TYPE = type("")

# Singleton executor and id management.
# Re-entrant lock guarding the module-level singleton state below. The same
# lock object is handed to the executor as its submit/resize lock.
_executor_lock = threading.RLock()
# Id that will be given to the next executor instance created.
_next_executor_id = 0
# Current singleton executor, or None when no executor has been created yet.
_executor = None
# Constructor keyword arguments of the current executor; compared against the
# arguments of later requests to decide whether the instance can be reused.
_executor_kwargs = None
  21. def _get_next_executor_id():
  22. """Ensure that each successive executor instance has a unique, monotonic id.
  23. The purpose of this monotonic id is to help debug and test automated
  24. instance creation.
  25. """
  26. global _next_executor_id
  27. with _executor_lock:
  28. executor_id = _next_executor_id
  29. _next_executor_id += 1
  30. return executor_id
  31. def get_reusable_executor(max_workers=None, context=None, timeout=10,
  32. kill_workers=False, reuse="auto",
  33. job_reducers=None, result_reducers=None,
  34. initializer=None, initargs=(), env=None):
  35. """Return the current ReusableExectutor instance.
  36. Start a new instance if it has not been started already or if the previous
  37. instance was left in a broken state.
  38. If the previous instance does not have the requested number of workers, the
  39. executor is dynamically resized to adjust the number of workers prior to
  40. returning.
  41. Reusing a singleton instance spares the overhead of starting new worker
  42. processes and importing common python packages each time.
  43. ``max_workers`` controls the maximum number of tasks that can be running in
  44. parallel in worker processes. By default this is set to the number of
  45. CPUs on the host.
  46. Setting ``timeout`` (in seconds) makes idle workers automatically shutdown
  47. so as to release system resources. New workers are respawn upon submission
  48. of new tasks so that ``max_workers`` are available to accept the newly
  49. submitted tasks. Setting ``timeout`` to around 100 times the time required
  50. to spawn new processes and import packages in them (on the order of 100ms)
  51. ensures that the overhead of spawning workers is negligible.
  52. Setting ``kill_workers=True`` makes it possible to forcibly interrupt
  53. previously spawned jobs to get a new instance of the reusable executor
  54. with new constructor argument values.
  55. The ``job_reducers`` and ``result_reducers`` are used to customize the
  56. pickling of tasks and results send to the executor.
  57. When provided, the ``initializer`` is run first in newly spawned
  58. processes with argument ``initargs``.
  59. The environment variable in the child process are a copy of the values in
  60. the main process. One can provide a dict ``{ENV: VAL}`` where ``ENV`` and
  61. ``VAR`` are string literals to overwrite the environment variable ``ENV``
  62. in the child processes to value ``VAL``. The environment variables are set
  63. in the children before any module is loaded. This only works with with the
  64. ``loky`` context and it is unreliable on Windows with Python < 3.6.
  65. """
  66. _executor, _ = _ReusablePoolExecutor.get_reusable_executor(
  67. max_workers=max_workers, context=context, timeout=timeout,
  68. kill_workers=kill_workers, reuse=reuse, job_reducers=job_reducers,
  69. result_reducers=result_reducers, initializer=initializer,
  70. initargs=initargs, env=env
  71. )
  72. return _executor
  73. class _ReusablePoolExecutor(ProcessPoolExecutor):
  74. def __init__(self, submit_resize_lock, max_workers=None, context=None,
  75. timeout=None, executor_id=0, job_reducers=None,
  76. result_reducers=None, initializer=None, initargs=(),
  77. env=None):
  78. super(_ReusablePoolExecutor, self).__init__(
  79. max_workers=max_workers, context=context, timeout=timeout,
  80. job_reducers=job_reducers, result_reducers=result_reducers,
  81. initializer=initializer, initargs=initargs, env=env)
  82. self.executor_id = executor_id
  83. self._submit_resize_lock = submit_resize_lock
  84. @classmethod
  85. def get_reusable_executor(cls, max_workers=None, context=None, timeout=10,
  86. kill_workers=False, reuse="auto",
  87. job_reducers=None, result_reducers=None,
  88. initializer=None, initargs=(), env=None):
  89. with _executor_lock:
  90. global _executor, _executor_kwargs
  91. executor = _executor
  92. if max_workers is None:
  93. if reuse is True and executor is not None:
  94. max_workers = executor._max_workers
  95. else:
  96. max_workers = cpu_count()
  97. elif max_workers <= 0:
  98. raise ValueError(
  99. "max_workers must be greater than 0, got {}."
  100. .format(max_workers))
  101. if isinstance(context, STRING_TYPE):
  102. context = get_context(context)
  103. if context is not None and context.get_start_method() == "fork":
  104. raise ValueError(
  105. "Cannot use reusable executor with the 'fork' context"
  106. )
  107. kwargs = dict(context=context, timeout=timeout,
  108. job_reducers=job_reducers,
  109. result_reducers=result_reducers,
  110. initializer=initializer, initargs=initargs,
  111. env=env)
  112. if executor is None:
  113. is_reused = False
  114. mp.util.debug("Create a executor with max_workers={}."
  115. .format(max_workers))
  116. executor_id = _get_next_executor_id()
  117. _executor_kwargs = kwargs
  118. _executor = executor = cls(
  119. _executor_lock, max_workers=max_workers,
  120. executor_id=executor_id, **kwargs)
  121. else:
  122. if reuse == 'auto':
  123. reuse = kwargs == _executor_kwargs
  124. if (executor._flags.broken or executor._flags.shutdown
  125. or not reuse):
  126. if executor._flags.broken:
  127. reason = "broken"
  128. elif executor._flags.shutdown:
  129. reason = "shutdown"
  130. else:
  131. reason = "arguments have changed"
  132. mp.util.debug(
  133. "Creating a new executor with max_workers={} as the "
  134. "previous instance cannot be reused ({})."
  135. .format(max_workers, reason))
  136. executor.shutdown(wait=True, kill_workers=kill_workers)
  137. _executor = executor = _executor_kwargs = None
  138. # Recursive call to build a new instance
  139. return cls.get_reusable_executor(max_workers=max_workers,
  140. **kwargs)
  141. else:
  142. mp.util.debug(
  143. "Reusing existing executor with max_workers={}."
  144. .format(executor._max_workers)
  145. )
  146. is_reused = True
  147. executor._resize(max_workers)
  148. return executor, is_reused
  149. def submit(self, fn, *args, **kwargs):
  150. with self._submit_resize_lock:
  151. return super(_ReusablePoolExecutor, self).submit(
  152. fn, *args, **kwargs)
  153. def _resize(self, max_workers):
  154. with self._submit_resize_lock:
  155. if max_workers is None:
  156. raise ValueError("Trying to resize with max_workers=None")
  157. elif max_workers == self._max_workers:
  158. return
  159. if self._executor_manager_thread is None:
  160. # If the executor_manager_thread has not been started
  161. # then no processes have been spawned and we can just
  162. # update _max_workers and return
  163. self._max_workers = max_workers
  164. return
  165. self._wait_job_completion()
  166. # Some process might have returned due to timeout so check how many
  167. # children are still alive. Use the _process_management_lock to
  168. # ensure that no process are spawned or timeout during the resize.
  169. with self._processes_management_lock:
  170. processes = list(self._processes.values())
  171. nb_children_alive = sum(p.is_alive() for p in processes)
  172. self._max_workers = max_workers
  173. for _ in range(max_workers, nb_children_alive):
  174. self._call_queue.put(None)
  175. while (len(self._processes) > max_workers
  176. and not self._flags.broken):
  177. time.sleep(1e-3)
  178. self._adjust_process_count()
  179. processes = list(self._processes.values())
  180. while not all([p.is_alive() for p in processes]):
  181. time.sleep(1e-3)
  182. def _wait_job_completion(self):
  183. """Wait for the cache to be empty before resizing the pool."""
  184. # Issue a warning to the user about the bad effect of this usage.
  185. if len(self._pending_work_items) > 0:
  186. warnings.warn("Trying to resize an executor with running jobs: "
  187. "waiting for jobs completion before resizing.",
  188. UserWarning)
  189. mp.util.debug("Executor {} waiting for jobs completion before"
  190. " resizing".format(self.executor_id))
  191. # Wait for the completion of the jobs
  192. while len(self._pending_work_items) > 0:
  193. time.sleep(1e-3)
  194. def _setup_queues(self, job_reducers, result_reducers):
  195. # As this executor can be resized, use a large queue size to avoid
  196. # underestimating capacity and introducing overhead
  197. queue_size = 2 * cpu_count() + EXTRA_QUEUED_CALLS
  198. super(_ReusablePoolExecutor, self)._setup_queues(
  199. job_reducers, result_reducers, queue_size=queue_size)