  1. """
  2. Reducer using memory mapping for numpy arrays
  3. """
  4. # Author: Thomas Moreau <thomas.moreau.2010@gmail.com>
  5. # Copyright: 2017, Thomas Moreau
  6. # License: BSD 3 clause
  7. from mmap import mmap
  8. import errno
  9. import os
  10. import stat
  11. import threading
  12. import atexit
  13. import tempfile
  14. import time
  15. import warnings
  16. import weakref
  17. from uuid import uuid4
  18. from multiprocessing import util
  19. from pickle import whichmodule, loads, dumps, HIGHEST_PROTOCOL, PicklingError
  20. try:
  21. WindowsError
  22. except NameError:
  23. WindowsError = type(None)
  24. try:
  25. import numpy as np
  26. from numpy.lib.stride_tricks import as_strided
  27. except ImportError:
  28. np = None
  29. from .numpy_pickle import dump, load, load_temporary_memmap
  30. from .backports import make_memmap
  31. from .disk import delete_folder
  32. from .externals.loky.backend import resource_tracker

# Some systems have a ramdisk mounted by default, we can use it instead of
# /tmp as the default folder to dump big arrays to share with subprocesses.
SYSTEM_SHARED_MEM_FS = '/dev/shm'

# Minimal number of bytes available on SYSTEM_SHARED_MEM_FS to consider using
# it as the default folder to dump big arrays to share with subprocesses.
SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(2e9)

# Folder and file permissions to chmod temporary files generated by the
# memmapping pool. Only the owner of the Python process can access the
# temporary files and folder.
FOLDER_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
FILE_PERMISSIONS = stat.S_IRUSR | stat.S_IWUSR

# Set used in joblib workers, referencing the filenames of temporary memmaps
# created by joblib to speed up data communication. In child processes, we add
# a finalizer to these memmaps that sends a maybe_unlink call to the
# resource_tracker, in order to free main memory as fast as possible.
JOBLIB_MMAPS = set()


def _log_and_unlink(filename):
    from .externals.loky.backend.resource_tracker import _resource_tracker
    util.debug(
        "[FINALIZER CALL] object mapping to {} about to be deleted,"
        " decrementing the refcount of the file (pid: {})".format(
            os.path.basename(filename), os.getpid()))
    _resource_tracker.maybe_unlink(filename, "file")


def add_maybe_unlink_finalizer(memmap):
    util.debug(
        "[FINALIZER ADD] adding finalizer to {} (id {}, filename {}, pid {})"
        "".format(type(memmap), id(memmap), os.path.basename(memmap.filename),
                  os.getpid()))
    weakref.finalize(memmap, _log_and_unlink, memmap.filename)


def unlink_file(filename):
    """Wrapper around os.unlink with a retry mechanism.

    The retry mechanism has been implemented primarily to overcome a race
    condition happening during the finalizer of a np.memmap: when a process
    holding the last reference to a mmap-backed np.memmap/np.array is about to
    delete this array (and close the reference), it sends a maybe_unlink
    request to the resource_tracker. This request can be processed faster than
    it takes for the last reference of the memmap to be closed, yielding (on
    Windows) a PermissionError in the resource_tracker loop.
    """
    NUM_RETRIES = 10
    for retry_no in range(1, NUM_RETRIES + 1):
        try:
            os.unlink(filename)
            break
        except PermissionError:
            util.debug(
                '[ResourceTracker] tried to unlink {}, got '
                'PermissionError'.format(filename)
            )
            if retry_no == NUM_RETRIES:
                raise
            else:
                time.sleep(.2)


resource_tracker._CLEANUP_FUNCS['file'] = unlink_file
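
# Illustrative sketch (not executed at import time): once the override above
# is installed, the resource_tracker retries deletions that transiently fail
# with PermissionError on Windows. Called directly on a regular file, the
# wrapper behaves like os.unlink:
#
#     >>> import os, tempfile
#     >>> fd, path = tempfile.mkstemp()
#     >>> os.close(fd)
#     >>> unlink_file(path)
#     >>> os.path.exists(path)
#     False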


class _WeakArrayKeyMap:
    """A variant of weakref.WeakKeyDictionary for unhashable numpy arrays.

    This datastructure will be used with numpy arrays as obj keys, therefore
    we do not use the __get__ / __set__ methods to avoid any conflict with the
    numpy fancy indexing syntax.
    """

    def __init__(self):
        self._data = {}

    def get(self, obj):
        ref, val = self._data[id(obj)]
        if ref() is not obj:
            # In case of race condition with on_destroy: could never be
            # triggered by the joblib tests with CPython.
            raise KeyError(obj)
        return val

    def set(self, obj, value):
        key = id(obj)
        try:
            ref, _ = self._data[key]
            if ref() is not obj:
                # In case of race condition with on_destroy: could never be
                # triggered by the joblib tests with CPython.
                raise KeyError(obj)
        except KeyError:
            # Insert the new entry in the mapping along with a weakref
            # callback to automatically delete the entry from the mapping
            # as soon as the object used as key is garbage collected.
            def on_destroy(_):
                del self._data[key]
            ref = weakref.ref(obj, on_destroy)
        self._data[key] = ref, value

    def __getstate__(self):
        raise PicklingError("_WeakArrayKeyMap is not pickleable")
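
# Illustrative usage (a sketch, not executed at import time; assumes numpy is
# available as np, as imported above):
#
#     >>> d = _WeakArrayKeyMap()
#     >>> a = np.arange(10)
#     >>> d.set(a, "1234-5678-abcd.pkl")
#     >>> d.get(a)
#     '1234-5678-abcd.pkl'
#
# Entries vanish automatically once the array used as a key is garbage
# collected, because the weakref callback removes them from the mapping.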


###############################################################################
# Support for efficient transient pickling of numpy data structures


def _get_backing_memmap(a):
    """Recursively look up the original np.memmap instance base if any."""
    b = getattr(a, 'base', None)
    if b is None:
        # TODO: check scipy sparse datastructure if scipy is installed
        # neither a nor its descendants have a memmap base
        return None
    elif isinstance(b, mmap):
        # a is already a real memmap instance.
        return a
    else:
        # Recursive exploration of the base ancestry
        return _get_backing_memmap(b)
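
# Base-chain sketch (illustrative, not executed at import time): a view on a
# np.memmap keeps the memmap in its ``base`` ancestry, which is exactly what
# the recursion above walks:
#
#     >>> m = np.memmap('/tmp/example.mmap', dtype='float64', shape=(10,),
#     ...               mode='w+')   # hypothetical path, for illustration
#     >>> v = m[2:5]                 # a plain ndarray view on the memmap
#     >>> _get_backing_memmap(v) is m
#     True
#     >>> _get_backing_memmap(np.arange(3)) is None
#     True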


def _get_temp_dir(pool_folder_name, temp_folder=None):
    """Get the full path to a subfolder inside the temporary folder.

    Parameters
    ----------
    pool_folder_name : str
        Sub-folder name used for the serialization of a pool instance.
    temp_folder: str, optional
        Folder to be used by the pool for memmapping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:
        - a folder pointed to by the JOBLIB_TEMP_FOLDER environment
          variable,
        - /dev/shm if the folder exists and is writable: this is a
          RAM disk filesystem available by default on modern Linux
          distributions,
        - the default system temporary folder that can be
          overridden with TMP, TMPDIR or TEMP environment
          variables, typically /tmp under Unix operating systems.

    Returns
    -------
    pool_folder : str
        full path to the temporary folder
    use_shared_mem : bool
        whether the temporary folder is written to the system shared memory
        folder or some other temporary folder.
    """
    use_shared_mem = False
    if temp_folder is None:
        temp_folder = os.environ.get('JOBLIB_TEMP_FOLDER', None)
    if temp_folder is None:
        if os.path.exists(SYSTEM_SHARED_MEM_FS):
            try:
                shm_stats = os.statvfs(SYSTEM_SHARED_MEM_FS)
                available_nbytes = shm_stats.f_bsize * shm_stats.f_bavail
                if available_nbytes > SYSTEM_SHARED_MEM_FS_MIN_SIZE:
                    # Try to see if we have write access to the shared mem
                    # folder only if it is reasonably large (that is 2GB or
                    # more).
                    temp_folder = SYSTEM_SHARED_MEM_FS
                    pool_folder = os.path.join(temp_folder, pool_folder_name)
                    if not os.path.exists(pool_folder):
                        os.makedirs(pool_folder)
                    use_shared_mem = True
            except (IOError, OSError):
                # Missing rights in the /dev/shm partition, fallback to
                # regular temp folder.
                temp_folder = None
    if temp_folder is None:
        # Fallback to the default tmp folder, typically /tmp
        temp_folder = tempfile.gettempdir()
    temp_folder = os.path.abspath(os.path.expanduser(temp_folder))
    pool_folder = os.path.join(temp_folder, pool_folder_name)
    return pool_folder, use_shared_mem
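
# Resolution-order sketch (illustrative, not executed at import time): with
# no JOBLIB_TEMP_FOLDER set and a large enough /dev/shm, the pool folder
# lands on the shared memory filesystem; an explicit folder short-circuits
# the lookup ('joblib_example_pool' is a made-up name):
#
#     >>> folder, on_shm = _get_temp_dir('joblib_example_pool')
#     >>> folder                       # doctest: +SKIP
#     '/dev/shm/joblib_example_pool'
#     >>> _get_temp_dir('joblib_example_pool', temp_folder='/tmp')
#     ('/tmp/joblib_example_pool', False)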


def has_shareable_memory(a):
    """Return True if a is backed by some mmap buffer, directly or
    indirectly."""
    return _get_backing_memmap(a) is not None


def _strided_from_memmap(filename, dtype, mode, offset, order, shape, strides,
                         total_buffer_len, unlink_on_gc_collect):
    """Reconstruct an array view on a memory mapped file."""
    if mode == 'w+':
        # Do not zero the original data when unpickling
        mode = 'r+'

    if strides is None:
        # Simple, contiguous memmap
        return make_memmap(
            filename, dtype=dtype, shape=shape, mode=mode, offset=offset,
            order=order, unlink_on_gc_collect=unlink_on_gc_collect
        )
    else:
        # For non-contiguous data, memmap the total enclosing buffer and then
        # extract the non-contiguous view with the stride-tricks API
        base = make_memmap(
            filename, dtype=dtype, shape=total_buffer_len, offset=offset,
            mode=mode, order=order, unlink_on_gc_collect=unlink_on_gc_collect
        )
        return as_strided(base, shape=shape, strides=strides)
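
# Stride-tricks sketch (illustrative, not executed at import time): a
# non-contiguous view, e.g. every other element of a 10-item int64 buffer
# (stride of 16 bytes), is rebuilt by mapping the whole enclosing buffer and
# re-applying the strides ('/tmp/example.mmap' is a hypothetical path):
#
#     >>> a = np.memmap('/tmp/example.mmap', dtype='int64', shape=(10,),
#     ...               mode='w+')
#     >>> view = _strided_from_memmap(
#     ...     '/tmp/example.mmap', dtype='int64', mode='r', offset=0,
#     ...     order='C', shape=(5,), strides=(16,), total_buffer_len=10,
#     ...     unlink_on_gc_collect=False)
#     >>> view.shape, view.strides
#     ((5,), (16,))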


def _reduce_memmap_backed(a, m):
    """Pickling reduction for memmap backed arrays.

    a is expected to be an instance of np.ndarray (or np.memmap)
    m is expected to be an instance of np.memmap at the top of the ``base``
    attribute ancestry of a. ``m.base`` should be the real python mmap object.
    """
    # offset that comes from the striding differences between a and m
    util.debug('[MEMMAP REDUCE] reducing a memmap-backed array '
               '(shape: {}, pid: {})'.format(a.shape, os.getpid()))
    a_start, a_end = np.byte_bounds(a)
    m_start = np.byte_bounds(m)[0]
    offset = a_start - m_start

    # offset from the backing memmap
    offset += m.offset

    if m.flags['F_CONTIGUOUS']:
        order = 'F'
    else:
        # The backing memmap buffer is necessarily contiguous hence C if not
        # Fortran
        order = 'C'

    if a.flags['F_CONTIGUOUS'] or a.flags['C_CONTIGUOUS']:
        # If the array is a contiguous view, no need to pass the strides
        strides = None
        total_buffer_len = None
    else:
        # Compute the total number of items to map from which the strided
        # view will be extracted.
        strides = a.strides
        total_buffer_len = (a_end - a_start) // a.itemsize
    return (_strided_from_memmap,
            (m.filename, a.dtype, m.mode, offset, order, a.shape, strides,
             total_buffer_len, False))
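
# Round-trip sketch (illustrative, not executed at import time): pickling a
# slice of a memmap with this reducer yields a new view on the same file
# rather than a copy of the data ('/tmp/example.mmap' is a hypothetical
# path):
#
#     >>> m = np.memmap('/tmp/example.mmap', dtype='float64', shape=(100,),
#     ...               mode='w+')
#     >>> reconstruct, args = _reduce_memmap_backed(m[10:20], m)
#     >>> b = reconstruct(*args)
#     >>> isinstance(b, np.memmap) and b.shape == (10,)
#     True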


def reduce_array_memmap_backward(a):
    """reduce a np.array or a np.memmap from a child process"""
    m = _get_backing_memmap(a)
    if isinstance(m, np.memmap) and m.filename not in JOBLIB_MMAPS:
        # if a is backed by a memmapped file, reconstruct a using the
        # memmapped file.
        return _reduce_memmap_backed(a, m)
    else:
        # a is either a regular (not memmap-backed) numpy array, or an array
        # backed by a shared temporary file created by joblib. In the latter
        # case, in order to limit the lifespan of these temporary files, we
        # serialize the memmap as a regular numpy array, and decref the
        # file backing the memmap (done implicitly in a previously registered
        # finalizer, see ``unlink_on_gc_collect`` for more details)
        return (
            loads, (dumps(np.asarray(a), protocol=HIGHEST_PROTOCOL), )
        )
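
# Backward-direction sketch (illustrative, not executed at import time):
# regular in-memory arrays (and arrays backed by joblib's own temporary
# memmaps, whose filenames are in JOBLIB_MMAPS) are sent back to the parent
# as plain pickles:
#
#     >>> reconstruct, args = reduce_array_memmap_backward(np.arange(4))
#     >>> reconstruct(*args)
#     array([0, 1, 2, 3])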


class ArrayMemmapForwardReducer(object):
    """Reducer callable to dump large arrays to memmap files.

    Parameters
    ----------
    max_nbytes: int
        Threshold to trigger memmapping of large arrays to files created
        in a folder.
    temp_folder_resolver: callable
        A callable in charge of resolving a temporary folder name where files
        for backing memmapped arrays are created.
    mmap_mode: 'r', 'r+' or 'c'
        Mode for the created memmap datastructure. See the documentation of
        numpy.memmap for more details. Note: 'w+' is coerced to 'r+'
        automatically to avoid zeroing the data on unpickling.
    verbose: int, optional, 0 by default
        If verbose > 0, memmap creations are logged.
        If verbose > 1, both memmap creations, reuse and array pickling are
        logged.
    prewarm: bool, optional, True by default.
        Force a read on newly memmapped arrays to make sure that the OS
        pre-caches them in memory. This can be useful to avoid concurrent
        disk access when the same data array is passed to different worker
        processes.
    """

    def __init__(self, max_nbytes, temp_folder_resolver, mmap_mode,
                 unlink_on_gc_collect, verbose=0, prewarm=True):
        self._max_nbytes = max_nbytes
        self._temp_folder_resolver = temp_folder_resolver
        self._mmap_mode = mmap_mode
        self.verbose = int(verbose)
        if prewarm == "auto":
            self._prewarm = not self._temp_folder.startswith(
                SYSTEM_SHARED_MEM_FS
            )
        else:
            self._prewarm = prewarm
        self._memmaped_arrays = _WeakArrayKeyMap()
        self._temporary_memmaped_filenames = set()
        self._unlink_on_gc_collect = unlink_on_gc_collect

    @property
    def _temp_folder(self):
        return self._temp_folder_resolver()

    def __reduce__(self):
        # The ArrayMemmapForwardReducer is passed to the children processes:
        # it needs to be pickled but the _WeakArrayKeyMap needs to be skipped
        # as it's only guaranteed to be consistent with the parent process
        # memory garbage collection.
        # Although this reducer is pickled, it is not needed in its
        # destination process (child processes), as we only use this reducer
        # to send memmaps from the parent process to the children processes.
        # For this reason, we can afford skipping the resolver (which would
        # otherwise be unpicklable) and pass it as None instead.
        args = (self._max_nbytes, None, self._mmap_mode,
                self._unlink_on_gc_collect)
        kwargs = {
            'verbose': self.verbose,
            'prewarm': self._prewarm,
        }
        return ArrayMemmapForwardReducer, args, kwargs

    def __call__(self, a):
        m = _get_backing_memmap(a)
        if m is not None and isinstance(m, np.memmap):
            # a is already backed by a memmap file, let's reuse it directly
            return _reduce_memmap_backed(a, m)

        if (not a.dtype.hasobject and self._max_nbytes is not None and
                a.nbytes > self._max_nbytes):
            # check that the folder exists (lazily create the pool temp folder
            # if required)
            try:
                os.makedirs(self._temp_folder)
                os.chmod(self._temp_folder, FOLDER_PERMISSIONS)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise e

            try:
                basename = self._memmaped_arrays.get(a)
            except KeyError:
                # Generate a new unique random filename. The process and
                # thread ids are only useful for debugging purposes and to
                # make it easier to clean up orphaned files in case of hard
                # process kill (e.g. by "kill -9" or segfault).
                basename = "{}-{}-{}.pkl".format(
                    os.getpid(), id(threading.current_thread()), uuid4().hex)
                self._memmaped_arrays.set(a, basename)
            filename = os.path.join(self._temp_folder, basename)

            # In case the same array with the same content is passed several
            # times to the pool subprocess children, serialize it only once
            is_new_memmap = filename not in self._temporary_memmaped_filenames

            # add the memmap to the list of temporary memmaps created by
            # joblib
            self._temporary_memmaped_filenames.add(filename)

            if self._unlink_on_gc_collect:
                # Bump reference count of the memmap by 1 to account for
                # shared usage of the memmap by a child process. The
                # corresponding decref call will be executed upon calling
                # resource_tracker.maybe_unlink, registered as a finalizer in
                # the child.
                # The incref/decref calls here are only possible when the
                # child and the parent share the same resource_tracker. It is
                # not the case for the multiprocessing backend, but it does
                # not matter because unlinking a memmap from a child process
                # is only useful to control the memory usage of long-lasting
                # child processes, while the multiprocessing-based pools
                # terminate their workers at the end of a map() call.
                resource_tracker.register(filename, "file")

            if is_new_memmap:
                # Incref each temporary memmap created by joblib one extra
                # time. This means that these memmaps will only be deleted
                # once an extra maybe_unlink() is called, which is done once
                # all the jobs have completed (or been canceled) in the
                # Parallel._terminate_backend() method.
                resource_tracker.register(filename, "file")

            if not os.path.exists(filename):
                util.debug(
                    "[ARRAY DUMP] Pickling new array (shape={}, dtype={}) "
                    "creating a new memmap at {}".format(
                        a.shape, a.dtype, filename))
                for dumped_filename in dump(a, filename):
                    os.chmod(dumped_filename, FILE_PERMISSIONS)

                if self._prewarm:
                    # Warm up the data by accessing it. This operation ensures
                    # that the disk accesses required to create the memmapping
                    # file are performed in the reducing process and avoids
                    # concurrent memmap creation in multiple children
                    # processes.
                    load(filename, mmap_mode=self._mmap_mode).max()
            else:
                util.debug(
                    "[ARRAY DUMP] Pickling known array (shape={}, dtype={}) "
                    "reusing memmap file: {}".format(
                        a.shape, a.dtype, os.path.basename(filename)))

            # The worker process will use joblib.load to memmap the data
            return (
                (load_temporary_memmap, (filename, self._mmap_mode,
                                         self._unlink_on_gc_collect))
            )
        else:
            # do not convert a into memmap, let pickler do its usual copy with
            # the default system pickler
            util.debug(
                '[ARRAY DUMP] Pickling array (NO MEMMAPPING) (shape={}, '
                'dtype={}).'.format(a.shape, a.dtype))
            return (loads, (dumps(a, protocol=HIGHEST_PROTOCOL),))
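
# Dispatch sketch (illustrative, not executed at import time; the lambda
# resolver and '/tmp/joblib_demo' path are stand-ins for the resolver
# normally provided by TemporaryResourcesManager):
#
#     >>> reducer = ArrayMemmapForwardReducer(
#     ...     max_nbytes=100, temp_folder_resolver=lambda: '/tmp/joblib_demo',
#     ...     mmap_mode='r', unlink_on_gc_collect=False)
#     >>> small, big = np.zeros(4), np.zeros(1000)
#     >>> reducer(small)[0] is loads   # below threshold: plain pickle
#     True
#     >>> reducer(big)[0] is load_temporary_memmap   # above: dumped to disk
#     True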


def get_memmapping_reducers(
        forward_reducers=None, backward_reducers=None,
        temp_folder_resolver=None, max_nbytes=1e6, mmap_mode='r', verbose=0,
        prewarm=False, unlink_on_gc_collect=True, **kwargs):
    """Construct a pair of memmapping reducers linked to a tmpdir.

    This function manages the creation and the cleanup of the temporary
    folders underlying the memory maps and should be used to get the reducers
    necessary to construct a joblib pool or executor.
    """
    if forward_reducers is None:
        forward_reducers = dict()
    if backward_reducers is None:
        backward_reducers = dict()
    if np is not None:
        # Register smart numpy.ndarray reducers that detect memmap backed
        # arrays and that are also able to dump to memmap large in-memory
        # arrays over the max_nbytes threshold
        forward_reduce_ndarray = ArrayMemmapForwardReducer(
            max_nbytes, temp_folder_resolver, mmap_mode, unlink_on_gc_collect,
            verbose, prewarm=prewarm)
        forward_reducers[np.ndarray] = forward_reduce_ndarray
        forward_reducers[np.memmap] = forward_reduce_ndarray

        # Communication from child process to the parent process always
        # pickles in-memory numpy.ndarray without dumping them as memmap
        # to avoid confusing the caller and making it tricky to collect the
        # temporary folder
        backward_reducers[np.ndarray] = reduce_array_memmap_backward
        backward_reducers[np.memmap] = reduce_array_memmap_backward

    return forward_reducers, backward_reducers
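
# Wiring sketch (illustrative, not executed at import time): the two dicts
# returned here map array types to reducer callables and are meant to be
# installed on a process-pool pickler ('/tmp/joblib_demo' is a stand-in
# resolver target):
#
#     >>> fwd, bwd = get_memmapping_reducers(
#     ...     temp_folder_resolver=lambda: '/tmp/joblib_demo',
#     ...     max_nbytes=1e6)
#     >>> sorted(k.__name__ for k in fwd)
#     ['memmap', 'ndarray']
#     >>> bwd[np.ndarray] is reduce_array_memmap_backward
#     True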


class TemporaryResourcesManager(object):
    """Stateful object able to manage temporary folder and pickles

    It exposes:
    - a per-context folder name resolving API that memmap-based reducers will
      rely on to know where to pickle the temporary memmaps
    - a temporary file/folder management API that internally uses the
      resource_tracker.
    """

    def __init__(self, temp_folder_root=None, context_id=None):
        self._current_temp_folder = None
        self._temp_folder_root = temp_folder_root
        self._use_shared_mem = None
        self._cached_temp_folders = dict()
        self._id = uuid4().hex
        self._finalizers = {}
        if context_id is None:
            # It would be safer to not assign a default context id (less
            # silent bugs), but doing this while maintaining backward
            # compatibility with the previous, context-unaware version of
            # get_memmapping_executor exposes too many low-level details.
            context_id = uuid4().hex
        self.set_current_context(context_id)

    def set_current_context(self, context_id):
        self._current_context_id = context_id
        self.register_new_context(context_id)

    def register_new_context(self, context_id):
        # Prepare a sub-folder name specific to a context (usually a unique
        # id generated by each instance of the Parallel class). Do not create
        # it in advance to spare FS write access if no array is to be dumped.
        if context_id in self._cached_temp_folders:
            return
        else:
            # During its lifecycle, one Parallel object can have several
            # executors associated to it (for instance, if a loky worker
            # raises an exception, joblib shuts down the executor and
            # instantly recreates a new one before raising the error - see
            # ``ensure_ready``). Because we don't want two executors tied to
            # the same Parallel object (and thus the same context id) to
            # register/use/delete the same folder, we also add an id specific
            # to the current Manager (and thus specific to its associated
            # executor) to the folder name.
            new_folder_name = (
                "joblib_memmapping_folder_{}_{}_{}".format(
                    os.getpid(), self._id, context_id)
            )
            new_folder_path, _ = _get_temp_dir(
                new_folder_name, self._temp_folder_root
            )
            self.register_folder_finalizer(new_folder_path, context_id)
            self._cached_temp_folders[context_id] = new_folder_path

    def resolve_temp_folder_name(self):
        """Return a folder name specific to the currently activated context"""
        return self._cached_temp_folders[self._current_context_id]

    def _unregister_context(self, context_id=None):
        if context_id is None:
            for context_id in list(self._cached_temp_folders):
                self._unregister_context(context_id)
        else:
            temp_folder = self._cached_temp_folders[context_id]
            finalizer = self._finalizers[context_id]

            resource_tracker.unregister(temp_folder, "folder")
            atexit.unregister(finalizer)
            self._cached_temp_folders.pop(context_id)
            self._finalizers.pop(context_id)

    # resource management API

    def register_folder_finalizer(self, pool_subfolder, context_id):
        # Register the garbage collector at program exit in case caller
        # forgets to call terminate explicitly: note we do not pass any
        # reference to ensure that this callback won't prevent garbage
        # collection of the parallel instance and related file handler
        # resources such as POSIX semaphores and pipes
        pool_module_name = whichmodule(delete_folder, 'delete_folder')
        resource_tracker.register(pool_subfolder, "folder")

        def _cleanup():
            # In some cases the Python runtime seems to set delete_folder to
            # None just before exiting when accessing the delete_folder
            # function from the closure namespace. So instead we reimport
            # the delete_folder function explicitly.
            # https://github.com/joblib/joblib/issues/328
            # We cannot just use `from joblib.pool import delete_folder`
            # because joblib should only use relative imports to allow
            # easy vendoring.
            delete_folder = __import__(
                pool_module_name, fromlist=['delete_folder']).delete_folder
            try:
                delete_folder(pool_subfolder, allow_non_empty=True)
                resource_tracker.unregister(pool_subfolder, "folder")
            except OSError:
                warnings.warn("Failed to delete temporary folder: {}"
                              .format(pool_subfolder))

        self._finalizers[context_id] = atexit.register(_cleanup)

    def _unlink_temporary_resources(self, context_id=None):
        """Unlink temporary resources created by a process-based pool"""
        if context_id is None:
            # iterate over a copy of the cache keys because
            # unlink_temporary_resources further deletes an entry in this
            # cache
            for context_id in self._cached_temp_folders.copy():
                self._unlink_temporary_resources(context_id)
        else:
            temp_folder = self._cached_temp_folders[context_id]
            if os.path.exists(temp_folder):
                for filename in os.listdir(temp_folder):
                    resource_tracker.maybe_unlink(
                        os.path.join(temp_folder, filename), "file"
                    )
                self._try_delete_folder(
                    allow_non_empty=False, context_id=context_id
                )

    def _unregister_temporary_resources(self, context_id=None):
        """Unregister temporary resources created by a process-based pool"""
        if context_id is None:
            for context_id in self._cached_temp_folders:
                self._unregister_temporary_resources(context_id)
        else:
            temp_folder = self._cached_temp_folders[context_id]
            if os.path.exists(temp_folder):
                for filename in os.listdir(temp_folder):
                    resource_tracker.unregister(
                        os.path.join(temp_folder, filename), "file"
                    )

    def _try_delete_folder(self, allow_non_empty, context_id=None):
        if context_id is None:
            # ditto: iterate over a copy of the cache keys because
            # _unregister_context deletes entries from this cache
            for context_id in self._cached_temp_folders.copy():
                self._try_delete_folder(
                    allow_non_empty=allow_non_empty, context_id=context_id
                )
        else:
            temp_folder = self._cached_temp_folders[context_id]
            try:
                delete_folder(
                    temp_folder, allow_non_empty=allow_non_empty
                )
                # Now that this folder is deleted, we can forget about it
                self._unregister_context(context_id)
            except OSError:
                # Temporary folder cannot be deleted right now. No need to
                # handle it though, as this folder will be cleaned up by an
                # atexit finalizer registered by the memmapping_reducer.
                pass
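
# Lifecycle sketch (illustrative, not executed at import time): a manager
# resolves a per-context folder name that the forward reducer dumps into, and
# unlinks the temporary files once the pool is done (the printed path below
# is schematic):
#
#     >>> manager = TemporaryResourcesManager()
#     >>> fwd, bwd = get_memmapping_reducers(
#     ...     temp_folder_resolver=manager.resolve_temp_folder_name)
#     >>> manager.resolve_temp_folder_name()      # doctest: +SKIP
#     '/dev/shm/joblib_memmapping_folder_<pid>_<id>_<context>'
#     >>> manager._unlink_temporary_resources()   # after the jobs complete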