common.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. """
  2. Small utilities for testing.
  3. """
  4. import threading
  5. import signal
  6. import time
  7. import os
  8. import sys
  9. import gc
  10. from joblib._multiprocessing_helpers import mp
  11. from joblib.testing import SkipTest, skipif
  12. try:
  13. import lz4
  14. except ImportError:
  15. lz4 = None
  16. # A decorator to run tests only when numpy is available
  17. try:
  18. import numpy as np
  19. def with_numpy(func):
  20. """A decorator to skip tests requiring numpy."""
  21. return func
  22. except ImportError:
  23. def with_numpy(func):
  24. """A decorator to skip tests requiring numpy."""
  25. def my_func():
  26. raise SkipTest('Test requires numpy')
  27. return my_func
  28. np = None
  29. # TODO: Turn this back on after refactoring yield based tests in test_hashing
  30. # with_numpy = skipif(not np, reason='Test requires numpy.')
  31. # we use memory_profiler library for memory consumption checks
  32. try:
  33. from memory_profiler import memory_usage
  34. def with_memory_profiler(func):
  35. """A decorator to skip tests requiring memory_profiler."""
  36. return func
  37. def memory_used(func, *args, **kwargs):
  38. """Compute memory usage when executing func."""
  39. gc.collect()
  40. mem_use = memory_usage((func, args, kwargs), interval=.001)
  41. return max(mem_use) - min(mem_use)
  42. except ImportError:
  43. def with_memory_profiler(func):
  44. """A decorator to skip tests requiring memory_profiler."""
  45. def dummy_func():
  46. raise SkipTest('Test requires memory_profiler.')
  47. return dummy_func
  48. memory_usage = memory_used = None
  49. # A utility to kill the test runner in case a multiprocessing assumption
  50. # triggers an infinite wait on a pipe by the master process for one of its
  51. # failed workers
  52. _KILLER_THREADS = dict()
  53. def setup_autokill(module_name, timeout=30):
  54. """Timeout based suiciding thread to kill the test runner process
  55. If some subprocess dies in an unexpected way we don't want the
  56. parent process to block indefinitely.
  57. """
  58. if "NO_AUTOKILL" in os.environ or "--pdb" in sys.argv:
  59. # Do not install the autokiller
  60. return
  61. # Renew any previous contract under that name by first cancelling the
  62. # previous version (that should normally not happen in practice)
  63. teardown_autokill(module_name)
  64. def autokill():
  65. pid = os.getpid()
  66. print("Timeout exceeded: terminating stalled process: %d" % pid)
  67. os.kill(pid, signal.SIGTERM)
  68. # If were are still there ask the OS to kill ourself for real
  69. time.sleep(0.5)
  70. print("Timeout exceeded: killing stalled process: %d" % pid)
  71. os.kill(pid, signal.SIGKILL)
  72. _KILLER_THREADS[module_name] = t = threading.Timer(timeout, autokill)
  73. t.start()
  74. def teardown_autokill(module_name):
  75. """Cancel a previously started killer thread"""
  76. killer = _KILLER_THREADS.get(module_name)
  77. if killer is not None:
  78. killer.cancel()
  79. with_multiprocessing = skipif(
  80. mp is None, reason='Needs multiprocessing to run.')
  81. with_dev_shm = skipif(
  82. not os.path.exists('/dev/shm'),
  83. reason='This test requires a large /dev/shm shared memory fs.')
  84. with_lz4 = skipif(lz4 is None, reason='Needs lz4 compression to run')
  85. without_lz4 = skipif(
  86. lz4 is not None, reason='Needs lz4 not being installed to run')