test_store_backends.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. try:
  2. # Python 2.7: use the C pickle to speed up
  3. # test_concurrency_safe_write which pickles big python objects
  4. import cPickle as cpickle
  5. except ImportError:
  6. import pickle as cpickle
  7. import functools
  8. import time
  9. from joblib.testing import parametrize, timeout
  10. from joblib.test.common import with_multiprocessing
  11. from joblib.backports import concurrency_safe_rename
  12. from joblib import Parallel, delayed
  13. from joblib._store_backends import concurrency_safe_write
  14. def write_func(output, filename):
  15. with open(filename, 'wb') as f:
  16. cpickle.dump(output, f)
  17. def load_func(expected, filename):
  18. for i in range(10):
  19. try:
  20. with open(filename, 'rb') as f:
  21. reloaded = cpickle.load(f)
  22. break
  23. except (OSError, IOError):
  24. # On Windows you can have WindowsError ([Error 5] Access
  25. # is denied or [Error 13] Permission denied) when reading the file,
  26. # probably because a writer process has a lock on the file
  27. time.sleep(0.1)
  28. else:
  29. raise
  30. assert expected == reloaded
  31. def concurrency_safe_write_rename(to_write, filename, write_func):
  32. temporary_filename = concurrency_safe_write(to_write,
  33. filename, write_func)
  34. concurrency_safe_rename(temporary_filename, filename)
  35. @timeout(0) # No timeout as this test can be long
  36. @with_multiprocessing
  37. @parametrize('backend', ['multiprocessing', 'loky', 'threading'])
  38. def test_concurrency_safe_write(tmpdir, backend):
  39. # Add one item to cache
  40. filename = tmpdir.join('test.pkl').strpath
  41. obj = {str(i): i for i in range(int(1e5))}
  42. funcs = [functools.partial(concurrency_safe_write_rename,
  43. write_func=write_func)
  44. if i % 3 != 2 else load_func for i in range(12)]
  45. Parallel(n_jobs=2, backend=backend)(
  46. delayed(func)(obj, filename) for func in funcs)