# utils_worker.py
  1. """
  2. IO/concurrency helpers for `tqdm.contrib`.
  3. """
  4. from __future__ import absolute_import
  5. from concurrent.futures import ThreadPoolExecutor
  6. from collections import deque
  7. from tqdm.auto import tqdm as tqdm_auto
  8. __author__ = {"github.com/": ["casperdcl"]}
  9. __all__ = ['MonoWorker']
  10. class MonoWorker(object):
  11. """
  12. Supports one running task and one waiting task.
  13. The waiting task is the most recent submitted (others are discarded).
  14. """
  15. def __init__(self):
  16. self.pool = ThreadPoolExecutor(max_workers=1)
  17. self.futures = deque([], 2)
  18. def submit(self, func, *args, **kwargs):
  19. """`func(*args, **kwargs)` may replace currently waiting task."""
  20. futures = self.futures
  21. if len(futures) == futures.maxlen:
  22. running = futures.popleft()
  23. if not running.done():
  24. if len(futures): # clear waiting
  25. waiting = futures.pop()
  26. waiting.cancel()
  27. futures.appendleft(running) # re-insert running
  28. try:
  29. waiting = self.pool.submit(func, *args, **kwargs)
  30. except Exception as e:
  31. tqdm_auto.write(str(e))
  32. else:
  33. futures.append(waiting)
  34. return waiting