semlock.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. ###############################################################################
  2. # Ctypes implementation for posix semaphore.
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from cpython/Modules/_multiprocessing/semaphore.c (17/02/2017)
  7. # * use ctypes to access pthread semaphores and provide a full python
  8. # semaphore management.
  9. # * On OSX, sem_getvalue is not implemented, so Semaphores with value > 1
  10. # are not guaranteed to work.
  11. # * Only work with LokyProcess on posix
  12. #
  13. import os
  14. import sys
  15. import time
  16. import errno
  17. import ctypes
  18. import tempfile
  19. import threading
  20. from ctypes.util import find_library
  21. # As we need to use ctypes return types for semlock object, failure value
  22. # needs to be cast to proper python value. Unix failure convention is to
  23. # return 0, whereas OSX returns -1
  24. SEM_FAILURE = ctypes.c_void_p(0).value
  25. if sys.platform == 'darwin':
  26. SEM_FAILURE = ctypes.c_void_p(-1).value
  27. # Semaphore types
  28. RECURSIVE_MUTEX = 0
  29. SEMAPHORE = 1
  30. # Semaphore constants
  31. SEM_OFLAG = ctypes.c_int(os.O_CREAT | os.O_EXCL)
  32. SEM_PERM = ctypes.c_int(384)
  33. class timespec(ctypes.Structure):
  34. _fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)]
  35. if sys.platform != 'win32':
  36. pthread = ctypes.CDLL(find_library('pthread'), use_errno=True)
  37. pthread.sem_open.restype = ctypes.c_void_p
  38. pthread.sem_close.argtypes = [ctypes.c_void_p]
  39. pthread.sem_wait.argtypes = [ctypes.c_void_p]
  40. pthread.sem_trywait.argtypes = [ctypes.c_void_p]
  41. pthread.sem_post.argtypes = [ctypes.c_void_p]
  42. pthread.sem_getvalue.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
  43. pthread.sem_unlink.argtypes = [ctypes.c_char_p]
  44. if sys.platform != "darwin":
  45. pthread.sem_timedwait.argtypes = [ctypes.c_void_p,
  46. ctypes.POINTER(timespec)]
  47. try:
  48. from threading import get_ident
  49. except ImportError:
  50. def get_ident():
  51. return threading.current_thread().ident
  52. if sys.version_info[:2] < (3, 3):
  53. class FileExistsError(OSError):
  54. pass
  55. class FileNotFoundError(OSError):
  56. pass
  57. def sem_unlink(name):
  58. if pthread.sem_unlink(name.encode('ascii')) < 0:
  59. raiseFromErrno()
  60. def _sem_open(name, value=None):
  61. """ Construct or retrieve a semaphore with the given name
  62. If value is None, try to retrieve an existing named semaphore.
  63. Else create a new semaphore with the given value
  64. """
  65. if value is None:
  66. handle = pthread.sem_open(ctypes.c_char_p(name), 0)
  67. else:
  68. handle = pthread.sem_open(ctypes.c_char_p(name), SEM_OFLAG, SEM_PERM,
  69. ctypes.c_int(value))
  70. if handle == SEM_FAILURE:
  71. e = ctypes.get_errno()
  72. if e == errno.EEXIST:
  73. raise FileExistsError("a semaphore named %s already exists" % name)
  74. elif e == errno.ENOENT:
  75. raise FileNotFoundError('cannot find semaphore named %s' % name)
  76. elif e == errno.ENOSYS:
  77. raise NotImplementedError('No semaphore implementation on this '
  78. 'system')
  79. else:
  80. raiseFromErrno()
  81. return handle
  82. def _sem_timedwait(handle, timeout):
  83. t_start = time.time()
  84. if sys.platform != "darwin":
  85. sec = int(timeout)
  86. tv_sec = int(t_start)
  87. nsec = int(1e9 * (timeout - sec) + .5)
  88. tv_nsec = int(1e9 * (t_start - tv_sec) + .5)
  89. deadline = timespec(sec+tv_sec, nsec+tv_nsec)
  90. deadline.tv_sec += int(deadline.tv_nsec / 1000000000)
  91. deadline.tv_nsec %= 1000000000
  92. return pthread.sem_timedwait(handle, ctypes.pointer(deadline))
  93. # PERFORMANCE WARNING
  94. # No sem_timedwait on OSX so we implement our own method. This method can
  95. # degrade performances has the wait can have a latency up to 20 msecs
  96. deadline = t_start + timeout
  97. delay = 0
  98. now = time.time()
  99. while True:
  100. # Poll the sem file
  101. res = pthread.sem_trywait(handle)
  102. if res == 0:
  103. return 0
  104. else:
  105. e = ctypes.get_errno()
  106. if e != errno.EAGAIN:
  107. raiseFromErrno()
  108. # check for timeout
  109. now = time.time()
  110. if now > deadline:
  111. ctypes.set_errno(errno.ETIMEDOUT)
  112. return -1
  113. # calculate how much time left and check the delay is not too long
  114. # -- maximum is 20 msecs
  115. difference = (deadline - now)
  116. delay = min(delay, 20e-3, difference)
  117. # Sleep and increase delay
  118. time.sleep(delay)
  119. delay += 1e-3
  120. class SemLock(object):
  121. """ctypes wrapper to the unix semaphore"""
  122. _rand = tempfile._RandomNameSequence()
  123. def __init__(self, kind, value, maxvalue, name=None, unlink_now=False):
  124. self.count = 0
  125. self.ident = 0
  126. self.kind = kind
  127. self.maxvalue = maxvalue
  128. self.name = name
  129. self.handle = _sem_open(self.name.encode('ascii'), value)
  130. def __del__(self):
  131. try:
  132. res = pthread.sem_close(self.handle)
  133. assert res == 0, "Issue while closing semaphores"
  134. except AttributeError:
  135. pass
  136. def _is_mine(self):
  137. return self.count > 0 and get_ident() == self.ident
  138. def acquire(self, block=True, timeout=None):
  139. if self.kind == RECURSIVE_MUTEX and self._is_mine():
  140. self.count += 1
  141. return True
  142. if block and timeout is None:
  143. res = pthread.sem_wait(self.handle)
  144. elif not block or timeout <= 0:
  145. res = pthread.sem_trywait(self.handle)
  146. else:
  147. res = _sem_timedwait(self.handle, timeout)
  148. if res < 0:
  149. e = ctypes.get_errno()
  150. if e == errno.EINTR:
  151. return None
  152. elif e in [errno.EAGAIN, errno.ETIMEDOUT]:
  153. return False
  154. raiseFromErrno()
  155. self.count += 1
  156. self.ident = get_ident()
  157. return True
  158. def release(self):
  159. if self.kind == RECURSIVE_MUTEX:
  160. assert self._is_mine(), (
  161. "attempt to release recursive lock not owned by thread")
  162. if self.count > 1:
  163. self.count -= 1
  164. return
  165. assert self.count == 1
  166. else:
  167. if sys.platform == 'darwin':
  168. # Handle broken get_value for mac ==> only Lock will work
  169. # as sem_get_value do not work properly
  170. if self.maxvalue == 1:
  171. if pthread.sem_trywait(self.handle) < 0:
  172. e = ctypes.get_errno()
  173. if e != errno.EAGAIN:
  174. raise OSError(e, errno.errorcode[e])
  175. else:
  176. if pthread.sem_post(self.handle) < 0:
  177. raiseFromErrno()
  178. else:
  179. raise ValueError(
  180. "semaphore or lock released too many times")
  181. else:
  182. import warnings
  183. warnings.warn("semaphore are broken on OSX, release might "
  184. "increase its maximal value", RuntimeWarning)
  185. else:
  186. value = self._get_value()
  187. if value >= self.maxvalue:
  188. raise ValueError(
  189. "semaphore or lock released too many times")
  190. if pthread.sem_post(self.handle) < 0:
  191. raiseFromErrno()
  192. self.count -= 1
  193. def _get_value(self):
  194. value = ctypes.pointer(ctypes.c_int(-1))
  195. if pthread.sem_getvalue(self.handle, value) < 0:
  196. raiseFromErrno()
  197. return value.contents.value
  198. def _count(self):
  199. return self.count
  200. def _is_zero(self):
  201. if sys.platform == 'darwin':
  202. # Handle broken get_value for mac ==> only Lock will work
  203. # as sem_get_value do not work properly
  204. if pthread.sem_trywait(self.handle) < 0:
  205. e = ctypes.get_errno()
  206. if e == errno.EAGAIN:
  207. return True
  208. raise OSError(e, errno.errorcode[e])
  209. else:
  210. if pthread.sem_post(self.handle) < 0:
  211. raiseFromErrno()
  212. return False
  213. else:
  214. value = ctypes.pointer(ctypes.c_int(-1))
  215. if pthread.sem_getvalue(self.handle, value) < 0:
  216. raiseFromErrno()
  217. return value.contents.value == 0
  218. def _after_fork(self):
  219. self.count = 0
  220. @staticmethod
  221. def _rebuild(handle, kind, maxvalue, name):
  222. self = SemLock.__new__(SemLock)
  223. self.count = 0
  224. self.ident = 0
  225. self.kind = kind
  226. self.maxvalue = maxvalue
  227. self.name = name
  228. self.handle = _sem_open(name.encode('ascii'))
  229. return self
  230. def raiseFromErrno():
  231. e = ctypes.get_errno()
  232. raise OSError(e, errno.errorcode[e])