managers.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
###############################################################################
# Compat for UNIX, Python 2.7 and 3.3
# Manager with LokyContext server.
# This avoids having a Manager that uses fork, which breaks the file
# descriptors.
#
# author: Thomas Moreau and Olivier Grisel
#
# based on multiprocessing/managers.py (17/02/2017)
#  * Overload the start method to use LokyContext and launch a loky subprocess
#
  11. import multiprocessing as mp
  12. from multiprocessing.managers import SyncManager, State
  13. from .process import LokyProcess as Process
  14. class LokyManager(SyncManager):
  15. def start(self, initializer=None, initargs=()):
  16. '''Spawn a server process for this manager object'''
  17. assert self._state.value == State.INITIAL
  18. if (initializer is not None
  19. and not hasattr(initializer, '__call__')):
  20. raise TypeError('initializer must be a callable')
  21. # pipe over which we will retrieve address of server
  22. reader, writer = mp.Pipe(duplex=False)
  23. # spawn process which runs a server
  24. self._process = Process(
  25. target=type(self)._run_server,
  26. args=(self._registry, self._address, bytes(self._authkey),
  27. self._serializer, writer, initializer, initargs),
  28. )
  29. ident = ':'.join(str(i) for i in self._process._identity)
  30. self._process.name = type(self).__name__ + '-' + ident
  31. self._process.start()
  32. # get address of server
  33. writer.close()
  34. self._address = reader.recv()
  35. reader.close()
  36. # register a finalizer
  37. self._state.value = State.STARTED
  38. self.shutdown = mp.util.Finalize(
  39. self, type(self)._finalize_manager,
  40. args=(self._process, self._address, self._authkey,
  41. self._state, self._Client),
  42. exitpriority=0
  43. )