popen_loky_win32.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import os
  2. import sys
  3. from pickle import load
  4. from multiprocessing import process, util
  5. from . import spawn
  6. from . import reduction
  7. from .context import get_spawning_popen, set_spawning_popen
  8. if sys.platform == "win32":
  9. # Avoid import error by code introspection tools such as test runners
  10. # trying to import this module while running on non-Windows systems.
  11. import msvcrt
  12. from .compat_win32 import _winapi
  13. from .compat_win32 import Popen as _Popen
  14. from .reduction import duplicate
  15. else:
  16. _Popen = object
  17. if sys.version_info[:2] < (3, 3):
  18. from os import fdopen as open
  19. __all__ = ['Popen']
  20. #
  21. #
  22. #
  23. TERMINATE = 0x10000
  24. WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
  25. WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
  26. def _path_eq(p1, p2):
  27. return p1 == p2 or os.path.normcase(p1) == os.path.normcase(p2)
  28. WINENV = (hasattr(sys, "_base_executable")
  29. and not _path_eq(sys.executable, sys._base_executable))
  30. #
  31. # We define a Popen class similar to the one from subprocess, but
  32. # whose constructor takes a process object as its argument.
  33. #
  34. class Popen(_Popen):
  35. '''
  36. Start a subprocess to run the code of a process object
  37. '''
  38. method = 'loky'
  39. def __init__(self, process_obj):
  40. prep_data = spawn.get_preparation_data(
  41. process_obj._name, getattr(process_obj, "init_main_module", True))
  42. # read end of pipe will be "stolen" by the child process
  43. # -- see spawn_main() in spawn.py.
  44. rfd, wfd = os.pipe()
  45. rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
  46. os.close(rfd)
  47. cmd = get_command_line(parent_pid=os.getpid(), pipe_handle=rhandle)
  48. cmd = ' '.join('"%s"' % x for x in cmd)
  49. python_exe = spawn.get_executable()
  50. # copy the environment variables to set in the child process
  51. child_env = os.environ.copy()
  52. child_env.update(process_obj.env)
  53. # bpo-35797: When running in a venv, we bypass the redirect
  54. # executor and launch our base Python.
  55. if WINENV and _path_eq(python_exe, sys.executable):
  56. python_exe = sys._base_executable
  57. child_env["__PYVENV_LAUNCHER__"] = sys.executable
  58. try:
  59. with open(wfd, 'wb') as to_child:
  60. # start process
  61. try:
  62. # This flag allows to pass inheritable handles from the
  63. # parent to the child process in a python2-3 compatible way
  64. # (see
  65. # https://github.com/tomMoral/loky/pull/204#discussion_r290719629
  66. # for more detail). When support for Python 2 is dropped,
  67. # the cleaner multiprocessing.reduction.steal_handle should
  68. # be used instead.
  69. inherit = True
  70. hp, ht, pid, tid = _winapi.CreateProcess(
  71. python_exe, cmd,
  72. None, None, inherit, 0,
  73. child_env, None, None)
  74. _winapi.CloseHandle(ht)
  75. except BaseException:
  76. _winapi.CloseHandle(rhandle)
  77. raise
  78. # set attributes of self
  79. self.pid = pid
  80. self.returncode = None
  81. self._handle = hp
  82. self.sentinel = int(hp)
  83. util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
  84. # send information to child
  85. set_spawning_popen(self)
  86. if sys.version_info[:2] < (3, 4):
  87. Popen._tls.process_handle = int(hp)
  88. try:
  89. reduction.dump(prep_data, to_child)
  90. reduction.dump(process_obj, to_child)
  91. finally:
  92. set_spawning_popen(None)
  93. if sys.version_info[:2] < (3, 4):
  94. del Popen._tls.process_handle
  95. except IOError as exc:
  96. # IOError 22 happens when the launched subprocess terminated before
  97. # wfd.close is called. Thus we can safely ignore it.
  98. if exc.errno != 22:
  99. raise
  100. util.debug("While starting {}, ignored a IOError 22"
  101. .format(process_obj._name))
  102. def duplicate_for_child(self, handle):
  103. assert self is get_spawning_popen()
  104. return duplicate(handle, self.sentinel)
  105. def get_command_line(pipe_handle, **kwds):
  106. '''
  107. Returns prefix of command line used for spawning a child process
  108. '''
  109. if getattr(sys, 'frozen', False):
  110. return ([sys.executable, '--multiprocessing-fork', pipe_handle])
  111. else:
  112. prog = 'from joblib.externals.loky.backend.popen_loky_win32 import main; main()'
  113. opts = util._args_from_interpreter_flags()
  114. return [spawn.get_executable()] + opts + [
  115. '-c', prog, '--multiprocessing-fork', pipe_handle]
  116. def is_forking(argv):
  117. '''
  118. Return whether commandline indicates we are forking
  119. '''
  120. if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
  121. assert len(argv) == 3
  122. return True
  123. else:
  124. return False
  125. def main():
  126. '''
  127. Run code specified by data received over pipe
  128. '''
  129. assert is_forking(sys.argv)
  130. handle = int(sys.argv[-1])
  131. fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
  132. from_parent = os.fdopen(fd, 'rb')
  133. process.current_process()._inheriting = True
  134. preparation_data = load(from_parent)
  135. spawn.prepare(preparation_data)
  136. self = load(from_parent)
  137. process.current_process()._inheriting = False
  138. from_parent.close()
  139. exitcode = self._bootstrap()
  140. exit(exitcode)