# executor.py (5.2 KB)
"""Utility function to construct a loky.ReusableExecutor with a custom pickler.

This module provides efficient ways of working with data stored in
shared memory with numpy.memmap arrays without inducing any memory
copy between the parent and child processes.
"""
# Author: Thomas Moreau <thomas.moreau.2010@gmail.com>
# Copyright: 2017, Thomas Moreau
# License: BSD 3 clause
  9. from ._memmapping_reducer import get_memmapping_reducers
  10. from ._memmapping_reducer import TemporaryResourcesManager
  11. from .externals.loky.reusable_executor import _ReusablePoolExecutor
  12. _executor_args = None
  13. def get_memmapping_executor(n_jobs, **kwargs):
  14. return MemmappingExecutor.get_memmapping_executor(n_jobs, **kwargs)
  15. class MemmappingExecutor(_ReusablePoolExecutor):
  16. @classmethod
  17. def get_memmapping_executor(cls, n_jobs, timeout=300, initializer=None,
  18. initargs=(), env=None, temp_folder=None,
  19. context_id=None, **backend_args):
  20. """Factory for ReusableExecutor with automatic memmapping for large numpy
  21. arrays.
  22. """
  23. global _executor_args
  24. # Check if we can reuse the executor here instead of deferring the test
  25. # to loky as the reducers are objects that changes at each call.
  26. executor_args = backend_args.copy()
  27. executor_args.update(env if env else {})
  28. executor_args.update(dict(
  29. timeout=timeout, initializer=initializer, initargs=initargs))
  30. reuse = _executor_args is None or _executor_args == executor_args
  31. _executor_args = executor_args
  32. manager = TemporaryResourcesManager(temp_folder)
  33. # reducers access the temporary folder in which to store temporary
  34. # pickles through a call to manager.resolve_temp_folder_name. resolving
  35. # the folder name dynamically is useful to use different folders across
  36. # calls of a same reusable executor
  37. job_reducers, result_reducers = get_memmapping_reducers(
  38. unlink_on_gc_collect=True,
  39. temp_folder_resolver=manager.resolve_temp_folder_name,
  40. **backend_args)
  41. _executor, executor_is_reused = super().get_reusable_executor(
  42. n_jobs, job_reducers=job_reducers, result_reducers=result_reducers,
  43. reuse=reuse, timeout=timeout, initializer=initializer,
  44. initargs=initargs, env=env
  45. )
  46. if not executor_is_reused:
  47. # Only set a _temp_folder_manager for new executors. Reused
  48. # executors already have a _temporary_folder_manager that must not
  49. # be re-assigned like that because it is referenced in various
  50. # places in the reducing machinery of the executor.
  51. _executor._temp_folder_manager = manager
  52. if context_id is not None:
  53. # Only register the specified context once we know which manager
  54. # the current executor is using, in order to not register an atexit
  55. # finalizer twice for the same folder.
  56. _executor._temp_folder_manager.register_new_context(context_id)
  57. return _executor
  58. def terminate(self, kill_workers=False):
  59. self.shutdown(kill_workers=kill_workers)
  60. if kill_workers:
  61. # When workers are killed in such a brutal manner, they cannot
  62. # execute the finalizer of their shared memmaps. The refcount of
  63. # those memmaps may be off by an unknown number, so instead of
  64. # decref'ing them, we delete the whole temporary folder, and
  65. # unregister them. There is no risk of PermissionError at folder
  66. # deletion because because at this point, all child processes are
  67. # dead, so all references to temporary memmaps are closed.
  68. # unregister temporary resources from all contexts
  69. with self._submit_resize_lock:
  70. self._temp_folder_manager._unregister_temporary_resources()
  71. self._temp_folder_manager._try_delete_folder(
  72. allow_non_empty=True
  73. )
  74. else:
  75. self._temp_folder_manager._unlink_temporary_resources()
  76. self._temp_folder_manager._try_delete_folder(allow_non_empty=True)
  77. @property
  78. def _temp_folder(self):
  79. # Legacy property in tests. could be removed if we refactored the
  80. # memmapping tests. SHOULD ONLY BE USED IN TESTS!
  81. # We cache this property because it is called late in the tests - at
  82. # this point, all context have been unregistered, and
  83. # resolve_temp_folder_name raises an error.
  84. if getattr(self, '_cached_temp_folder', None) is not None:
  85. return self._cached_temp_folder
  86. else:
  87. self._cached_temp_folder = self._temp_folder_manager.resolve_temp_folder_name() # noqa
  88. return self._cached_temp_folder
  89. class _TestingMemmappingExecutor(MemmappingExecutor):
  90. """Wrapper around ReusableExecutor to ease memmapping testing with Pool
  91. and Executor. This is only for testing purposes.
  92. """
  93. def apply_async(self, func, args):
  94. """Schedule a func to be run"""
  95. future = self.submit(func, *args)
  96. future.get = future.result
  97. return future
  98. def map(self, f, *args):
  99. return list(super().map(f, *args))