| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- ###############################################################################
- # Popen for LokyProcess.
- #
- # author: Thomas Moreau and Olivier Grisel
- #
- import os
- import sys
- import signal
- import pickle
- from io import BytesIO
- from . import reduction, spawn
- from .context import get_spawning_popen, set_spawning_popen
- from multiprocessing import util, process
- if sys.version_info[:2] < (3, 3):
- ProcessLookupError = OSError
- if sys.platform != "win32":
- from . import resource_tracker
- __all__ = []
- if sys.platform != "win32":
- #
- # Wrapper for an fd used while launching a process
- #
- class _DupFd(object):
- def __init__(self, fd):
- self.fd = reduction._mk_inheritable(fd)
- def detach(self):
- return self.fd
- #
- # Start child process using subprocess.Popen
- #
- __all__.append('Popen')
- class Popen(object):
- method = 'loky'
- DupFd = _DupFd
- def __init__(self, process_obj):
- sys.stdout.flush()
- sys.stderr.flush()
- self.returncode = None
- self._fds = []
- self._launch(process_obj)
- if sys.version_info < (3, 4):
- @classmethod
- def duplicate_for_child(cls, fd):
- popen = get_spawning_popen()
- popen._fds.append(fd)
- return reduction._mk_inheritable(fd)
- else:
- def duplicate_for_child(self, fd):
- self._fds.append(fd)
- return reduction._mk_inheritable(fd)
- def poll(self, flag=os.WNOHANG):
- if self.returncode is None:
- while True:
- try:
- pid, sts = os.waitpid(self.pid, flag)
- except OSError:
- # Child process not yet created. See #1731717
- # e.errno == errno.ECHILD == 10
- return None
- else:
- break
- if pid == self.pid:
- if os.WIFSIGNALED(sts):
- self.returncode = -os.WTERMSIG(sts)
- else:
- assert os.WIFEXITED(sts)
- self.returncode = os.WEXITSTATUS(sts)
- return self.returncode
- def wait(self, timeout=None):
- if sys.version_info < (3, 3):
- import time
- if timeout is None:
- return self.poll(0)
- deadline = time.time() + timeout
- delay = 0.0005
- while 1:
- res = self.poll()
- if res is not None:
- break
- remaining = deadline - time.time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, 0.05)
- time.sleep(delay)
- return res
- if self.returncode is None:
- if timeout is not None:
- from multiprocessing.connection import wait
- if not wait([self.sentinel], timeout):
- return None
- # This shouldn't block if wait() returned successfully.
- return self.poll(os.WNOHANG if timeout == 0.0 else 0)
- return self.returncode
- def terminate(self):
- if self.returncode is None:
- try:
- os.kill(self.pid, signal.SIGTERM)
- except ProcessLookupError:
- pass
- except OSError:
- if self.wait(timeout=0.1) is None:
- raise
- def _launch(self, process_obj):
- tracker_fd = resource_tracker._resource_tracker.getfd()
- fp = BytesIO()
- set_spawning_popen(self)
- try:
- prep_data = spawn.get_preparation_data(
- process_obj._name,
- getattr(process_obj, "init_main_module", True))
- reduction.dump(prep_data, fp)
- reduction.dump(process_obj, fp)
- finally:
- set_spawning_popen(None)
- try:
- parent_r, child_w = os.pipe()
- child_r, parent_w = os.pipe()
- # for fd in self._fds:
- # _mk_inheritable(fd)
- cmd_python = [sys.executable]
- cmd_python += ['-m', self.__module__]
- cmd_python += ['--process-name', str(process_obj.name)]
- cmd_python += ['--pipe',
- str(reduction._mk_inheritable(child_r))]
- reduction._mk_inheritable(child_w)
- reduction._mk_inheritable(tracker_fd)
- self._fds.extend([child_r, child_w, tracker_fd])
- if sys.version_info >= (3, 8) and os.name == 'posix':
- mp_tracker_fd = prep_data['mp_tracker_args']['fd']
- self.duplicate_for_child(mp_tracker_fd)
- from .fork_exec import fork_exec
- pid = fork_exec(cmd_python, self._fds, env=process_obj.env)
- util.debug("launched python with pid {} and cmd:\n{}"
- .format(pid, cmd_python))
- self.sentinel = parent_r
- method = 'getbuffer'
- if not hasattr(fp, method):
- method = 'getvalue'
- with os.fdopen(parent_w, 'wb') as f:
- f.write(getattr(fp, method)())
- self.pid = pid
- finally:
- if parent_r is not None:
- util.Finalize(self, os.close, (parent_r,))
- for fd in (child_r, child_w):
- if fd is not None:
- os.close(fd)
- @staticmethod
- def thread_is_spawning():
- return True
- if __name__ == '__main__':
- import argparse
- parser = argparse.ArgumentParser('Command line parser')
- parser.add_argument('--pipe', type=int, required=True,
- help='File handle for the pipe')
- parser.add_argument('--process-name', type=str, default=None,
- help='Identifier for debugging purpose')
- args = parser.parse_args()
- info = dict()
- exitcode = 1
- try:
- with os.fdopen(args.pipe, 'rb') as from_parent:
- process.current_process()._inheriting = True
- try:
- prep_data = pickle.load(from_parent)
- spawn.prepare(prep_data)
- process_obj = pickle.load(from_parent)
- finally:
- del process.current_process()._inheriting
- exitcode = process_obj._bootstrap()
- except Exception:
- print('\n\n' + '-' * 80)
- print('{} failed with traceback: '.format(args.process_name))
- print('-' * 80)
- import traceback
- print(traceback.format_exc())
- print('\n' + '-' * 80)
- finally:
- if from_parent is not None:
- from_parent.close()
- sys.exit(exitcode)
|