synchronize.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. ###############################################################################
  2. # Synchronization primitives based on our SemLock implementation
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from multiprocessing/synchronize.py (17/02/2017)
  7. # * Remove ctx argument for compatibility reason
  8. # * Implementation of Condition/Event are necessary for compatibility
  9. # with python2.7/3.3, Barrier should be reimplemented to for those
  10. # version (but it is not used in loky).
  11. #
  12. import os
  13. import sys
  14. import tempfile
  15. import threading
  16. import _multiprocessing
  17. from time import time as _time
  18. from .context import assert_spawning
  19. from . import resource_tracker
  20. from multiprocessing import process
  21. from multiprocessing import util
  22. __all__ = [
  23. 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
  24. ]
  25. # Try to import the mp.synchronize module cleanly, if it fails
  26. # raise ImportError for platforms lacking a working sem_open implementation.
  27. # See issue 3770
  28. try:
  29. if sys.version_info < (3, 4):
  30. from .semlock import SemLock as _SemLock
  31. from .semlock import sem_unlink
  32. else:
  33. from _multiprocessing import SemLock as _SemLock
  34. from _multiprocessing import sem_unlink
  35. except (ImportError):
  36. raise ImportError("This platform lacks a functioning sem_open" +
  37. " implementation, therefore, the required" +
  38. " synchronization primitives needed will not" +
  39. " function, see issue 3770.")
  40. if sys.version_info[:2] < (3, 3):
  41. FileExistsError = OSError
  42. #
  43. # Constants
  44. #
  45. RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
  46. SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
  47. #
  48. # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
  49. #
  50. class SemLock(object):
  51. _rand = tempfile._RandomNameSequence()
  52. def __init__(self, kind, value, maxvalue):
  53. # unlink_now is only used on win32 or when we are using fork.
  54. unlink_now = False
  55. for i in range(100):
  56. try:
  57. self._semlock = _SemLock(
  58. kind, value, maxvalue, SemLock._make_name(),
  59. unlink_now)
  60. except FileExistsError: # pragma: no cover
  61. pass
  62. else:
  63. break
  64. else: # pragma: no cover
  65. raise FileExistsError('cannot find name for semaphore')
  66. util.debug('created semlock with handle %s and name "%s"'
  67. % (self._semlock.handle, self._semlock.name))
  68. self._make_methods()
  69. def _after_fork(obj):
  70. obj._semlock._after_fork()
  71. util.register_after_fork(self, _after_fork)
  72. # When the object is garbage collected or the
  73. # process shuts down we unlink the semaphore name
  74. resource_tracker.register(self._semlock.name, "semlock")
  75. util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
  76. exitpriority=0)
  77. @staticmethod
  78. def _cleanup(name):
  79. sem_unlink(name)
  80. resource_tracker.unregister(name, "semlock")
  81. def _make_methods(self):
  82. self.acquire = self._semlock.acquire
  83. self.release = self._semlock.release
  84. def __enter__(self):
  85. return self._semlock.acquire()
  86. def __exit__(self, *args):
  87. return self._semlock.release()
  88. def __getstate__(self):
  89. assert_spawning(self)
  90. sl = self._semlock
  91. h = sl.handle
  92. return (h, sl.kind, sl.maxvalue, sl.name)
  93. def __setstate__(self, state):
  94. self._semlock = _SemLock._rebuild(*state)
  95. util.debug('recreated blocker with handle %r and name "%s"'
  96. % (state[0], state[3]))
  97. self._make_methods()
  98. @staticmethod
  99. def _make_name():
  100. # OSX does not support long names for semaphores
  101. return '/loky-%i-%s' % (os.getpid(), next(SemLock._rand))
  102. #
  103. # Semaphore
  104. #
  105. class Semaphore(SemLock):
  106. def __init__(self, value=1):
  107. SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
  108. def get_value(self):
  109. if sys.platform == 'darwin':
  110. raise NotImplementedError("OSX does not implement sem_getvalue")
  111. return self._semlock._get_value()
  112. def __repr__(self):
  113. try:
  114. value = self._semlock._get_value()
  115. except Exception:
  116. value = 'unknown'
  117. return '<%s(value=%s)>' % (self.__class__.__name__, value)
  118. #
  119. # Bounded semaphore
  120. #
  121. class BoundedSemaphore(Semaphore):
  122. def __init__(self, value=1):
  123. SemLock.__init__(self, SEMAPHORE, value, value)
  124. def __repr__(self):
  125. try:
  126. value = self._semlock._get_value()
  127. except Exception:
  128. value = 'unknown'
  129. return '<%s(value=%s, maxvalue=%s)>' % \
  130. (self.__class__.__name__, value, self._semlock.maxvalue)
  131. #
  132. # Non-recursive lock
  133. #
  134. class Lock(SemLock):
  135. def __init__(self):
  136. super(Lock, self).__init__(SEMAPHORE, 1, 1)
  137. def __repr__(self):
  138. try:
  139. if self._semlock._is_mine():
  140. name = process.current_process().name
  141. if threading.current_thread().name != 'MainThread':
  142. name += '|' + threading.current_thread().name
  143. elif self._semlock._get_value() == 1:
  144. name = 'None'
  145. elif self._semlock._count() > 0:
  146. name = 'SomeOtherThread'
  147. else:
  148. name = 'SomeOtherProcess'
  149. except Exception:
  150. name = 'unknown'
  151. return '<%s(owner=%s)>' % (self.__class__.__name__, name)
  152. #
  153. # Recursive lock
  154. #
  155. class RLock(SemLock):
  156. def __init__(self):
  157. super(RLock, self).__init__(RECURSIVE_MUTEX, 1, 1)
  158. def __repr__(self):
  159. try:
  160. if self._semlock._is_mine():
  161. name = process.current_process().name
  162. if threading.current_thread().name != 'MainThread':
  163. name += '|' + threading.current_thread().name
  164. count = self._semlock._count()
  165. elif self._semlock._get_value() == 1:
  166. name, count = 'None', 0
  167. elif self._semlock._count() > 0:
  168. name, count = 'SomeOtherThread', 'nonzero'
  169. else:
  170. name, count = 'SomeOtherProcess', 'nonzero'
  171. except Exception:
  172. name, count = 'unknown', 'unknown'
  173. return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
  174. #
  175. # Condition variable
  176. #
  177. class Condition(object):
  178. def __init__(self, lock=None):
  179. self._lock = lock or RLock()
  180. self._sleeping_count = Semaphore(0)
  181. self._woken_count = Semaphore(0)
  182. self._wait_semaphore = Semaphore(0)
  183. self._make_methods()
  184. def __getstate__(self):
  185. assert_spawning(self)
  186. return (self._lock, self._sleeping_count,
  187. self._woken_count, self._wait_semaphore)
  188. def __setstate__(self, state):
  189. (self._lock, self._sleeping_count,
  190. self._woken_count, self._wait_semaphore) = state
  191. self._make_methods()
  192. def __enter__(self):
  193. return self._lock.__enter__()
  194. def __exit__(self, *args):
  195. return self._lock.__exit__(*args)
  196. def _make_methods(self):
  197. self.acquire = self._lock.acquire
  198. self.release = self._lock.release
  199. def __repr__(self):
  200. try:
  201. num_waiters = (self._sleeping_count._semlock._get_value() -
  202. self._woken_count._semlock._get_value())
  203. except Exception:
  204. num_waiters = 'unknown'
  205. return '<%s(%s, %s)>' % (self.__class__.__name__,
  206. self._lock, num_waiters)
  207. def wait(self, timeout=None):
  208. assert self._lock._semlock._is_mine(), \
  209. 'must acquire() condition before using wait()'
  210. # indicate that this thread is going to sleep
  211. self._sleeping_count.release()
  212. # release lock
  213. count = self._lock._semlock._count()
  214. for i in range(count):
  215. self._lock.release()
  216. try:
  217. # wait for notification or timeout
  218. return self._wait_semaphore.acquire(True, timeout)
  219. finally:
  220. # indicate that this thread has woken
  221. self._woken_count.release()
  222. # reacquire lock
  223. for i in range(count):
  224. self._lock.acquire()
  225. def notify(self):
  226. assert self._lock._semlock._is_mine(), 'lock is not owned'
  227. assert not self._wait_semaphore.acquire(False)
  228. # to take account of timeouts since last notify() we subtract
  229. # woken_count from sleeping_count and rezero woken_count
  230. while self._woken_count.acquire(False):
  231. res = self._sleeping_count.acquire(False)
  232. assert res
  233. if self._sleeping_count.acquire(False): # try grabbing a sleeper
  234. self._wait_semaphore.release() # wake up one sleeper
  235. self._woken_count.acquire() # wait for the sleeper to wake
  236. # rezero _wait_semaphore in case a timeout just happened
  237. self._wait_semaphore.acquire(False)
  238. def notify_all(self):
  239. assert self._lock._semlock._is_mine(), 'lock is not owned'
  240. assert not self._wait_semaphore.acquire(False)
  241. # to take account of timeouts since last notify*() we subtract
  242. # woken_count from sleeping_count and rezero woken_count
  243. while self._woken_count.acquire(False):
  244. res = self._sleeping_count.acquire(False)
  245. assert res
  246. sleepers = 0
  247. while self._sleeping_count.acquire(False):
  248. self._wait_semaphore.release() # wake up one sleeper
  249. sleepers += 1
  250. if sleepers:
  251. for i in range(sleepers):
  252. self._woken_count.acquire() # wait for a sleeper to wake
  253. # rezero wait_semaphore in case some timeouts just happened
  254. while self._wait_semaphore.acquire(False):
  255. pass
  256. def wait_for(self, predicate, timeout=None):
  257. result = predicate()
  258. if result:
  259. return result
  260. if timeout is not None:
  261. endtime = _time() + timeout
  262. else:
  263. endtime = None
  264. waittime = None
  265. while not result:
  266. if endtime is not None:
  267. waittime = endtime - _time()
  268. if waittime <= 0:
  269. break
  270. self.wait(waittime)
  271. result = predicate()
  272. return result
  273. #
  274. # Event
  275. #
  276. class Event(object):
  277. def __init__(self):
  278. self._cond = Condition(Lock())
  279. self._flag = Semaphore(0)
  280. def is_set(self):
  281. with self._cond:
  282. if self._flag.acquire(False):
  283. self._flag.release()
  284. return True
  285. return False
  286. def set(self):
  287. with self._cond:
  288. self._flag.acquire(False)
  289. self._flag.release()
  290. self._cond.notify_all()
  291. def clear(self):
  292. with self._cond:
  293. self._flag.acquire(False)
  294. def wait(self, timeout=None):
  295. with self._cond:
  296. if self._flag.acquire(False):
  297. self._flag.release()
  298. else:
  299. self._cond.wait(timeout)
  300. if self._flag.acquire(False):
  301. self._flag.release()
  302. return True
  303. return False