_parallel_backends.py

  1. """
  2. Backends for embarrassingly parallel code.
  3. """
  4. import gc
  5. import os
  6. import warnings
  7. import threading
  8. import functools
  9. import contextlib
  10. from abc import ABCMeta, abstractmethod
  11. from .my_exceptions import WorkerInterrupt
  12. from ._multiprocessing_helpers import mp
  13. if mp is not None:
  14. from .pool import MemmappingPool
  15. from multiprocessing.pool import ThreadPool
  16. from .executor import get_memmapping_executor
  17. # Compat between concurrent.futures and multiprocessing TimeoutError
  18. from multiprocessing import TimeoutError
  19. from concurrent.futures._base import TimeoutError as CfTimeoutError
  20. from .externals.loky import process_executor, cpu_count


class ParallelBackendBase(metaclass=ABCMeta):
    """Helper ABC which defines all methods a ParallelBackend must implement"""

    supports_timeout = False
    supports_inner_max_num_threads = False
    nesting_level = None

    def __init__(self, nesting_level=None, inner_max_num_threads=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.nesting_level = nesting_level
        self.inner_max_num_threads = inner_max_num_threads

    MAX_NUM_THREADS_VARS = [
        'OMP_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS',
        'BLIS_NUM_THREADS', 'VECLIB_MAXIMUM_THREADS', 'NUMBA_NUM_THREADS',
        'NUMEXPR_NUM_THREADS',
    ]

    TBB_ENABLE_IPC_VAR = "ENABLE_IPC"

    @abstractmethod
    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs that can actually run in parallel.

        n_jobs is the number of workers requested by the caller. Passing
        n_jobs=-1 means requesting all available workers, for instance
        matching the number of CPU cores on the worker host(s).

        This method should return a guesstimate of the number of workers
        that can actually perform work concurrently. The primary use case is
        to make it possible for the caller to know in how many chunks to
        slice the work.

        In general, working on larger data chunks is more efficient (less
        scheduling overhead and better use of CPU cache prefetching
        heuristics) as long as all the workers have enough work to do.
        """
    @abstractmethod
    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **backend_args):
        """Reconfigure the backend and return the number of workers.

        This makes it possible to reuse an existing backend instance for
        successive independent calls to Parallel with different parameters.
        """
        self.parallel = parallel
        return self.effective_n_jobs(n_jobs)

    def start_call(self):
        """Call-back method called at the beginning of a Parallel call"""

    def stop_call(self):
        """Call-back method called at the end of a Parallel call"""

    def terminate(self):
        """Shutdown the workers and free the shared memory."""

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        return 1

    def batch_completed(self, batch_size, duration):
        """Callback to indicate how long it took to run a batch"""

    def get_exceptions(self):
        """List of exception types to be captured."""
        return []

    def abort_everything(self, ensure_ready=True):
        """Abort any running tasks.

        This is called when an exception has been raised when executing a
        task and all the remaining tasks will be ignored and can therefore
        be aborted to spare computation resources.

        If ensure_ready is True, the backend should be left in an operating
        state as future tasks might be re-submitted via that same backend
        instance.

        If ensure_ready is False, the implementer of this method can decide
        to leave the backend in a closed / terminated state as no new tasks
        are expected to be submitted to this backend.

        Setting ensure_ready to False is an optimization that can be
        leveraged when aborting tasks via killing processes from a local
        process pool managed by the backend itself: if we expect no new
        tasks, there is no point in re-creating new workers.
        """
        # Does nothing by default: to be overridden in subclasses when
        # canceling tasks is possible.
        pass

    def get_nested_backend(self):
        """Backend instance to be used by nested Parallel calls.

        By default a thread-based backend is used for the first level of
        nesting. Beyond that, switch to a sequential backend to avoid
        spawning too many threads on the host.
        """
        nesting_level = getattr(self, 'nesting_level', 0) + 1
        if nesting_level > 1:
            return SequentialBackend(nesting_level=nesting_level), None
        else:
            return ThreadingBackend(nesting_level=nesting_level), None
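    # Illustration (a hedged sketch, not part of the original source): with
    # the default policy above, a nested Parallel call runs on threads and
    # any deeper nesting runs sequentially, e.g.:
    #
    #     from joblib import Parallel, delayed
    #
    #     def inner(i):
    #         # nesting_level is now 1 -> ThreadingBackend
    #         return Parallel(n_jobs=2)(delayed(pow)(i, j) for j in range(3))
    #
    #     # nesting_level 0 -> a process-based backend such as loky
    #     Parallel(n_jobs=2)(delayed(inner)(i) for i in range(4))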
    @contextlib.contextmanager
    def retrieval_context(self):
        """Context manager to manage an execution context.

        Calls to Parallel.retrieve will be made inside this context.

        By default, this does nothing. It may be useful for subclasses to
        handle nested parallelism. In particular, it may be required to
        avoid deadlocks if a backend manages a fixed number of workers, when
        those workers may be asked to do nested Parallel calls. Without
        'retrieval_context' this could lead to deadlock, as all the workers
        managed by the backend may be "busy" waiting for the nested parallel
        calls to finish, but the backend has no free workers to execute
        those tasks.
        """
        yield
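    # Hedged sketch (not in the original source): a subclass managing a
    # fixed worker pool could lease one extra slot while the caller blocks
    # in Parallel.retrieve; `grow` and `shrink` are hypothetical helpers,
    # not a real pool API:
    #
    #     @contextlib.contextmanager
    #     def retrieval_context(self):
    #         self._pool.grow(1)
    #         try:
    #             yield
    #         finally:
    #             self._pool.shrink(1)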
    def _prepare_worker_env(self, n_jobs):
        """Return environment variables limiting threadpools in external libs.

        This function returns a dict containing environment variables to
        pass when creating a pool of processes. These environment variables
        limit the number of threads used by OpenMP, MKL, Accelerate and
        OpenBLAS in the child processes.
        """
        explicit_n_threads = self.inner_max_num_threads
        default_n_threads = str(max(cpu_count() // n_jobs, 1))

        # Set the inner environment variables to self.inner_max_num_threads
        # if it is given. Else, default to cpu_count // n_jobs unless the
        # variable is already present in the parent process environment.
        env = {}
        for var in self.MAX_NUM_THREADS_VARS:
            if explicit_n_threads is None:
                var_value = os.environ.get(var, None)
                if var_value is None:
                    var_value = default_n_threads
            else:
                var_value = str(explicit_n_threads)
            env[var] = var_value

        if self.TBB_ENABLE_IPC_VAR not in os.environ:
            # To avoid over-subscription when using TBB, let the TBB
            # schedulers use Inter Process Communication to coordinate:
            env[self.TBB_ENABLE_IPC_VAR] = "1"
        return env
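    # Worked example (hedged, not part of the original source): on an 8-core
    # host with n_jobs=2 and inner_max_num_threads unset, each child gets
    # max(8 // 2, 1) == 4 inner threads, so the returned dict contains
    # (unless the variables are already set in the parent environment):
    #
    #     {'OMP_NUM_THREADS': '4', 'OPENBLAS_NUM_THREADS': '4',
    #      'MKL_NUM_THREADS': '4', ..., 'ENABLE_IPC': '1'}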
    @staticmethod
    def in_main_thread():
        return isinstance(threading.current_thread(), threading._MainThread)


class SequentialBackend(ParallelBackendBase):
    """A ParallelBackend which will execute all batches sequentially.

    Does not use/create any threading objects, and hence has minimal
    overhead. Used when n_jobs == 1.
    """

    uses_threads = True
    supports_sharedmem = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        return 1

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        result = ImmediateResult(func)
        if callback:
            callback(result)
        return result

    def get_nested_backend(self):
        # import is not top level to avoid cyclic import errors.
        from .parallel import get_active_backend

        # SequentialBackend should change neither the nesting level, the
        # default backend, nor the number of jobs. Just return the current
        # one.
        return get_active_backend()


class PoolManagerMixin(object):
    """A helper class for managing pool of workers."""

    _pool = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs
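    # Worked example (hedged, not part of the original source): on an 8-core
    # host, n_jobs=-1 yields max(8 + 1 - 1, 1) == 8 workers and n_jobs=-2
    # yields max(8 + 1 - 2, 1) == 7, i.e. "all cores but one".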
    def terminate(self):
        """Shutdown the process or thread pool"""
        if self._pool is not None:
            self._pool.close()
            self._pool.terminate()  # terminate does a join()
            self._pool = None

    def _get_pool(self):
        """Used by apply_async to make it possible to implement lazy init"""
        return self._pool

    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        return self._get_pool().apply_async(
            SafeFunction(func), callback=callback)

    def abort_everything(self, ensure_ready=True):
        """Shutdown the pool and restart a new one with the same parameters"""
        self.terminate()
        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs,
                           parallel=self.parallel,
                           **self.parallel._backend_args)


class AutoBatchingMixin(object):
    """A helper class for automagically batching jobs."""

    # In seconds, should be big enough to hide multiprocessing dispatching
    # overhead.
    # This setting was found by running benchmarks/bench_auto_batching.py
    # with various parameters on various platforms.
    MIN_IDEAL_BATCH_DURATION = .2

    # Should not be too high to avoid stragglers: long jobs running alone
    # on a single worker while other workers have no work to process any
    # more.
    MAX_IDEAL_BATCH_DURATION = 2

    # Batching counters default values
    _DEFAULT_EFFECTIVE_BATCH_SIZE = 1
    _DEFAULT_SMOOTHED_BATCH_DURATION = 0.0

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION

    def compute_batch_size(self):
        """Determine the optimal batch size"""
        old_batch_size = self._effective_batch_size
        batch_duration = self._smoothed_batch_duration
        if (batch_duration > 0 and
                batch_duration < self.MIN_IDEAL_BATCH_DURATION):
            # The current batch size is too small: the duration of the
            # processing of a batch of tasks is not large enough to hide
            # the scheduling overhead.
            ideal_batch_size = int(old_batch_size *
                                   self.MIN_IDEAL_BATCH_DURATION /
                                   batch_duration)
            # Multiply by two to limit oscillations between min and max.
            ideal_batch_size *= 2

            # Don't increase the batch size too fast to limit huge batch
            # sizes potentially leading to starving workers.
            batch_size = min(2 * old_batch_size, ideal_batch_size)

            batch_size = max(batch_size, 1)

            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    "Batch computation too fast (%.4fs.) "
                    "Setting batch_size=%d.", (batch_duration, batch_size))
        elif (batch_duration > self.MAX_IDEAL_BATCH_DURATION and
              old_batch_size >= 2):
            # The current batch size is too big. If we schedule overly long
            # running batches some CPUs might wait with nothing left to do
            # while a couple of CPUs are left processing a few long running
            # batches. Better reduce the batch size a bit to limit the
            # likelihood of scheduling such stragglers.

            # Decrease the batch size quickly to limit potential starving.
            ideal_batch_size = int(
                old_batch_size * self.MIN_IDEAL_BATCH_DURATION /
                batch_duration
            )
            # Multiply by two to limit oscillations between min and max.
            batch_size = max(2 * ideal_batch_size, 1)
            self._effective_batch_size = batch_size
            if self.parallel.verbose >= 10:
                self.parallel._print(
                    "Batch computation too slow (%.4fs.) "
                    "Setting batch_size=%d.", (batch_duration, batch_size))
        else:
            # No batch size adjustment
            batch_size = old_batch_size

        if batch_size != old_batch_size:
            # Reset estimation of the smoothed mean batch duration: this
            # estimate is updated in the multiprocessing apply_async
            # CallBack as long as the batch_size is constant. Therefore
            # we need to reset the estimate whenever we re-tune the batch
            # size.
            self._smoothed_batch_duration = \
                self._DEFAULT_SMOOTHED_BATCH_DURATION

        return batch_size
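    # Worked example (hedged, not part of the original source): with
    # old_batch_size == 4 and a smoothed duration of 0.05s (below the 0.2s
    # minimum), ideal_batch_size == int(4 * 0.2 / 0.05) * 2 == 32, but the
    # min(2 * old_batch_size, ...) cap yields batch_size == 8, so the batch
    # size at most doubles per adjustment.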
    def batch_completed(self, batch_size, duration):
        """Callback to indicate how long it took to run a batch"""
        if batch_size == self._effective_batch_size:
            # Update the smoothed streaming estimate of the duration of a
            # batch from dispatch to completion
            old_duration = self._smoothed_batch_duration
            if old_duration == self._DEFAULT_SMOOTHED_BATCH_DURATION:
                # First record of duration for this batch size after the
                # last reset.
                new_duration = duration
            else:
                # Update the exponentially weighted average of the duration
                # of a batch for the current effective size.
                new_duration = 0.8 * old_duration + 0.2 * duration
            self._smoothed_batch_duration = new_duration
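    # Worked example (hedged, not part of the original source): with a
    # smoothed duration of 1.0s, a batch that takes 0.5s updates the
    # estimate to 0.8 * 1.0 + 0.2 * 0.5 == 0.9s, so a single outlier only
    # nudges the running average.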
    def reset_batch_stats(self):
        """Reset batch statistics to default values.

        This avoids interferences with future jobs.
        """
        self._effective_batch_size = self._DEFAULT_EFFECTIVE_BATCH_SIZE
        self._smoothed_batch_duration = self._DEFAULT_SMOOTHED_BATCH_DURATION


class ThreadingBackend(PoolManagerMixin, ParallelBackendBase):
    """A ParallelBackend which will use a thread pool to execute batches in.

    This is a low-overhead backend but it suffers from the Python Global
    Interpreter Lock if the called function relies a lot on Python objects.
    Mostly useful when the execution bottleneck is a compiled extension that
    explicitly releases the GIL (for instance a Cython loop wrapped in a
    "with nogil" block or an expensive call to a library such as NumPy).

    The actual thread pool is lazily initialized: its construction is
    delayed to the first call to apply_async.

    ThreadingBackend is used as the default backend for nested calls.
    """

    supports_timeout = True
    uses_threads = True
    supports_sharedmem = True

    def configure(self, n_jobs=1, parallel=None, **backend_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            # Avoid unnecessary overhead and use sequential backend instead.
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))
        self.parallel = parallel
        self._n_jobs = n_jobs
        return n_jobs

    def _get_pool(self):
        """Lazily initialize the thread pool

        The actual pool of worker threads is only initialized at the first
        call to apply_async.
        """
        if self._pool is None:
            self._pool = ThreadPool(self._n_jobs)
        return self._pool
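    # Usage sketch (hedged, not part of the original source): this backend
    # pays off when the workload releases the GIL, e.g.:
    #
    #     from joblib import Parallel, delayed, parallel_backend
    #     import numpy as np
    #
    #     arrays = [np.random.rand(10 ** 6) for _ in range(8)]
    #     with parallel_backend('threading', n_jobs=4):
    #         sorted_arrays = Parallel()(delayed(np.sort)(a) for a in arrays)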


class MultiprocessingBackend(PoolManagerMixin, AutoBatchingMixin,
                             ParallelBackendBase):
    """A ParallelBackend which will use a multiprocessing.Pool.

    Will introduce some communication and memory overhead when exchanging
    input and output data with the worker Python processes. However, does
    not suffer from the Python Global Interpreter Lock.
    """

    supports_timeout = True

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested,'
                    ' setting n_jobs=1',
                    stacklevel=3)
            return 1

        if process_executor._CURRENT_DEPTH > 0:
            # Mixing loky and multiprocessing in nested loops is not
            # supported
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below loky, setting n_jobs=1',
                    stacklevel=3)
            return 1

        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested'
                    ' below threads, setting n_jobs=1',
                    stacklevel=3)
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)
    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  **memmappingpool_args):
        """Build a process or thread pool and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        # Make sure to free as much memory as possible before forking
        gc.collect()
        self._pool = MemmappingPool(n_jobs, **memmappingpool_args)
        self.parallel = parallel
        return n_jobs

    def terminate(self):
        """Shutdown the process or thread pool"""
        super(MultiprocessingBackend, self).terminate()
        self.reset_batch_stats()


class LokyBackend(AutoBatchingMixin, ParallelBackendBase):
    """Managing pool of workers with loky instead of multiprocessing."""

    supports_timeout = True
    supports_inner_max_num_threads = True

    def configure(self, n_jobs=1, parallel=None, prefer=None, require=None,
                  idle_worker_timeout=300, **memmappingexecutor_args):
        """Build a process executor and return the number of workers"""
        n_jobs = self.effective_n_jobs(n_jobs)
        if n_jobs == 1:
            raise FallbackToBackend(
                SequentialBackend(nesting_level=self.nesting_level))

        self._workers = get_memmapping_executor(
            n_jobs, timeout=idle_worker_timeout,
            env=self._prepare_worker_env(n_jobs=n_jobs),
            context_id=parallel._id, **memmappingexecutor_args)
        self.parallel = parallel
        return n_jobs

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel"""
        if n_jobs == 0:
            raise ValueError('n_jobs == 0 in Parallel has no meaning')
        elif mp is None or n_jobs is None:
            # multiprocessing is not available or disabled, fallback
            # to sequential mode
            return 1
        elif mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                warnings.warn(
                    'Loky-backed parallel loops cannot be called in a'
                    ' multiprocessing context, setting n_jobs=1',
                    stacklevel=3)
            return 1
        elif not (self.in_main_thread() or self.nesting_level == 0):
            # Prevent posix fork inside non-main posix threads
            if n_jobs != 1:
                warnings.warn(
                    'Loky-backed parallel loops cannot be nested below '
                    'threads, setting n_jobs=1',
                    stacklevel=3)
            return 1
        elif n_jobs < 0:
            n_jobs = max(cpu_count() + 1 + n_jobs, 1)
        return n_jobs
    def apply_async(self, func, callback=None):
        """Schedule a func to be run"""
        future = self._workers.submit(SafeFunction(func))
        future.get = functools.partial(self.wrap_future_result, future)
        if callback is not None:
            future.add_done_callback(callback)
        return future

    @staticmethod
    def wrap_future_result(future, timeout=None):
        """Wrapper for Future.result to implement the same behaviour as
        AsyncResult.get from multiprocessing."""
        try:
            return future.result(timeout=timeout)
        except CfTimeoutError as e:
            raise TimeoutError from e
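    # Usage sketch (hedged, not part of the original source): the wrapper
    # gives loky futures the multiprocessing AsyncResult-style API, e.g.:
    #
    #     result = backend.apply_async(some_batch)  # some_batch: a callable
    #     value = result.get(timeout=10)  # raises multiprocessing's
    #                                     # TimeoutError on expiry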
    def terminate(self):
        if self._workers is not None:
            # Don't terminate the workers as we want to reuse them in later
            # calls, but cleanup the temporary resources that the Parallel
            # call created. This 'hack' requires a private, low-level
            # operation.
            self._workers._temp_folder_manager._unlink_temporary_resources(
                context_id=self.parallel._id
            )
            self._workers = None

        self.reset_batch_stats()

    def abort_everything(self, ensure_ready=True):
        """Shutdown the workers and restart a new one with the same parameters
        """
        self._workers.terminate(kill_workers=True)
        self._workers = None

        if ensure_ready:
            self.configure(n_jobs=self.parallel.n_jobs,
                           parallel=self.parallel)


class ImmediateResult(object):
    def __init__(self, batch):
        # Don't delay the application, to avoid keeping the input
        # arguments in memory
        self.results = batch()

    def get(self):
        return self.results


class SafeFunction(object):
    """Wrapper that handles the serialization of exception tracebacks.

    TODO python2_drop: check whether SafeFunction is still needed since we
    dropped support for Python 2. If not needed anymore it should be
    deprecated.

    If an exception is triggered when calling the inner function, a copy of
    the full traceback is captured to make it possible to serialize
    it so that it can be rendered in a different Python process.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        try:
            return self.func(*args, **kwargs)
        except KeyboardInterrupt as e:
            # We capture the KeyboardInterrupt and reraise it as
            # something different, as multiprocessing does not
            # interrupt processing for a KeyboardInterrupt
            raise WorkerInterrupt() from e
        except BaseException:
            # Rely on Python 3 built-in Remote Traceback reporting
            raise


class FallbackToBackend(Exception):
    """Raised when configuration should fall back to another backend"""

    def __init__(self, backend):
        self.backend = backend