_monitor.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from threading import Event, Thread, current_thread
  2. from time import time
  3. from warnings import warn
  4. import atexit
  5. __all__ = ["TMonitor", "TqdmSynchronisationWarning"]
  6. class TqdmSynchronisationWarning(RuntimeWarning):
  7. """tqdm multi-thread/-process errors which may cause incorrect nesting
  8. but otherwise no adverse effects"""
  9. pass
  10. class TMonitor(Thread):
  11. """
  12. Monitoring thread for tqdm bars.
  13. Monitors if tqdm bars are taking too much time to display
  14. and readjusts miniters automatically if necessary.
  15. Parameters
  16. ----------
  17. tqdm_cls : class
  18. tqdm class to use (can be core tqdm or a submodule).
  19. sleep_interval : fload
  20. Time to sleep between monitoring checks.
  21. """
  22. # internal vars for unit testing
  23. _time = None
  24. _event = None
  25. def __init__(self, tqdm_cls, sleep_interval):
  26. Thread.__init__(self)
  27. self.daemon = True # kill thread when main killed (KeyboardInterrupt)
  28. self.was_killed = Event()
  29. self.woken = 0 # last time woken up, to sync with monitor
  30. self.tqdm_cls = tqdm_cls
  31. self.sleep_interval = sleep_interval
  32. if TMonitor._time is not None:
  33. self._time = TMonitor._time
  34. else:
  35. self._time = time
  36. if TMonitor._event is not None:
  37. self._event = TMonitor._event
  38. else:
  39. self._event = Event
  40. atexit.register(self.exit)
  41. self.start()
  42. def exit(self):
  43. self.was_killed.set()
  44. if self is not current_thread():
  45. self.join()
  46. return self.report()
  47. def get_instances(self):
  48. # returns a copy of started `tqdm_cls` instances
  49. return [i for i in self.tqdm_cls._instances.copy()
  50. # Avoid race by checking that the instance started
  51. if hasattr(i, 'start_t')]
  52. def run(self):
  53. cur_t = self._time()
  54. while True:
  55. # After processing and before sleeping, notify that we woke
  56. # Need to be done just before sleeping
  57. self.woken = cur_t
  58. # Sleep some time...
  59. self.was_killed.wait(self.sleep_interval)
  60. # Quit if killed
  61. if self.was_killed.is_set():
  62. return
  63. # Then monitor!
  64. # Acquire lock (to access _instances)
  65. with self.tqdm_cls.get_lock():
  66. cur_t = self._time()
  67. # Check tqdm instances are waiting too long to print
  68. instances = self.get_instances()
  69. for instance in instances:
  70. # Check event in loop to reduce blocking time on exit
  71. if self.was_killed.is_set():
  72. return
  73. # Only if mininterval > 1 (else iterations are just slow)
  74. # and last refresh exceeded maxinterval
  75. if instance.miniters > 1 and \
  76. (cur_t - instance.last_print_t) >= \
  77. instance.maxinterval:
  78. # force bypassing miniters on next iteration
  79. # (dynamic_miniters adjusts mininterval automatically)
  80. instance.miniters = 1
  81. # Refresh now! (works only for manual tqdm)
  82. instance.refresh(nolock=True)
  83. # Remove accidental long-lived strong reference
  84. del instance
  85. if instances != self.get_instances(): # pragma: nocover
  86. warn("Set changed size during iteration" +
  87. " (see https://github.com/tqdm/tqdm/issues/481)",
  88. TqdmSynchronisationWarning, stacklevel=2)
  89. # Remove accidental long-lived strong references
  90. del instances
  91. def report(self):
  92. return not self.was_killed.is_set()