popen_loky_posix.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. ###############################################################################
  2. # Popen for LokyProcess.
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. import os
  7. import sys
  8. import signal
  9. import pickle
  10. from io import BytesIO
  11. from . import reduction, spawn
  12. from .context import get_spawning_popen, set_spawning_popen
  13. from multiprocessing import util, process
  14. if sys.version_info[:2] < (3, 3):
  15. ProcessLookupError = OSError
  16. if sys.platform != "win32":
  17. from . import resource_tracker
  18. __all__ = []
  19. if sys.platform != "win32":
  20. #
  21. # Wrapper for an fd used while launching a process
  22. #
  23. class _DupFd(object):
  24. def __init__(self, fd):
  25. self.fd = reduction._mk_inheritable(fd)
  26. def detach(self):
  27. return self.fd
  28. #
  29. # Start child process using subprocess.Popen
  30. #
  31. __all__.append('Popen')
  32. class Popen(object):
  33. method = 'loky'
  34. DupFd = _DupFd
  35. def __init__(self, process_obj):
  36. sys.stdout.flush()
  37. sys.stderr.flush()
  38. self.returncode = None
  39. self._fds = []
  40. self._launch(process_obj)
  41. if sys.version_info < (3, 4):
  42. @classmethod
  43. def duplicate_for_child(cls, fd):
  44. popen = get_spawning_popen()
  45. popen._fds.append(fd)
  46. return reduction._mk_inheritable(fd)
  47. else:
  48. def duplicate_for_child(self, fd):
  49. self._fds.append(fd)
  50. return reduction._mk_inheritable(fd)
  51. def poll(self, flag=os.WNOHANG):
  52. if self.returncode is None:
  53. while True:
  54. try:
  55. pid, sts = os.waitpid(self.pid, flag)
  56. except OSError:
  57. # Child process not yet created. See #1731717
  58. # e.errno == errno.ECHILD == 10
  59. return None
  60. else:
  61. break
  62. if pid == self.pid:
  63. if os.WIFSIGNALED(sts):
  64. self.returncode = -os.WTERMSIG(sts)
  65. else:
  66. assert os.WIFEXITED(sts)
  67. self.returncode = os.WEXITSTATUS(sts)
  68. return self.returncode
  69. def wait(self, timeout=None):
  70. if sys.version_info < (3, 3):
  71. import time
  72. if timeout is None:
  73. return self.poll(0)
  74. deadline = time.time() + timeout
  75. delay = 0.0005
  76. while 1:
  77. res = self.poll()
  78. if res is not None:
  79. break
  80. remaining = deadline - time.time()
  81. if remaining <= 0:
  82. break
  83. delay = min(delay * 2, remaining, 0.05)
  84. time.sleep(delay)
  85. return res
  86. if self.returncode is None:
  87. if timeout is not None:
  88. from multiprocessing.connection import wait
  89. if not wait([self.sentinel], timeout):
  90. return None
  91. # This shouldn't block if wait() returned successfully.
  92. return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  93. return self.returncode
  94. def terminate(self):
  95. if self.returncode is None:
  96. try:
  97. os.kill(self.pid, signal.SIGTERM)
  98. except ProcessLookupError:
  99. pass
  100. except OSError:
  101. if self.wait(timeout=0.1) is None:
  102. raise
  103. def _launch(self, process_obj):
  104. tracker_fd = resource_tracker._resource_tracker.getfd()
  105. fp = BytesIO()
  106. set_spawning_popen(self)
  107. try:
  108. prep_data = spawn.get_preparation_data(
  109. process_obj._name,
  110. getattr(process_obj, "init_main_module", True))
  111. reduction.dump(prep_data, fp)
  112. reduction.dump(process_obj, fp)
  113. finally:
  114. set_spawning_popen(None)
  115. try:
  116. parent_r, child_w = os.pipe()
  117. child_r, parent_w = os.pipe()
  118. # for fd in self._fds:
  119. # _mk_inheritable(fd)
  120. cmd_python = [sys.executable]
  121. cmd_python += ['-m', self.__module__]
  122. cmd_python += ['--process-name', str(process_obj.name)]
  123. cmd_python += ['--pipe',
  124. str(reduction._mk_inheritable(child_r))]
  125. reduction._mk_inheritable(child_w)
  126. reduction._mk_inheritable(tracker_fd)
  127. self._fds.extend([child_r, child_w, tracker_fd])
  128. if sys.version_info >= (3, 8) and os.name == 'posix':
  129. mp_tracker_fd = prep_data['mp_tracker_args']['fd']
  130. self.duplicate_for_child(mp_tracker_fd)
  131. from .fork_exec import fork_exec
  132. pid = fork_exec(cmd_python, self._fds, env=process_obj.env)
  133. util.debug("launched python with pid {} and cmd:\n{}"
  134. .format(pid, cmd_python))
  135. self.sentinel = parent_r
  136. method = 'getbuffer'
  137. if not hasattr(fp, method):
  138. method = 'getvalue'
  139. with os.fdopen(parent_w, 'wb') as f:
  140. f.write(getattr(fp, method)())
  141. self.pid = pid
  142. finally:
  143. if parent_r is not None:
  144. util.Finalize(self, os.close, (parent_r,))
  145. for fd in (child_r, child_w):
  146. if fd is not None:
  147. os.close(fd)
  148. @staticmethod
  149. def thread_is_spawning():
  150. return True
  151. if __name__ == '__main__':
  152. import argparse
  153. parser = argparse.ArgumentParser('Command line parser')
  154. parser.add_argument('--pipe', type=int, required=True,
  155. help='File handle for the pipe')
  156. parser.add_argument('--process-name', type=str, default=None,
  157. help='Identifier for debugging purpose')
  158. args = parser.parse_args()
  159. info = dict()
  160. exitcode = 1
  161. try:
  162. with os.fdopen(args.pipe, 'rb') as from_parent:
  163. process.current_process()._inheriting = True
  164. try:
  165. prep_data = pickle.load(from_parent)
  166. spawn.prepare(prep_data)
  167. process_obj = pickle.load(from_parent)
  168. finally:
  169. del process.current_process()._inheriting
  170. exitcode = process_obj._bootstrap()
  171. except Exception:
  172. print('\n\n' + '-' * 80)
  173. print('{} failed with traceback: '.format(args.process_name))
  174. print('-' * 80)
  175. import traceback
  176. print(traceback.format_exc())
  177. print('\n' + '-' * 80)
  178. finally:
  179. if from_parent is not None:
  180. from_parent.close()
  181. sys.exit(exitcode)