  1. """Custom implementation of multiprocessing.Pool with custom pickler.
  2. This module provides efficient ways of working with data stored in
  3. shared memory with numpy.memmap arrays without inducing any memory
  4. copy between the parent and child processes.
  5. This module should not be imported if multiprocessing is not
  6. available as it implements subclasses of multiprocessing Pool
  7. that uses a custom alternative to SimpleQueue.
  8. """
  9. # Author: Olivier Grisel <olivier.grisel@ensta.org>
  10. # Copyright: 2012, Olivier Grisel
  11. # License: BSD 3 clause
  12. import copyreg
  13. import sys
  14. import warnings
  15. from time import sleep
  16. try:
  17. WindowsError
  18. except NameError:
  19. WindowsError = type(None)
  20. from pickle import Pickler
  21. from pickle import HIGHEST_PROTOCOL
  22. from io import BytesIO
  23. from ._memmapping_reducer import get_memmapping_reducers
  24. from ._memmapping_reducer import TemporaryResourcesManager
  25. from ._multiprocessing_helpers import mp, assert_spawning
  26. # We need the class definition to derive from it, not the multiprocessing.Pool
  27. # factory function
  28. from multiprocessing.pool import Pool
  29. try:
  30. import numpy as np
  31. except ImportError:
  32. np = None

###############################################################################
# Enable custom pickling in Pool queues

class CustomizablePickler(Pickler):
    """Pickler that accepts custom reducers.

    TODO python2_drop: can this be simplified?

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication,
    hence no backward compatibility is required.

    `reducers` is expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that,
    given an instance of `type`, will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as would return a `__reduce__` method. See the
    standard library documentation on pickling for more details.

    """

    # We override the pure Python pickler as it's the only way to be able to
    # customize the dispatch table without side effects in Python 2.7
    # to 3.2. For Python 3.3+ we leverage the new dispatch_table
    # feature from https://bugs.python.org/issue14166 that makes it possible
    # to use the C implementation of the Pickler which is faster.

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
        Pickler.__init__(self, writer, protocol=protocol)
        if reducers is None:
            reducers = {}
        if hasattr(Pickler, 'dispatch'):
            # Make the dispatch registry an instance level attribute instead
            # of a reference to the class dictionary under Python 2
            self.dispatch = Pickler.dispatch.copy()
        else:
            # Under Python 3 initialize the dispatch table with a copy of the
            # default registry
            self.dispatch_table = copyreg.dispatch_table.copy()
        for type, reduce_func in reducers.items():
            self.register(type, reduce_func)

    def register(self, type, reduce_func):
        """Attach a reducer function to a given type in the dispatch table."""
        if hasattr(Pickler, 'dispatch'):
            # Python 2 pickler dispatching is not explicitly customizable.
            # Let us use a closure to workaround this limitation.
            def dispatcher(self, obj):
                reduced = reduce_func(obj)
                self.save_reduce(obj=obj, *reduced)
            self.dispatch[type] = dispatcher
        else:
            self.dispatch_table[type] = reduce_func
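
# Example (illustrative sketch, not part of this module's API): registering a
# reducer for a hypothetical `Point` class so that instances are rebuilt from
# their constructor arguments when unpickled:
#
#     from io import BytesIO
#
#     class Point:
#         def __init__(self, x, y):
#             self.x, self.y = x, y
#
#     def reduce_point(p):
#         return (Point, (p.x, p.y))
#
#     buffer = BytesIO()
#     pickler = CustomizablePickler(buffer, reducers={Point: reduce_point})
#     pickler.dump(Point(1, 2))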

class CustomizablePicklingQueue(object):
    """Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memory mapped datastructures.

    `reducers` is expected to be a dict with key / values being
    `(type, callable)` pairs where `callable` is a function that, given an
    instance of `type`, will return a tuple `(constructor, tuple_of_objects)`
    to rebuild an instance out of the pickled `tuple_of_objects` as would
    return a `__reduce__` method.

    See the standard library documentation on pickling for more details.

    """

    def __init__(self, context, reducers=None):
        self._reducers = reducers
        self._reader, self._writer = context.Pipe(duplex=False)
        self._rlock = context.Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = context.Lock()
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock,
                self._reducers)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock,
         self._reducers) = state
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def _make_methods(self):
        self._recv = recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()

        self.get = get

        if self._reducers:
            def send(obj):
                buffer = BytesIO()
                CustomizablePickler(buffer, self._reducers).dump(obj)
                self._writer.send_bytes(buffer.getvalue())
            self._send = send
        else:
            self._send = send = self._writer.send

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = send
        else:
            wlock_acquire, wlock_release = (
                self._wlock.acquire, self._wlock.release)

            def put(obj):
                wlock_acquire()
                try:
                    return send(obj)
                finally:
                    wlock_release()

            self.put = put
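
# Example (illustrative sketch): round-tripping an object through the queue
# with a custom reducer installed; `Point` and `reduce_point` are the
# hypothetical helpers sketched above:
#
#     import multiprocessing
#
#     ctx = multiprocessing.get_context()
#     queue = CustomizablePicklingQueue(ctx, reducers={Point: reduce_point})
#     queue.put(Point(1, 2))
#     restored = queue.get()  # a new Point rebuilt via reduce_point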

class PicklingPool(Pool):
    """Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induced by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return a
    tuple `(constructor, tuple_of_objects)` to rebuild an instance out of the
    pickled `tuple_of_objects` as would return a `__reduce__` method.

    See the standard library documentation about pickling for more details.

    """

    def __init__(self, processes=None, forward_reducers=None,
                 backward_reducers=None, **kwargs):
        if forward_reducers is None:
            forward_reducers = dict()
        if backward_reducers is None:
            backward_reducers = dict()
        self._forward_reducers = forward_reducers
        self._backward_reducers = backward_reducers
        poolargs = dict(processes=processes)
        poolargs.update(kwargs)
        super(PicklingPool, self).__init__(**poolargs)

    def _setup_queues(self):
        context = getattr(self, '_ctx', mp)
        self._inqueue = CustomizablePicklingQueue(context,
                                                  self._forward_reducers)
        self._outqueue = CustomizablePicklingQueue(context,
                                                   self._backward_reducers)
        self._quick_put = self._inqueue._send
        self._quick_get = self._outqueue._recv
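
# Example (illustrative sketch): a pool that ships `Point` arguments (the
# hypothetical class sketched above) to the workers through a custom
# forward reducer:
#
#     pool = PicklingPool(processes=2,
#                         forward_reducers={Point: reduce_point})
#     try:
#         results = pool.map(str, [Point(0, 1), Point(2, 3)])
#     finally:
#         pool.terminate()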

class MemmappingPool(PicklingPool):
    """Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    subprocesses will have access to the same shared memory in the
    original mode except for the 'w+' mode that is automatically
    transformed into 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore, large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem so that child
    processes can access their content via memmapping (file system
    backed shared memory).

    Note: it is important to call the terminate method to delete
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: str or callable, optional
        If str:
          Folder to be used by the pool for memmapping large arrays
          for sharing memory with worker processes. If None, this will try in
          order:
          - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
            variable,
          - /dev/shm if the folder exists and is writable: this is a RAMdisk
            filesystem available by default on modern Linux distributions,
          - the default system temporary folder that can be overridden
            with TMP, TMPDIR or TEMP environment variables, typically /tmp
            under Unix operating systems.
        If callable:
          A callable in charge of dynamically resolving a temporary folder
          for memmapping large arrays.
    max_nbytes: int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.
        Use None to disable memmapping of large arrays.
    mmap_mode: {'r+', 'r', 'w+', 'c'}
        Memmapping mode for numpy arrays passed to workers.
        See 'max_nbytes' parameter documentation for more details.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from master to worker
        processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        master process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocesses is handled (pickling or memmapping).
    prewarm: bool or str, optional, False by default
        If True, force a read on newly memmapped arrays to make sure that
        the OS pre-caches them in memory. This can be useful to avoid
        concurrent disk access when the same data array is passed to
        different worker processes. If "auto", prewarm is set to True unless
        the Linux shared memory partition /dev/shm is available and used as
        temp folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that, given an instance of `type`, will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as would return a `__reduce__`
    method. See the standard library documentation on pickling for more
    details.

    """

    def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6,
                 mmap_mode='r', forward_reducers=None, backward_reducers=None,
                 verbose=0, context_id=None, prewarm=False, **kwargs):
        if context_id is not None:
            warnings.warn('context_id is deprecated and ignored in joblib'
                          ' 0.9.4 and will be removed in 0.11',
                          DeprecationWarning)
        manager = TemporaryResourcesManager(temp_folder)
        self._temp_folder_manager = manager

        # The usage of a temp_folder_resolver over a simple temp_folder is
        # superfluous for multiprocessing pools, as they don't get reused,
        # see get_memmapping_executor for more details. We still use it for
        # code simplicity.
        forward_reducers, backward_reducers = get_memmapping_reducers(
            temp_folder_resolver=manager.resolve_temp_folder_name,
            max_nbytes=max_nbytes, mmap_mode=mmap_mode,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers, verbose=verbose,
            unlink_on_gc_collect=False, prewarm=prewarm)

        poolargs = dict(
            processes=processes,
            forward_reducers=forward_reducers,
            backward_reducers=backward_reducers)
        poolargs.update(kwargs)
        super(MemmappingPool, self).__init__(**poolargs)

    def terminate(self):
        n_retries = 10
        for i in range(n_retries):
            try:
                super(MemmappingPool, self).terminate()
                break
            except OSError as e:
                if isinstance(e, WindowsError):
                    # Workaround occasional "[Error 5] Access is denied"
                    # issue when trying to terminate a process under Windows.
                    sleep(0.1)
                    if i + 1 == n_retries:
                        warnings.warn("Failed to terminate worker processes"
                                      " in multiprocessing pool: %r" % e)
        # Delete the temporary folder and its memmapped content now that the
        # worker processes are gone.
        self._temp_folder_manager._unlink_temporary_resources()

    @property
    def _temp_folder(self):
        # Legacy property used in tests. Could be removed if we refactored
        # the memmapping tests. SHOULD ONLY BE USED IN TESTS!
        # We cache this property because it is called late in the tests - at
        # this point, all contexts have been unregistered, and
        # resolve_temp_folder_name raises an error.
        if getattr(self, '_cached_temp_folder', None) is not None:
            return self._cached_temp_folder
        else:
            self._cached_temp_folder = self._temp_folder_manager.resolve_temp_folder_name()  # noqa
            return self._cached_temp_folder
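
# Example (illustrative sketch, assuming numpy is installed): arrays larger
# than max_nbytes are dumped to a temporary folder and passed to the workers
# as numpy.memmap instances instead of being copied:
#
#     import numpy as np
#
#     data = np.random.random(2_000_000)  # ~16 MB, above the 1e6 threshold
#     pool = MemmappingPool(processes=2, max_nbytes=1e6)
#     try:
#         sums = pool.map(np.sum, [data[:1_000_000], data[1_000_000:]])
#     finally:
#         pool.terminate()  # also deletes the temporary memmap folder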