| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- """
- Small utilities for testing.
- """
- import threading
- import signal
- import time
- import os
- import sys
- import gc
- from joblib._multiprocessing_helpers import mp
- from joblib.testing import SkipTest, skipif
- try:
- import lz4
- except ImportError:
- lz4 = None
- # A decorator to run tests only when numpy is available
try:
    import numpy as np

    def with_numpy(func):
        """Decorator for tests that require numpy.

        numpy was imported successfully, so the test function is returned
        unchanged.
        """
        return func

except ImportError:
    def with_numpy(func):
        """Decorator for tests that require numpy.

        numpy could not be imported, so the decorated test is replaced by a
        stub that raises ``SkipTest`` as soon as it is called.
        """
        import functools

        # ``wraps`` keeps the test's name and docstring on the stub.
        @functools.wraps(func)
        def my_func(*args, **kwargs):
            # Accept and ignore whatever the caller passes (``self`` for
            # test methods, fixture values, ...) so the call ends in SkipTest
            # rather than in a TypeError about unexpected arguments.
            raise SkipTest('Test requires numpy')
        return my_func
    # Expose a falsy placeholder so callers can test ``np`` for availability.
    np = None
- # TODO: Turn this back on after refactoring yield based tests in test_hashing
- # with_numpy = skipif(not np, reason='Test requires numpy.')
- # we use memory_profiler library for memory consumption checks
try:
    from memory_profiler import memory_usage

    def with_memory_profiler(func):
        """Decorator for tests that require memory_profiler.

        memory_profiler was imported successfully, so the test function is
        returned unchanged.
        """
        return func

    def memory_used(func, *args, **kwargs):
        """Compute memory usage when executing func.

        Runs a garbage collection first, then lets ``memory_usage`` sample
        the call ``func(*args, **kwargs)`` with ``interval=.001`` and returns
        the spread between the largest and the smallest sample.
        """
        # Collect garbage up front so that leftovers from earlier code do not
        # distort the samples.
        gc.collect()
        mem_use = memory_usage((func, args, kwargs), interval=.001)
        return max(mem_use) - min(mem_use)

except ImportError:
    def with_memory_profiler(func):
        """Decorator for tests that require memory_profiler.

        memory_profiler could not be imported, so the decorated test is
        replaced by a stub that raises ``SkipTest`` as soon as it is called.
        """
        import functools

        # ``wraps`` keeps the test's name and docstring on the stub.
        @functools.wraps(func)
        def dummy_func(*args, **kwargs):
            # Accept and ignore whatever the caller passes (``self`` for
            # test methods, fixture values, ...) so the call ends in SkipTest
            # rather than in a TypeError about unexpected arguments.
            raise SkipTest('Test requires memory_profiler.')
        return dummy_func

    # Placeholders so both names always exist at module level.
    memory_usage = memory_used = None
- # A utility to kill the test runner in case a multiprocessing assumption
- # triggers an infinite wait on a pipe by the master process for one of its
- # failed workers
# Registry of the pending killer timers, keyed by the name they were
# registered under.
_KILLER_THREADS = dict()


def setup_autokill(module_name, timeout=30):
    """Timeout based suiciding thread to kill the test runner process

    If some subprocess dies in an unexpected way we don't want the
    parent process to block indefinitely.

    A timer is registered under ``module_name``; unless it is cancelled with
    ``teardown_autokill(module_name)`` within ``timeout`` seconds, it sends
    SIGTERM and then, 0.5 seconds later, SIGKILL to the current process.

    Nothing is installed when the ``NO_AUTOKILL`` environment variable is
    set or when ``--pdb`` is present in ``sys.argv``.
    """
    if "NO_AUTOKILL" in os.environ or "--pdb" in sys.argv:
        # Do not install the autokiller
        return

    # Renew any previous contract under that name by first cancelling the
    # previous version (that should normally not happen in practice)
    teardown_autokill(module_name)

    def autokill():
        pid = os.getpid()
        print("Timeout exceeded: terminating stalled process: %d" % pid)
        os.kill(pid, signal.SIGTERM)

        # If we are still there, ask the OS to kill ourselves for real
        time.sleep(0.5)
        print("Timeout exceeded: killing stalled process: %d" % pid)
        os.kill(pid, signal.SIGKILL)

    killer = threading.Timer(timeout, autokill)
    # A daemon timer still fires while the process is stalled, but it no
    # longer keeps the interpreter alive at exit (only to kill it once the
    # timeout elapses) when a run finishes without calling the teardown.
    killer.daemon = True
    _KILLER_THREADS[module_name] = killer
    killer.start()


def teardown_autokill(module_name):
    """Cancel a previously started killer thread

    The timer registered under ``module_name`` is removed from the registry
    and cancelled. Unknown names are ignored.
    """
    # pop (rather than get) so cancelled timers do not pile up in the
    # registry.
    killer = _KILLER_THREADS.pop(module_name, None)
    if killer is not None:
        killer.cancel()
# Skip markers for tests that depend on optional features of the host.

# Skipped when ``mp`` is None.
with_multiprocessing = skipif(
    mp is None,
    reason='Needs multiprocessing to run.',
)

# Skipped when the /dev/shm path does not exist.
with_dev_shm = skipif(
    not os.path.exists('/dev/shm'),
    reason='This test requires a large /dev/shm shared memory fs.',
)

# Skipped when the optional lz4 import failed.
with_lz4 = skipif(
    lz4 is None,
    reason='Needs lz4 compression to run',
)

# Counterpart of with_lz4: skipped when lz4 was imported successfully.
without_lz4 = skipif(
    lz4 is not None,
    reason='Needs lz4 not being installed to run',
)
|