| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- ###############################################################################
- # Ctypes implementation for posix semaphore.
- #
- # author: Thomas Moreau and Olivier Grisel
- #
- # adapted from cpython/Modules/_multiprocessing/semaphore.c (17/02/2017)
- # * use ctypes to access pthread semaphores and provide a full python
- # semaphore management.
- # * For OSX, as no sem_getvalue is not implemented, Semaphore with value > 1
- # are not guaranteed to work.
- # * Only work with LokyProcess on posix
- #
- import os
- import sys
- import time
- import errno
- import ctypes
- import tempfile
- import threading
- from ctypes.util import find_library
- # As we need to use ctypes return types for semlock object, failure value
- # needs to be cast to proper python value. Unix failure convention is to
- # return 0, whereas OSX returns -1
- SEM_FAILURE = ctypes.c_void_p(0).value
- if sys.platform == 'darwin':
- SEM_FAILURE = ctypes.c_void_p(-1).value
- # Semaphore types
- RECURSIVE_MUTEX = 0
- SEMAPHORE = 1
- # Semaphore constants
- SEM_OFLAG = ctypes.c_int(os.O_CREAT | os.O_EXCL)
- SEM_PERM = ctypes.c_int(384)
- class timespec(ctypes.Structure):
- _fields_ = [("tv_sec", ctypes.c_long), ("tv_nsec", ctypes.c_long)]
- if sys.platform != 'win32':
- pthread = ctypes.CDLL(find_library('pthread'), use_errno=True)
- pthread.sem_open.restype = ctypes.c_void_p
- pthread.sem_close.argtypes = [ctypes.c_void_p]
- pthread.sem_wait.argtypes = [ctypes.c_void_p]
- pthread.sem_trywait.argtypes = [ctypes.c_void_p]
- pthread.sem_post.argtypes = [ctypes.c_void_p]
- pthread.sem_getvalue.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
- pthread.sem_unlink.argtypes = [ctypes.c_char_p]
- if sys.platform != "darwin":
- pthread.sem_timedwait.argtypes = [ctypes.c_void_p,
- ctypes.POINTER(timespec)]
- try:
- from threading import get_ident
- except ImportError:
- def get_ident():
- return threading.current_thread().ident
- if sys.version_info[:2] < (3, 3):
- class FileExistsError(OSError):
- pass
- class FileNotFoundError(OSError):
- pass
- def sem_unlink(name):
- if pthread.sem_unlink(name.encode('ascii')) < 0:
- raiseFromErrno()
- def _sem_open(name, value=None):
- """ Construct or retrieve a semaphore with the given name
- If value is None, try to retrieve an existing named semaphore.
- Else create a new semaphore with the given value
- """
- if value is None:
- handle = pthread.sem_open(ctypes.c_char_p(name), 0)
- else:
- handle = pthread.sem_open(ctypes.c_char_p(name), SEM_OFLAG, SEM_PERM,
- ctypes.c_int(value))
- if handle == SEM_FAILURE:
- e = ctypes.get_errno()
- if e == errno.EEXIST:
- raise FileExistsError("a semaphore named %s already exists" % name)
- elif e == errno.ENOENT:
- raise FileNotFoundError('cannot find semaphore named %s' % name)
- elif e == errno.ENOSYS:
- raise NotImplementedError('No semaphore implementation on this '
- 'system')
- else:
- raiseFromErrno()
- return handle
- def _sem_timedwait(handle, timeout):
- t_start = time.time()
- if sys.platform != "darwin":
- sec = int(timeout)
- tv_sec = int(t_start)
- nsec = int(1e9 * (timeout - sec) + .5)
- tv_nsec = int(1e9 * (t_start - tv_sec) + .5)
- deadline = timespec(sec+tv_sec, nsec+tv_nsec)
- deadline.tv_sec += int(deadline.tv_nsec / 1000000000)
- deadline.tv_nsec %= 1000000000
- return pthread.sem_timedwait(handle, ctypes.pointer(deadline))
- # PERFORMANCE WARNING
- # No sem_timedwait on OSX so we implement our own method. This method can
- # degrade performances has the wait can have a latency up to 20 msecs
- deadline = t_start + timeout
- delay = 0
- now = time.time()
- while True:
- # Poll the sem file
- res = pthread.sem_trywait(handle)
- if res == 0:
- return 0
- else:
- e = ctypes.get_errno()
- if e != errno.EAGAIN:
- raiseFromErrno()
- # check for timeout
- now = time.time()
- if now > deadline:
- ctypes.set_errno(errno.ETIMEDOUT)
- return -1
- # calculate how much time left and check the delay is not too long
- # -- maximum is 20 msecs
- difference = (deadline - now)
- delay = min(delay, 20e-3, difference)
- # Sleep and increase delay
- time.sleep(delay)
- delay += 1e-3
- class SemLock(object):
- """ctypes wrapper to the unix semaphore"""
- _rand = tempfile._RandomNameSequence()
- def __init__(self, kind, value, maxvalue, name=None, unlink_now=False):
- self.count = 0
- self.ident = 0
- self.kind = kind
- self.maxvalue = maxvalue
- self.name = name
- self.handle = _sem_open(self.name.encode('ascii'), value)
- def __del__(self):
- try:
- res = pthread.sem_close(self.handle)
- assert res == 0, "Issue while closing semaphores"
- except AttributeError:
- pass
- def _is_mine(self):
- return self.count > 0 and get_ident() == self.ident
- def acquire(self, block=True, timeout=None):
- if self.kind == RECURSIVE_MUTEX and self._is_mine():
- self.count += 1
- return True
- if block and timeout is None:
- res = pthread.sem_wait(self.handle)
- elif not block or timeout <= 0:
- res = pthread.sem_trywait(self.handle)
- else:
- res = _sem_timedwait(self.handle, timeout)
- if res < 0:
- e = ctypes.get_errno()
- if e == errno.EINTR:
- return None
- elif e in [errno.EAGAIN, errno.ETIMEDOUT]:
- return False
- raiseFromErrno()
- self.count += 1
- self.ident = get_ident()
- return True
- def release(self):
- if self.kind == RECURSIVE_MUTEX:
- assert self._is_mine(), (
- "attempt to release recursive lock not owned by thread")
- if self.count > 1:
- self.count -= 1
- return
- assert self.count == 1
- else:
- if sys.platform == 'darwin':
- # Handle broken get_value for mac ==> only Lock will work
- # as sem_get_value do not work properly
- if self.maxvalue == 1:
- if pthread.sem_trywait(self.handle) < 0:
- e = ctypes.get_errno()
- if e != errno.EAGAIN:
- raise OSError(e, errno.errorcode[e])
- else:
- if pthread.sem_post(self.handle) < 0:
- raiseFromErrno()
- else:
- raise ValueError(
- "semaphore or lock released too many times")
- else:
- import warnings
- warnings.warn("semaphore are broken on OSX, release might "
- "increase its maximal value", RuntimeWarning)
- else:
- value = self._get_value()
- if value >= self.maxvalue:
- raise ValueError(
- "semaphore or lock released too many times")
- if pthread.sem_post(self.handle) < 0:
- raiseFromErrno()
- self.count -= 1
- def _get_value(self):
- value = ctypes.pointer(ctypes.c_int(-1))
- if pthread.sem_getvalue(self.handle, value) < 0:
- raiseFromErrno()
- return value.contents.value
- def _count(self):
- return self.count
- def _is_zero(self):
- if sys.platform == 'darwin':
- # Handle broken get_value for mac ==> only Lock will work
- # as sem_get_value do not work properly
- if pthread.sem_trywait(self.handle) < 0:
- e = ctypes.get_errno()
- if e == errno.EAGAIN:
- return True
- raise OSError(e, errno.errorcode[e])
- else:
- if pthread.sem_post(self.handle) < 0:
- raiseFromErrno()
- return False
- else:
- value = ctypes.pointer(ctypes.c_int(-1))
- if pthread.sem_getvalue(self.handle, value) < 0:
- raiseFromErrno()
- return value.contents.value == 0
- def _after_fork(self):
- self.count = 0
- @staticmethod
- def _rebuild(handle, kind, maxvalue, name):
- self = SemLock.__new__(SemLock)
- self.count = 0
- self.ident = 0
- self.kind = kind
- self.maxvalue = maxvalue
- self.name = name
- self.handle = _sem_open(name.encode('ascii'))
- return self
- def raiseFromErrno():
- e = ctypes.get_errno()
- raise OSError(e, errno.errorcode[e])
|