test_memmapping.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162
  1. import os
  2. import mmap
  3. import sys
  4. import platform
  5. import gc
  6. import pickle
  7. import itertools
  8. from time import sleep
  9. import subprocess
  10. import threading
  11. from joblib.test.common import with_numpy, np
  12. from joblib.test.common import setup_autokill
  13. from joblib.test.common import teardown_autokill
  14. from joblib.test.common import with_multiprocessing
  15. from joblib.test.common import with_dev_shm
  16. from joblib.testing import raises, parametrize, skipif, xfail, param
  17. from joblib.backports import make_memmap
  18. from joblib.parallel import Parallel, delayed
  19. from joblib.pool import MemmappingPool
  20. from joblib.executor import _TestingMemmappingExecutor as TestExecutor
  21. from joblib._memmapping_reducer import has_shareable_memory
  22. from joblib._memmapping_reducer import ArrayMemmapForwardReducer
  23. from joblib._memmapping_reducer import _strided_from_memmap
  24. from joblib._memmapping_reducer import _get_temp_dir
  25. from joblib._memmapping_reducer import _WeakArrayKeyMap
  26. from joblib._memmapping_reducer import _get_backing_memmap
  27. import joblib._memmapping_reducer as jmr
  def setup_module():
      """Module-level setup: call ``setup_autokill`` for this module.

      The module name and a ``timeout`` of 300 are forwarded to the helper.
      """
      this_module = __name__
      setup_autokill(this_module, timeout=300)
  def teardown_module():
      """Module-level teardown: call ``teardown_autokill`` for this module."""
      this_module = __name__
      teardown_autokill(this_module)
  def check_memmap_and_send_back(array):
      """Assert that ``array`` has a backing memmap and return it unchanged."""
      backing = _get_backing_memmap(array)
      assert backing is not None
      return array
  def check_array(args):
      """Helper meant to be executed in subprocesses.

      ``args`` is a ``(data, position, expected)`` triple; the function asserts
      that ``data[position]`` is equal to ``expected``.
      """
      array, index, wanted = args
      selected = array[index]
      np.testing.assert_array_equal(selected, wanted)
  def inplace_double(args):
      """Helper meant to be executed in subprocesses.

      Check that ``data[position]`` holds the expected value(s), double them
      in place, then check that the doubled value(s) are visible.

      Parameters
      ----------
      args : tuple
          A ``(data, position, expected)`` triple. ``position`` can be any
          index accepted by ``data`` (an integer, a tuple of integers, a
          slice, ...).

      Raises
      ------
      AssertionError
          If ``data[position]`` differs from ``expected`` before the update
          or from ``2 * expected`` after it.
      """
      data, position, expected = args
      # Use the array-aware comparison (as check_array does): a bare
      # ``assert data[position] == expected`` fails with an ambiguous truth
      # value error as soon as ``position`` selects more than one element.
      np.testing.assert_array_equal(data[position], expected)
      data[position] *= 2
      np.testing.assert_array_equal(data[position], 2 * expected)
  @with_numpy
  @with_multiprocessing
  def test_memmap_based_array_reducing(tmpdir):
      """Check that it is possible to reduce a memmap backed array"""
      same = np.testing.assert_array_equal
      path = tmpdir.join('test.mmap').strpath

      # Backing file of 500 float64 values: larger than the region that is
      # memory-mapped below.
      backing = np.memmap(path, dtype=np.float64, shape=500, mode='w+')

      # Negative markers make over- or underflows visible if the test fails
      backing[:] = - 1.0 * np.arange(backing.shape[0], dtype=backing.dtype)
      backing.flush()

      # Map a 3D Fortran-ordered array on a subsection of that file, starting
      # at a byte offset of 4
      a = np.memmap(path, dtype=np.float64, shape=(3, 5, 4),
                    mode='r+', order='F', offset=4)
      a[:] = np.arange(60).reshape(a.shape)

      # Views sharing the buffer of the original memmap:
      # b is a sliced memmap view of the memmap instance
      b = a[1:-1, 2:-1, 2:4]

      # c and d are plain array views
      c = np.asarray(b)
      d = c.T

      # Array reducer with auto dumping disabled
      reducer = ArrayMemmapForwardReducer(None, tmpdir.strpath, 'c', True)

      def roundtrip(obj):
          rebuild, rebuild_args = reducer(obj)
          return rebuild(*rebuild_args)

      # The original memmap comes back as a memmap
      a_back = roundtrip(a)
      assert has_shareable_memory(a_back)
      assert isinstance(a_back, np.memmap)
      same(a_back, a)

      # The strided memmap view
      b_back = roundtrip(b)
      assert has_shareable_memory(b_back)
      same(b_back, b)

      # Array views on a memmap base come back as non-memmap arrays that
      # still share memory
      c_back = roundtrip(c)
      assert not isinstance(c_back, np.memmap)
      assert has_shareable_memory(c_back)
      same(c_back, c)

      d_back = roundtrip(d)
      assert not isinstance(d_back, np.memmap)
      assert has_shareable_memory(d_back)
      same(d_back, d)

      # Graceful degradation on fake memmap instances with in-memory buffers
      tripled = a * 3
      assert not has_shareable_memory(tripled)
      tripled_back = roundtrip(tripled)
      assert not has_shareable_memory(tripled_back)
      assert not isinstance(tripled_back, np.memmap)
      same(tripled_back, a * 3)

      # Graceful degradation on arrays derived from fake memmap instances
      derived = np.asarray(tripled)
      assert not has_shareable_memory(derived)

      derived_back = roundtrip(derived)
      assert isinstance(derived_back, np.ndarray)
      assert not has_shareable_memory(derived_back)
      same(derived_back, derived)
  @skipif(sys.platform != "win32",
          reason="PermissionError only easily triggerable on Windows")
  def test_resource_tracker_retries_when_permissionerror(tmpdir):
      """Check the resource_tracker retry mechanism when unlinking memmaps.

      A child interpreter asks the resource_tracker to unlink a file that is
      still memory-mapped; the test then checks that the child exits with
      code 0, prints nothing on stdout and reports the PermissionError on
      stderr. See the ``unlink_file`` documentation of joblib for details.
      """
      filename = tmpdir.join('test.mmap').strpath
      cmd = """if 1:
          import os
          import numpy as np
          import time
          from joblib.externals.loky.backend import resource_tracker
          resource_tracker.VERBOSE = 1
          # Start the resource tracker
          resource_tracker.ensure_running()
          time.sleep(1)
          # Create a file containing numpy data
          memmap = np.memmap(r"{filename}", dtype=np.float64, shape=10, mode='w+')
          memmap[:] = np.arange(10).astype(np.int8).data
          memmap.flush()
          assert os.path.exists(r"{filename}")
          del memmap
          # Create a np.memmap backed by this file
          memmap = np.memmap(r"{filename}", dtype=np.float64, shape=10, mode='w+')
          resource_tracker.register(r"{filename}", "file")
          # Ask the resource_tracker to delete the file backing the np.memmap , this
          # should raise PermissionError that the resource_tracker will log.
          resource_tracker.maybe_unlink(r"{filename}", "file")
          # Wait for the resource_tracker to process the maybe_unlink before cleaning
          # up the memmap
          time.sleep(2)
      """.format(filename=filename)
      p = subprocess.Popen([sys.executable, '-c', cmd], stderr=subprocess.PIPE,
                           stdout=subprocess.PIPE)
      # communicate() drains both pipes while waiting for the child to exit
      # and sets p.returncode. Calling wait() first could deadlock if the
      # child filled one of the pipe buffers.
      out, err = p.communicate()
      assert p.returncode == 0
      assert out == b''
      msg = 'tried to unlink {}, got PermissionError'.format(filename)
      assert msg in err.decode()
  @with_numpy
  @with_multiprocessing
  def test_high_dimension_memmap_array_reducing(tmpdir):
      """Reduce and rebuild a 4D memmap and several slices taken from it."""
      same = np.testing.assert_array_equal
      path = tmpdir.join('test.mmap').strpath

      # High dimensional memmap
      a = np.memmap(path, dtype=np.float64, shape=(100, 15, 15, 3),
                    mode='w+')
      a[:] = np.arange(100 * 15 * 15 * 3).reshape(a.shape)

      # Slices / indices taken along various dimensions
      slices = (a[0:10], a[:, 5:10], a[:, :, :, 0], a[1:3:4])

      # Array reducer with auto dumping disabled
      reducer = ArrayMemmapForwardReducer(None, tmpdir.strpath, 'c', True)

      def roundtrip(obj):
          rebuild, rebuild_args = reducer(obj)
          return rebuild(*rebuild_args)

      # The full memmap is rebuilt as a memmap
      a_back = roundtrip(a)
      assert has_shareable_memory(a_back)
      assert isinstance(a_back, np.memmap)
      same(a_back, a)

      # Each slice is rebuilt with shareable memory and equal content
      for piece in slices:
          piece_back = roundtrip(piece)
          assert has_shareable_memory(piece_back)
          same(piece_back, piece)
  @with_numpy
  def test__strided_from_memmap(tmpdir):
      """Check that ``_strided_from_memmap`` keeps the requested offset."""
      path = tmpdir.join('test.mmap').strpath
      granularity = mmap.ALLOCATIONGRANULARITY
      size = 5 * granularity
      offset = granularity + 1

      # Creating this memmap materializes the file reused by the calls below
      memmap_obj = np.memmap(path, mode='w+', shape=size + offset)

      # Keyword arguments shared by both calls
      shared_kwargs = dict(dtype='uint8', mode='r', offset=offset, order='C',
                           unlink_on_gc_collect=False)

      # Without strides the result is itself a memmap
      memmap_obj = _strided_from_memmap(
          path, shape=size, strides=None, total_buffer_len=None,
          **shared_kwargs)
      assert isinstance(memmap_obj, np.memmap)
      assert memmap_obj.offset == offset

      # With strides, the offset is checked on the backing memmap
      strided_obj = _strided_from_memmap(
          path, shape=(size // 2,), strides=(2,), total_buffer_len=size,
          **shared_kwargs)
      assert _get_backing_memmap(strided_obj).offset == offset
  @with_numpy
  @with_multiprocessing
  @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
               ids=["multiprocessing", "loky"])
  def test_pool_with_memmap(factory, tmpdir):
      """Check that subprocess can access and update shared memory memmap"""
      same = np.testing.assert_array_equal

      # Fork the subprocess before allocating the objects to be passed
      pool_dir = tmpdir.mkdir('pool').strpath
      pool = factory(10, max_nbytes=2, temp_folder=pool_dir)
      try:
          path = tmpdir.join('test.mmap').strpath
          a = np.memmap(path, dtype=np.float32, shape=(3, 5), mode='w+')
          a.fill(1.0)

          # One task per cell of a
          cells_of_a = itertools.product(range(a.shape[0]), range(a.shape[1]))
          pool.map(inplace_double, [(a, cell, 1.0) for cell in cells_of_a])
          same(a, 2 * np.ones(a.shape))

          # Open a copy-on-write view on the previous data
          b = np.memmap(path, dtype=np.float32, shape=(5, 3), mode='c')
          cells_of_b = itertools.product(range(b.shape[0]), range(b.shape[1]))
          pool.map(inplace_double, [(b, cell, 2.0) for cell in cells_of_b])

          # Passing memmap instances to the pool should not trigger the
          # creation of new files on the FS
          assert os.listdir(pool_dir) == []

          # the original data is untouched
          same(a, 2 * np.ones(a.shape))
          same(b, 2 * np.ones(b.shape))

          # readonly maps can be read but not updated
          c = np.memmap(path, dtype=np.float32, shape=(10,), mode='r',
                        offset=5 * 4)
          indices_of_c = range(c.shape[0])

          with raises(AssertionError):
              pool.map(check_array, [(c, i, 3.0) for i in indices_of_c])

          # depending on the version of numpy one can either get a
          # RuntimeError or a ValueError
          with raises((RuntimeError, ValueError)):
              pool.map(inplace_double, [(c, i, 2.0) for i in indices_of_c])
      finally:
          # Clean all filehandlers held by the pool
          pool.terminate()
          del pool
  @with_numpy
  @with_multiprocessing
  @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
               ids=["multiprocessing", "loky"])
  def test_pool_with_memmap_array_view(factory, tmpdir):
      """Check that subprocess can access and update shared memory array"""
      same = np.testing.assert_array_equal

      # Fork the subprocess before allocating the objects to be passed
      pool_dir = tmpdir.mkdir('pool').strpath
      pool = factory(10, max_nbytes=2, temp_folder=pool_dir)
      try:
          path = tmpdir.join('test.mmap').strpath
          a = np.memmap(path, dtype=np.float32, shape=(3, 5), mode='w+')
          a.fill(1.0)

          # ndarray view on the memmap instance: not a memmap itself, but it
          # still has shareable memory
          a_view = np.asarray(a)
          assert not isinstance(a_view, np.memmap)
          assert has_shareable_memory(a_view)

          cells = itertools.product(range(a.shape[0]), range(a.shape[1]))
          pool.map(inplace_double, [(a_view, cell, 1.0) for cell in cells])

          # Both a and the a_view have been updated
          doubled = 2 * np.ones(a.shape)
          same(a, doubled)
          same(a_view, doubled)

          # Passing memmap array view to the pool should not trigger the
          # creation of new files on the FS
          assert os.listdir(pool_dir) == []
      finally:
          pool.terminate()
          del pool
  @with_numpy
  @parametrize("backend", ["multiprocessing", "loky"])
  def test_permission_error_windows_reference_cycle(backend):
      """Non regression test for https://github.com/joblib/joblib/issues/806

      The issue happens when trying to delete a memory mapped file that has
      not yet been closed by one of the worker processes. The scenario runs
      in a child interpreter, which is expected to exit with code 0.
      """
      cmd = """if 1:
          import numpy as np
          from joblib import Parallel, delayed
          data = np.random.rand(int(2e6)).reshape((int(1e6), 2))
          # Build a complex cyclic reference that is likely to delay garbage
          # collection of the memmapped array in the worker processes.
          first_list = current_list = [data]
          for i in range(10):
              current_list = [current_list]
          first_list.append(current_list)
          if __name__ == "__main__":
              results = Parallel(n_jobs=2, backend="{b}")(
                  delayed(len)(current_list) for i in range(10))
              assert results == [1] * 10
      """.format(b=backend)
      p = subprocess.Popen([sys.executable, '-c', cmd], stderr=subprocess.PIPE,
                           stdout=subprocess.PIPE)
      # communicate() drains both pipes while waiting for the child to exit
      # and sets p.returncode. Calling wait() first could deadlock if the
      # child filled one of the pipe buffers.
      out, err = p.communicate()
      assert p.returncode == 0, out.decode() + "\n\n" + err.decode()
  @with_numpy
  @parametrize("backend", ["multiprocessing", "loky"])
  def test_permission_error_windows_memmap_sent_to_parent(backend):
      """Second non-regression test for joblib issue #806.

      See https://github.com/joblib/joblib/issues/806: previously, child
      processes would not convert temporary memmaps to numpy arrays when
      sending the data back to the parent process. This would lead to
      permission errors on windows when deleting joblib's temporary folder,
      as the memmapped file handles would still be opened in the parent
      process.

      The scenario is run three times in a child interpreter that must exit
      with code 0 and print nothing on stdout.
      """
      cmd = '''if 1:
          import os
          import time
          import numpy as np
          from joblib import Parallel, delayed
          from testutils import return_slice_of_data
          data = np.ones(int(2e6))
          if __name__ == '__main__':
              # warm-up call to launch the workers and start the resource_tracker
              _ = Parallel(n_jobs=2, verbose=5, backend='{b}')(
                  delayed(id)(i) for i in range(20))
              time.sleep(0.5)
              slice_of_data = Parallel(n_jobs=2, verbose=5, backend='{b}')(
                  delayed(return_slice_of_data)(data, 0, 20) for _ in range(10))
      '''.format(b=backend)

      # The child imports ``testutils`` through PYTHONPATH, which is set to
      # the directory of this file. The environment is the same for each run.
      env = os.environ.copy()
      env['PYTHONPATH'] = os.path.dirname(__file__)

      for _ in range(3):
          p = subprocess.Popen([sys.executable, '-c', cmd],
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE, env=env)
          # communicate() drains both pipes while waiting for the child to
          # exit and sets p.returncode. Calling wait() first could deadlock
          # if the child filled one of the pipe buffers.
          out, err = p.communicate()
          assert p.returncode == 0, err
          assert out == b''
          if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
              # In early versions of Python 3.8, a reference leak
              # https://github.com/cloudpipe/cloudpickle/issues/327, holds
              # references to pickled objects, generating race condition
              # during cleanup finalizers of joblib and noisy
              # resource_tracker outputs.
              assert b'resource_tracker' not in err
  @with_numpy
  @with_multiprocessing
  @parametrize("backend", ["multiprocessing", "loky"])
  def test_parallel_isolated_temp_folders(backend):
      """Consecutive Parallel calls must use isolated temporary subfolders.

      This must hold even for the loky backend that reuses its executor
      instance across calls.
      """
      array = np.arange(int(1e2))

      def filename_seen_by_worker():
          # A fresh Parallel object is built for every call
          [name] = Parallel(n_jobs=2, backend=backend, max_nbytes=10)(
              delayed(getattr)(array, 'filename') for _ in range(1)
          )
          return name

      first = filename_seen_by_worker()
      second = filename_seen_by_worker()
      assert os.path.dirname(second) != os.path.dirname(first)
  @with_numpy
  @with_multiprocessing
  @parametrize("backend", ["multiprocessing", "loky"])
  def test_managed_backend_reuse_temp_folder(backend):
      """Calls to a managed parallel object must reuse the same memmaps."""
      array = np.arange(int(1e2))
      folders = []
      with Parallel(n_jobs=2, backend=backend, max_nbytes=10) as parallel:
          for _call in range(2):
              [name] = parallel(
                  delayed(getattr)(array, 'filename') for _ in range(1)
              )
              folders.append(os.path.dirname(name))
      first_folder, second_folder = folders
      assert second_folder == first_folder
  @with_numpy
  @with_multiprocessing
  def test_memmapping_temp_folder_thread_safety():
      """Each Parallel object must get its own temporary folder.

      Concurrent calls to Parallel with the loky backend will use the same
      executor, and thus the same reducers. Make sure that those reducers use
      different temporary folders depending on which Parallel objects called
      them, which is necessary to limit potential race conditions during the
      garbage collection of temporary memmaps.
      """
      array = np.arange(int(1e2))
      dirs_per_thread = [set(), set()]

      def collect_temp_dirs(arr, seen_dirs):
          with Parallel(backend='loky', n_jobs=2, max_nbytes=10) as parallel:
              for _call in range(10):
                  [name] = parallel(
                      delayed(getattr)(arr, 'filename') for _ in range(1)
                  )
                  seen_dirs.add(os.path.dirname(name))

      threads = [
          threading.Thread(target=collect_temp_dirs, args=(array, seen_dirs))
          for seen_dirs in dirs_per_thread
      ]
      for thread in threads:
          thread.start()
      for thread in threads:
          thread.join()

      dirs_1, dirs_2 = dirs_per_thread
      # One single folder per thread, and not the same one for both threads
      assert len(dirs_1) == 1
      assert len(dirs_2) == 1
      assert dirs_1 != dirs_2
  @with_numpy
  @with_multiprocessing
  def test_multithreaded_parallel_termination_resource_tracker_silent():
      """Concurrent executor terminations must keep the resource_tracker quiet.

      Test that concurrent termination attempts of a same executor do not
      emit any spurious error from the resource_tracker. Various situations
      are tested, making 0, 1 or both parallel calls send a task that will
      make the worker (and thus the whole Parallel call) error out.
      """
      cmd = '''if 1:
          import os
          import numpy as np
          from joblib import Parallel, delayed
          from joblib.externals.loky.backend import resource_tracker
          from concurrent.futures import ThreadPoolExecutor, wait
          resource_tracker.VERBOSE = 0
          array = np.arange(int(1e2))
          temp_dirs_thread_1 = set()
          temp_dirs_thread_2 = set()
          def raise_error(array):
              raise ValueError
          def parallel_get_filename(array, temp_dirs):
              with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
                  for i in range(10):
                      [filename] = p(
                          delayed(getattr)(array, "filename") for _ in range(1)
                      )
                      temp_dirs.add(os.path.dirname(filename))
          def parallel_raise(array, temp_dirs):
              with Parallel(backend="loky", n_jobs=2, max_nbytes=10) as p:
                  for i in range(10):
                      [filename] = p(
                          delayed(raise_error)(array) for _ in range(1)
                      )
                      temp_dirs.add(os.path.dirname(filename))
          executor = ThreadPoolExecutor(max_workers=2)
          # both function calls will use the same loky executor, but with a
          # different Parallel object.
          future_1 = executor.submit({f1}, array, temp_dirs_thread_1)
          future_2 = executor.submit({f2}, array, temp_dirs_thread_2)
          # Wait for both threads to terminate their backend
          wait([future_1, future_2])
          future_1.result()
          future_2.result()
      '''
      # (function run by thread 1, function run by thread 2, expected exit
      # code of the child interpreter)
      functions_and_returncodes = [
          ("parallel_get_filename", "parallel_get_filename", 0),
          ("parallel_get_filename", "parallel_raise", 1),
          ("parallel_raise", "parallel_raise", 1)
      ]

      for f1, f2, returncode in functions_and_returncodes:
          p = subprocess.Popen(
              [sys.executable, '-c', cmd.format(f1=f1, f2=f2)],
              stderr=subprocess.PIPE, stdout=subprocess.PIPE)
          # communicate() drains both pipes while waiting for the child to
          # exit and sets p.returncode. Calling wait() first could deadlock
          # if the child filled one of the pipe buffers.
          out, err = p.communicate()
          assert p.returncode == returncode, out.decode()
          assert b"resource_tracker" not in err, err.decode()
  @with_numpy
  @with_multiprocessing
  def test_nested_loop_error_in_grandchild_resource_tracker_silent():
      """Safety smoke test for nested parallel calls with the loky backend.

      Nested parallel calls must not yield noisy resource_tracker outputs
      when the grandchild errors out. The child interpreter is expected to
      exit with code 1.
      """
      cmd = '''if 1:
          from joblib import Parallel, delayed
          def raise_error(i):
              raise ValueError
          def nested_loop(f):
              Parallel(backend="loky", n_jobs=2)(
                  delayed(f)(i) for i in range(10)
              )
          if __name__ == "__main__":
              Parallel(backend="loky", n_jobs=2)(
                  delayed(nested_loop)(func) for func in [raise_error]
              )
      '''
      p = subprocess.Popen([sys.executable, '-c', cmd],
                           stderr=subprocess.PIPE, stdout=subprocess.PIPE)
      # communicate() drains both pipes while waiting for the child to exit
      # and sets p.returncode. Calling wait() first could deadlock if the
      # child filled one of the pipe buffers.
      out, err = p.communicate()
      assert p.returncode == 1, out.decode()
      assert b"resource_tracker" not in err, err.decode()
  @with_numpy
  @with_multiprocessing
  @parametrize("backend", ["multiprocessing", "loky"])
  def test_many_parallel_calls_on_same_object(backend):
      """Consecutive Parallel operations on the same object must not error.

      After #966 got merged, consecutive Parallel objects were sharing temp
      folder, which would lead to race conditions happening during the
      temporary resources management with the resource_tracker. This is a
      non-regression test for that situation. The scenario is run three times
      in a child interpreter that must exit with code 0 and print nothing on
      stdout.
      """
      cmd = '''if 1:
          import os
          import time
          import numpy as np
          from joblib import Parallel, delayed
          from testutils import return_slice_of_data
          data = np.ones(100)
          if __name__ == '__main__':
              for i in range(5):
                  slice_of_data = Parallel(
                      n_jobs=2, max_nbytes=1, backend='{b}')(
                          delayed(return_slice_of_data)(data, 0, 20)
                          for _ in range(10)
                      )
                  slice_of_data = Parallel(
                      n_jobs=2, max_nbytes=1, backend='{b}')(
                          delayed(return_slice_of_data)(data, 0, 20)
                          for _ in range(10)
                      )
      '''.format(b=backend)

      # The child imports ``testutils`` through PYTHONPATH, which is set to
      # the directory of this file. The environment is the same for each run.
      env = os.environ.copy()
      env['PYTHONPATH'] = os.path.dirname(__file__)

      for _ in range(3):
          p = subprocess.Popen([sys.executable, '-c', cmd],
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE, env=env)
          # communicate() drains both pipes while waiting for the child to
          # exit and sets p.returncode. Calling wait() first could deadlock
          # if the child filled one of the pipe buffers.
          out, err = p.communicate()
          assert p.returncode == 0, err
          assert out == b''
          if sys.version_info[:3] not in [(3, 8, 0), (3, 8, 1)]:
              # In early versions of Python 3.8, a reference leak
              # https://github.com/cloudpipe/cloudpickle/issues/327, holds
              # references to pickled objects, generating race condition
              # during cleanup finalizers of joblib and noisy
              # resource_tracker outputs.
              assert b'resource_tracker' not in err
  @with_numpy
  @with_multiprocessing
  @parametrize("backend", ["multiprocessing", "loky"])
  def test_memmap_returned_as_regular_array(backend):
      """Child processes must send temporary memmaps back as numpy arrays."""
      data = np.ones(int(1e3))
      parallel = Parallel(n_jobs=2, backend=backend, max_nbytes=100)
      tasks = (delayed(check_memmap_and_send_back)(data) for _ in range(1))
      [result] = parallel(tasks)
      # The worker asserted that it received a memmap-backed array; what
      # comes back to the parent must not be backed by a memmap.
      assert _get_backing_memmap(result) is None
  @with_numpy
  @with_multiprocessing
  @parametrize("backend", ["multiprocessing", param("loky", marks=xfail)])
  def test_resource_tracker_silent_when_reference_cycles(backend):
      """The resource_tracker must stay silent despite reference cycles.

      There is a variety of reasons that can make joblib with loky backend
      output noisy warnings when a reference cycle is preventing a memmap
      from being garbage collected. Especially, joblib's main process
      finalizer deletes the temporary folder if it was not done before, which
      can interact badly with the resource_tracker. We don't risk leaking any
      resources, but this will likely make joblib output a lot of low-level
      confusing messages. The loky case is marked as xfail for now: a next PR
      should fix this behavior.

      Note that the script in ``cmd`` is the exact same script as in
      test_permission_error_windows_reference_cycle.
      """
      cmd = """if 1:
          import numpy as np
          from joblib import Parallel, delayed
          data = np.random.rand(int(2e6)).reshape((int(1e6), 2))
          # Build a complex cyclic reference that is likely to delay garbage
          # collection of the memmapped array in the worker processes.
          first_list = current_list = [data]
          for i in range(10):
              current_list = [current_list]
          first_list.append(current_list)
          if __name__ == "__main__":
              results = Parallel(n_jobs=2, backend="{b}")(
                  delayed(len)(current_list) for i in range(10))
              assert results == [1] * 10
      """.format(b=backend)
      p = subprocess.Popen([sys.executable, '-c', cmd], stderr=subprocess.PIPE,
                           stdout=subprocess.PIPE)
      # communicate() drains both pipes while waiting for the child to exit
      # and sets p.returncode. Calling wait() first could deadlock if the
      # child filled one of the pipe buffers.
      out, err = p.communicate()
      assert p.returncode == 0, out.decode()
      assert b"resource_tracker" not in err, err.decode()
  @with_numpy
  @with_multiprocessing
  @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
               ids=["multiprocessing", "loky"])
  def test_memmapping_pool_for_large_arrays(factory, tmpdir):
      """Check that large arrays are not copied in memory"""
      base_dir = tmpdir.strpath

      # The temp folder starts out empty
      assert os.listdir(base_dir) == []

      # Build a pool whose reducers automatically dump large array content
      # to filesystem backed memmap instances to avoid memory explosion
      pool = factory(3, max_nbytes=40, temp_folder=base_dir, verbose=2)
      try:
          # The temporary folder for the pool is not provisioned in advance
          assert os.listdir(base_dir) == []
          assert not os.path.exists(pool._temp_folder)

          # 20 bytes: below the 40 bytes threshold
          small = np.ones(5, dtype=np.float32)
          assert small.nbytes == 20
          small_tasks = [(small, i, 1.0) for i in range(small.shape[0])]
          pool.map(check_array, small_tasks)

          # Memory has been copied, the pool filesystem folder is unused
          assert os.listdir(base_dir) == []

          # 800 bytes: above the memmap threshold of 40 bytes
          large = np.ones(100, dtype=np.float64)
          assert large.nbytes == 800
          large_tasks = [(large, i, 1.0) for i in range(large.shape[0])]
          pool.map(check_array, large_tasks)

          # The data has been dumped in a temp folder for subprocess to share
          # it without per-child memory copies
          assert os.path.isdir(pool._temp_folder)
          dumped_filenames = os.listdir(pool._temp_folder)
          assert len(dumped_filenames) == 1

          # Memory mapping must not be triggered for arrays with
          # dtype='object'
          objects = np.array(['abc'] * 100, dtype='object')
          results = pool.map(has_shareable_memory, [objects])
          assert not results[0]
      finally:
          # check FS garbage upon pool termination: poll up to 10 times, 0.1
          # second apart, until the temporary folder is gone
          pool.terminate()
          folder_gone = False
          for _attempt in range(10):
              sleep(.1)
              if not os.path.exists(pool._temp_folder):
                  folder_gone = True
                  break
          if not folder_gone:  # pragma: no cover
              raise AssertionError(
                  'temporary folder of {} was not deleted'.format(pool)
              )
          del pool
  621. @with_numpy
  622. @with_multiprocessing
  623. @parametrize("backend", ["multiprocessing", "loky"])
  624. def test_child_raises_parent_exits_cleanly(backend):
  625. # When a task executed by a child process raises an error, the parent
  626. # process's backend is notified, and calls abort_everything.
  627. # In loky, abort_everything itself calls shutdown(kill_workers=True) which
  628. # sends SIGKILL to the worker, preventing it from running the finalizers
  629. # supposed to signal the resource_tracker when the worker is done using
  630. # objects relying on a shared resource (e.g np.memmaps). Because this
  631. # behavior is prone to :
  632. # - cause a resource leak
  633. # - make the resource tracker emit noisy resource warnings
  634. # we explicitly test that, when the said situation occurs:
  635. # - no resources are actually leaked
  636. # - the temporary resources are deleted as soon as possible (typically, at
  637. # the end of the failing Parallel call)
  638. # - the resource_tracker does not emit any warnings.
  639. cmd = """if 1:
  640. import os
  641. import numpy as np
  642. from joblib import Parallel, delayed
  643. from testutils import print_filename_and_raise
  644. data = np.random.rand(1000)
  645. def get_temp_folder(parallel_obj, backend):
  646. if "{b}" == "loky":
  647. return p._backend._workers._temp_folder
  648. else:
  649. return p._backend._pool._temp_folder
  650. if __name__ == "__main__":
  651. try:
  652. with Parallel(n_jobs=2, backend="{b}", max_nbytes=100) as p:
  653. temp_folder = get_temp_folder(p, "{b}")
  654. p(delayed(print_filename_and_raise)(data)
  655. for i in range(1))
  656. except ValueError:
  657. # the temporary folder should be deleted by the end of this
  658. # call
  659. assert not os.path.exists(temp_folder)
  660. """.format(b=backend)
  661. env = os.environ.copy()
  662. env['PYTHONPATH'] = os.path.dirname(__file__)
  663. p = subprocess.Popen([sys.executable, '-c', cmd], stderr=subprocess.PIPE,
  664. stdout=subprocess.PIPE, env=env)
  665. p.wait()
  666. out, err = p.communicate()
  667. out, err = out.decode(), err.decode()
  668. filename = out.split('\n')[0]
  669. assert p.returncode == 0, out
  670. assert err == '' # no resource_tracker warnings.
  671. assert not os.path.exists(filename)
  672. @with_numpy
  673. @with_multiprocessing
  674. @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
  675. ids=["multiprocessing", "loky"])
  676. def test_memmapping_pool_for_large_arrays_disabled(factory, tmpdir):
  677. """Check that large arrays memmapping can be disabled"""
  678. # Set max_nbytes to None to disable the auto memmapping feature
  679. p = factory(3, max_nbytes=None, temp_folder=tmpdir.strpath)
  680. try:
  681. # Check that the tempfolder is empty
  682. assert os.listdir(tmpdir.strpath) == []
  683. # Try with a file largish than the memmap threshold of 40 bytes
  684. large = np.ones(100, dtype=np.float64)
  685. assert large.nbytes == 800
  686. p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])
  687. # Check that the tempfolder is still empty
  688. assert os.listdir(tmpdir.strpath) == []
  689. finally:
  690. # Cleanup open file descriptors
  691. p.terminate()
  692. del p
  693. @with_numpy
  694. @with_multiprocessing
  695. @with_dev_shm
  696. @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
  697. ids=["multiprocessing", "loky"])
  698. def test_memmapping_on_large_enough_dev_shm(factory):
  699. """Check that memmapping uses /dev/shm when possible"""
  700. orig_size = jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE
  701. try:
  702. # Make joblib believe that it can use /dev/shm even when running on a
  703. # CI container where the size of the /dev/shm is not very large (that
  704. # is at least 32 MB instead of 2 GB by default).
  705. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(32e6)
  706. p = factory(3, max_nbytes=10)
  707. try:
  708. # Check that the pool has correctly detected the presence of the
  709. # shared memory filesystem.
  710. pool_temp_folder = p._temp_folder
  711. folder_prefix = '/dev/shm/joblib_memmapping_folder_'
  712. assert pool_temp_folder.startswith(folder_prefix)
  713. assert os.path.exists(pool_temp_folder)
  714. # Try with a file larger than the memmap threshold of 10 bytes
  715. a = np.ones(100, dtype=np.float64)
  716. assert a.nbytes == 800
  717. p.map(id, [a] * 10)
  718. # a should have been memmapped to the pool temp folder: the joblib
  719. # pickling procedure generate one .pkl file:
  720. assert len(os.listdir(pool_temp_folder)) == 1
  721. # create a new array with content that is different from 'a' so
  722. # that it is mapped to a different file in the temporary folder of
  723. # the pool.
  724. b = np.ones(100, dtype=np.float64) * 2
  725. assert b.nbytes == 800
  726. p.map(id, [b] * 10)
  727. # A copy of both a and b are now stored in the shared memory folder
  728. assert len(os.listdir(pool_temp_folder)) == 2
  729. finally:
  730. # Cleanup open file descriptors
  731. p.terminate()
  732. del p
  733. for i in range(100):
  734. # The temp folder is cleaned up upon pool termination
  735. if not os.path.exists(pool_temp_folder):
  736. break
  737. sleep(.1)
  738. else: # pragma: no cover
  739. raise AssertionError('temporary folder of pool was not deleted')
  740. finally:
  741. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = orig_size
  742. @with_numpy
  743. @with_multiprocessing
  744. @with_dev_shm
  745. @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
  746. ids=["multiprocessing", "loky"])
  747. def test_memmapping_on_too_small_dev_shm(factory):
  748. orig_size = jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE
  749. try:
  750. # Make joblib believe that it cannot use /dev/shm unless there is
  751. # 42 exabytes of available shared memory in /dev/shm
  752. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = int(42e18)
  753. p = factory(3, max_nbytes=10)
  754. try:
  755. # Check that the pool has correctly detected the presence of the
  756. # shared memory filesystem.
  757. pool_temp_folder = p._temp_folder
  758. assert not pool_temp_folder.startswith('/dev/shm')
  759. finally:
  760. # Cleanup open file descriptors
  761. p.terminate()
  762. del p
  763. # The temp folder is cleaned up upon pool termination
  764. assert not os.path.exists(pool_temp_folder)
  765. finally:
  766. jmr.SYSTEM_SHARED_MEM_FS_MIN_SIZE = orig_size
  767. @with_numpy
  768. @with_multiprocessing
  769. @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
  770. ids=["multiprocessing", "loky"])
  771. def test_memmapping_pool_for_large_arrays_in_return(factory, tmpdir):
  772. """Check that large arrays are not copied in memory in return"""
  773. assert_array_equal = np.testing.assert_array_equal
  774. # Build an array reducers that automaticaly dump large array content
  775. # but check that the returned datastructure are regular arrays to avoid
  776. # passing a memmap array pointing to a pool controlled temp folder that
  777. # might be confusing to the user
  778. # The MemmappingPool user can always return numpy.memmap object explicitly
  779. # to avoid memory copy
  780. p = factory(3, max_nbytes=10, temp_folder=tmpdir.strpath)
  781. try:
  782. res = p.apply_async(np.ones, args=(1000,))
  783. large = res.get()
  784. assert not has_shareable_memory(large)
  785. assert_array_equal(large, np.ones(1000))
  786. finally:
  787. p.terminate()
  788. del p
  789. def _worker_multiply(a, n_times):
  790. """Multiplication function to be executed by subprocess"""
  791. assert has_shareable_memory(a)
  792. return a * n_times
  793. @with_numpy
  794. @with_multiprocessing
  795. @parametrize("factory", [MemmappingPool, TestExecutor.get_memmapping_executor],
  796. ids=["multiprocessing", "loky"])
  797. def test_workaround_against_bad_memmap_with_copied_buffers(factory, tmpdir):
  798. """Check that memmaps with a bad buffer are returned as regular arrays
  799. Unary operations and ufuncs on memmap instances return a new memmap
  800. instance with an in-memory buffer (probably a numpy bug).
  801. """
  802. assert_array_equal = np.testing.assert_array_equal
  803. p = factory(3, max_nbytes=10, temp_folder=tmpdir.strpath)
  804. try:
  805. # Send a complex, large-ish view on a array that will be converted to
  806. # a memmap in the worker process
  807. a = np.asarray(np.arange(6000).reshape((1000, 2, 3)),
  808. order='F')[:, :1, :]
  809. # Call a non-inplace multiply operation on the worker and memmap and
  810. # send it back to the parent.
  811. b = p.apply_async(_worker_multiply, args=(a, 3)).get()
  812. assert not has_shareable_memory(b)
  813. assert_array_equal(b, 3 * a)
  814. finally:
  815. p.terminate()
  816. del p
  817. def identity(arg):
  818. return arg
  819. @with_numpy
  820. @with_multiprocessing
  821. @parametrize(
  822. "factory,retry_no",
  823. list(itertools.product(
  824. [MemmappingPool, TestExecutor.get_memmapping_executor], range(3))),
  825. ids=['{}, {}'.format(x, y) for x, y in itertools.product(
  826. ["multiprocessing", "loky"], map(str, range(3)))])
  827. def test_pool_memmap_with_big_offset(factory, retry_no, tmpdir):
  828. # Test that numpy memmap offset is set correctly if greater than
  829. # mmap.ALLOCATIONGRANULARITY, see
  830. # https://github.com/joblib/joblib/issues/451 and
  831. # https://github.com/numpy/numpy/pull/8443 for more details.
  832. fname = tmpdir.join('test.mmap').strpath
  833. size = 5 * mmap.ALLOCATIONGRANULARITY
  834. offset = mmap.ALLOCATIONGRANULARITY + 1
  835. obj = make_memmap(fname, mode='w+', shape=size, dtype='uint8',
  836. offset=offset)
  837. p = factory(2, temp_folder=tmpdir.strpath)
  838. result = p.apply_async(identity, args=(obj,)).get()
  839. assert isinstance(result, np.memmap)
  840. assert result.offset == offset
  841. np.testing.assert_array_equal(obj, result)
  842. p.terminate()
  843. def test_pool_get_temp_dir(tmpdir):
  844. pool_folder_name = 'test.tmpdir'
  845. pool_folder, shared_mem = _get_temp_dir(pool_folder_name, tmpdir.strpath)
  846. assert shared_mem is False
  847. assert pool_folder == tmpdir.join('test.tmpdir').strpath
  848. pool_folder, shared_mem = _get_temp_dir(pool_folder_name, temp_folder=None)
  849. if sys.platform.startswith('win'):
  850. assert shared_mem is False
  851. assert pool_folder.endswith(pool_folder_name)
  852. @with_numpy
  853. @skipif(sys.platform == 'win32', reason='This test fails with a '
  854. 'PermissionError on Windows')
  855. @parametrize("mmap_mode", ["r+", "w+"])
  856. def test_numpy_arrays_use_different_memory(mmap_mode):
  857. def func(arr, value):
  858. arr[:] = value
  859. return arr
  860. arrays = [np.zeros((10, 10), dtype='float64') for i in range(10)]
  861. results = Parallel(mmap_mode=mmap_mode, max_nbytes=0, n_jobs=2)(
  862. delayed(func)(arr, i) for i, arr in enumerate(arrays))
  863. for i, arr in enumerate(results):
  864. np.testing.assert_array_equal(arr, i)
  865. @with_numpy
  866. def test_weak_array_key_map():
  867. def assert_empty_after_gc_collect(container, retries=100):
  868. for i in range(retries):
  869. if len(container) == 0:
  870. return
  871. gc.collect()
  872. sleep(.1)
  873. assert len(container) == 0
  874. a = np.ones(42)
  875. m = _WeakArrayKeyMap()
  876. m.set(a, 'a')
  877. assert m.get(a) == 'a'
  878. b = a
  879. assert m.get(b) == 'a'
  880. m.set(b, 'b')
  881. assert m.get(a) == 'b'
  882. del a
  883. gc.collect()
  884. assert len(m._data) == 1
  885. assert m.get(b) == 'b'
  886. del b
  887. assert_empty_after_gc_collect(m._data)
  888. c = np.ones(42)
  889. m.set(c, 'c')
  890. assert len(m._data) == 1
  891. assert m.get(c) == 'c'
  892. with raises(KeyError):
  893. m.get(np.ones(42))
  894. del c
  895. assert_empty_after_gc_collect(m._data)
  896. # Check that creating and dropping numpy arrays with potentially the same
  897. # object id will not cause the map to get confused.
  898. def get_set_get_collect(m, i):
  899. a = np.ones(42)
  900. with raises(KeyError):
  901. m.get(a)
  902. m.set(a, i)
  903. assert m.get(a) == i
  904. return id(a)
  905. unique_ids = set([get_set_get_collect(m, i) for i in range(1000)])
  906. if platform.python_implementation() == 'CPython':
  907. # On CPython (at least) the same id is often reused many times for the
  908. # temporary arrays created under the local scope of the
  909. # get_set_get_collect function without causing any spurious lookups /
  910. # insertions in the map.
  911. assert len(unique_ids) < 100
  912. def test_weak_array_key_map_no_pickling():
  913. m = _WeakArrayKeyMap()
  914. with raises(pickle.PicklingError):
  915. pickle.dumps(m)
  916. @with_numpy
  917. @with_multiprocessing
  918. def test_direct_mmap(tmpdir):
  919. testfile = str(tmpdir.join('arr.dat'))
  920. a = np.arange(10, dtype='uint8')
  921. a.tofile(testfile)
  922. def _read_array():
  923. with open(testfile) as fd:
  924. mm = mmap.mmap(fd.fileno(), 0, access=mmap.ACCESS_READ, offset=0)
  925. return np.ndarray((10,), dtype=np.uint8, buffer=mm, offset=0)
  926. def func(x):
  927. return x**2
  928. arr = _read_array()
  929. # this is expected to work and gives the reference
  930. ref = Parallel(n_jobs=2)(delayed(func)(x) for x in [a])
  931. # now test that it work with the mmap array
  932. results = Parallel(n_jobs=2)(delayed(func)(x) for x in [arr])
  933. np.testing.assert_array_equal(results, ref)
  934. # also test with a mmap array read in the subprocess
  935. def worker():
  936. return _read_array()
  937. results = Parallel(n_jobs=2)(delayed(worker)() for _ in range(1))
  938. np.testing.assert_array_equal(results[0], arr)