context.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. ###############################################################################
  2. # Basic context management with LokyContext, providing
  3. # compatibility for Python 2.7 and 3.3
  4. #
  5. # author: Thomas Moreau and Olivier Grisel
  6. #
  7. # adapted from multiprocessing/context.py
  8. # * Create a context ensuring loky uses only objects that are compatible
  9. # * Add LokyContext to the list of context of multiprocessing so loky can be
  10. # used with multiprocessing.set_start_method
  11. # * Add some compat function for python2.7 and 3.3.
  12. #
  13. from __future__ import division
  14. import os
  15. import sys
  16. import warnings
  17. import multiprocessing as mp
  18. from .process import LokyProcess, LokyInitMainProcess
  19. START_METHODS = ['loky', 'loky_init_main']
  20. _DEFAULT_START_METHOD = None
  21. if sys.version_info[:2] >= (3, 4):
  22. from multiprocessing import get_context as mp_get_context
  23. from multiprocessing.context import assert_spawning, set_spawning_popen
  24. from multiprocessing.context import get_spawning_popen, BaseContext
  25. START_METHODS += ['spawn']
  26. if sys.platform != 'win32':
  27. START_METHODS += ['fork', 'forkserver']
  28. def get_context(method=None):
  29. # Try to overload the default context
  30. method = method or _DEFAULT_START_METHOD or "loky"
  31. if method == "fork":
  32. # If 'fork' is explicitly requested, warn user about potential
  33. # issues.
  34. warnings.warn("`fork` start method should not be used with "
  35. "`loky` as it does not respect POSIX. Try using "
  36. "`spawn` or `loky` instead.", UserWarning)
  37. try:
  38. context = mp_get_context(method)
  39. except ValueError:
  40. raise ValueError("Unknown context '{}'. Value should be in {}."
  41. .format(method, START_METHODS))
  42. return context
  43. else:
  44. if sys.platform != 'win32':
  45. import threading
  46. # Mechanism to check that the current thread is spawning a process
  47. _tls = threading.local()
  48. popen_attr = 'spawning_popen'
  49. else:
  50. from multiprocessing.forking import Popen
  51. _tls = Popen._tls
  52. popen_attr = 'process_handle'
  53. BaseContext = object
  54. def get_spawning_popen():
  55. return getattr(_tls, popen_attr, None)
  56. def set_spawning_popen(popen):
  57. setattr(_tls, popen_attr, popen)
  58. def assert_spawning(obj):
  59. if get_spawning_popen() is None:
  60. raise RuntimeError(
  61. '%s objects should only be shared between processes'
  62. ' through inheritance' % type(obj).__name__
  63. )
  64. def get_context(method=None):
  65. method = method or _DEFAULT_START_METHOD or 'loky'
  66. if method == "loky":
  67. return LokyContext()
  68. elif method == "loky_init_main":
  69. return LokyInitMainContext()
  70. else:
  71. raise ValueError("Unknown context '{}'. Value should be in {}."
  72. .format(method, START_METHODS))
  73. def set_start_method(method, force=False):
  74. global _DEFAULT_START_METHOD
  75. if _DEFAULT_START_METHOD is not None and not force:
  76. raise RuntimeError('context has already been set')
  77. assert method is None or method in START_METHODS, (
  78. "'{}' is not a valid start_method. It should be in {}"
  79. .format(method, START_METHODS))
  80. _DEFAULT_START_METHOD = method
  81. def get_start_method():
  82. return _DEFAULT_START_METHOD
  83. def cpu_count():
  84. """Return the number of CPUs the current process can use.
  85. The returned number of CPUs accounts for:
  86. * the number of CPUs in the system, as given by
  87. ``multiprocessing.cpu_count``;
  88. * the CPU affinity settings of the current process
  89. (available with Python 3.4+ on some Unix systems);
  90. * CFS scheduler CPU bandwidth limit (available on Linux only, typically
  91. set by docker and similar container orchestration systems);
  92. * the value of the LOKY_MAX_CPU_COUNT environment variable if defined.
  93. and is given as the minimum of these constraints.
  94. It is also always larger or equal to 1.
  95. """
  96. import math
  97. try:
  98. cpu_count_mp = mp.cpu_count()
  99. except NotImplementedError:
  100. cpu_count_mp = 1
  101. # Number of available CPUs given affinity settings
  102. cpu_count_affinity = cpu_count_mp
  103. if hasattr(os, 'sched_getaffinity'):
  104. try:
  105. cpu_count_affinity = len(os.sched_getaffinity(0))
  106. except NotImplementedError:
  107. pass
  108. # CFS scheduler CPU bandwidth limit
  109. # available in Linux since 2.6 kernel
  110. cpu_count_cfs = cpu_count_mp
  111. cfs_quota_fname = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"
  112. cfs_period_fname = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"
  113. if os.path.exists(cfs_quota_fname) and os.path.exists(cfs_period_fname):
  114. with open(cfs_quota_fname, 'r') as fh:
  115. cfs_quota_us = int(fh.read())
  116. with open(cfs_period_fname, 'r') as fh:
  117. cfs_period_us = int(fh.read())
  118. if cfs_quota_us > 0 and cfs_period_us > 0:
  119. # Make sure this quantity is an int as math.ceil returns a
  120. # float in python2.7. (See issue #165)
  121. cpu_count_cfs = int(math.ceil(cfs_quota_us / cfs_period_us))
  122. # User defined soft-limit passed as an loky specific environment variable.
  123. cpu_count_loky = int(os.environ.get('LOKY_MAX_CPU_COUNT', cpu_count_mp))
  124. aggregate_cpu_count = min(cpu_count_mp, cpu_count_affinity, cpu_count_cfs,
  125. cpu_count_loky)
  126. return max(aggregate_cpu_count, 1)
  127. class LokyContext(BaseContext):
  128. """Context relying on the LokyProcess."""
  129. _name = 'loky'
  130. Process = LokyProcess
  131. cpu_count = staticmethod(cpu_count)
  132. def Queue(self, maxsize=0, reducers=None):
  133. '''Returns a queue object'''
  134. from .queues import Queue
  135. return Queue(maxsize, reducers=reducers,
  136. ctx=self.get_context())
  137. def SimpleQueue(self, reducers=None):
  138. '''Returns a queue object'''
  139. from .queues import SimpleQueue
  140. return SimpleQueue(reducers=reducers, ctx=self.get_context())
  141. if sys.version_info[:2] < (3, 4):
  142. """Compat for python2.7/3.3 for necessary methods in Context"""
  143. def get_context(self):
  144. return self
  145. def get_start_method(self):
  146. return self._name
  147. def Pipe(self, duplex=True):
  148. '''Returns two connection object connected by a pipe'''
  149. return mp.Pipe(duplex)
  150. if sys.platform != "win32":
  151. """Use the compat Manager for python2.7/3.3 on UNIX to avoid
  152. relying on fork processes
  153. """
  154. def Manager(self):
  155. """Returns a manager object"""
  156. from .managers import LokyManager
  157. m = LokyManager()
  158. m.start()
  159. return m
  160. else:
  161. """Compat for context on Windows and python2.7/3.3. Using regular
  162. multiprocessing objects as it does not rely on fork.
  163. """
  164. from multiprocessing import synchronize
  165. Semaphore = staticmethod(synchronize.Semaphore)
  166. BoundedSemaphore = staticmethod(synchronize.BoundedSemaphore)
  167. Lock = staticmethod(synchronize.Lock)
  168. RLock = staticmethod(synchronize.RLock)
  169. Condition = staticmethod(synchronize.Condition)
  170. Event = staticmethod(synchronize.Event)
  171. Manager = staticmethod(mp.Manager)
  172. if sys.platform != "win32":
  173. """For Unix platform, use our custom implementation of synchronize
  174. relying on ctypes to interface with pthread semaphores.
  175. """
  176. def Semaphore(self, value=1):
  177. """Returns a semaphore object"""
  178. from .synchronize import Semaphore
  179. return Semaphore(value=value)
  180. def BoundedSemaphore(self, value):
  181. """Returns a bounded semaphore object"""
  182. from .synchronize import BoundedSemaphore
  183. return BoundedSemaphore(value)
  184. def Lock(self):
  185. """Returns a lock object"""
  186. from .synchronize import Lock
  187. return Lock()
  188. def RLock(self):
  189. """Returns a recurrent lock object"""
  190. from .synchronize import RLock
  191. return RLock()
  192. def Condition(self, lock=None):
  193. """Returns a condition object"""
  194. from .synchronize import Condition
  195. return Condition(lock)
  196. def Event(self):
  197. """Returns an event object"""
  198. from .synchronize import Event
  199. return Event()
  200. class LokyInitMainContext(LokyContext):
  201. """Extra context with LokyProcess, which does load the main module
  202. This context is used for compatibility in the case ``cloudpickle`` is not
  203. present on the running system. This permits to load functions defined in
  204. the ``main`` module, using proper safeguards. The declaration of the
  205. ``executor`` should be protected by ``if __name__ == "__main__":`` and the
  206. functions and variable used from main should be out of this block.
  207. This mimics the default behavior of multiprocessing under Windows and the
  208. behavior of the ``spawn`` start method on a posix system for python3.4+.
  209. For more details, see the end of the following section of python doc
  210. https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
  211. """
  212. _name = 'loky_init_main'
  213. Process = LokyInitMainProcess
  214. if sys.version_info > (3, 4):
  215. """Register loky context so it works with multiprocessing.get_context"""
  216. ctx_loky = LokyContext()
  217. mp.context._concrete_contexts['loky'] = ctx_loky
  218. mp.context._concrete_contexts['loky_init_main'] = LokyInitMainContext()