spawn.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. ###############################################################################
  2. # Prepares and processes the data to setup the new process environment
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from multiprocessing/spawn.py (17/02/2017)
  7. # * Improve logging data
  8. #
  9. import os
  10. import sys
  11. import runpy
  12. import types
  13. from multiprocessing import process, util
  14. if sys.platform != 'win32':
  15. WINEXE = False
  16. WINSERVICE = False
  17. else:
  18. import msvcrt
  19. from .reduction import duplicate
  20. WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
  21. WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
  22. if WINSERVICE:
  23. _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
  24. else:
  25. _python_exe = sys.executable
  26. def get_executable():
  27. return _python_exe
  28. def _check_not_importing_main():
  29. if getattr(process.current_process(), '_inheriting', False):
  30. raise RuntimeError('''
  31. An attempt has been made to start a new process before the
  32. current process has finished its bootstrapping phase.
  33. This probably means that you are not using fork to start your
  34. child processes and you have forgotten to use the proper idiom
  35. in the main module:
  36. if __name__ == '__main__':
  37. freeze_support()
  38. ...
  39. The "freeze_support()" line can be omitted if the program
  40. is not going to be frozen to produce an executable.''')
  41. def get_preparation_data(name, init_main_module=True):
  42. '''
  43. Return info about parent needed by child to unpickle process object
  44. '''
  45. _check_not_importing_main()
  46. d = dict(
  47. log_to_stderr=util._log_to_stderr,
  48. authkey=bytes(process.current_process().authkey),
  49. name=name,
  50. sys_argv=sys.argv,
  51. orig_dir=process.ORIGINAL_DIR,
  52. dir=os.getcwd()
  53. )
  54. # Send sys_path and make sure the current directory will not be changed
  55. sys_path = [p for p in sys.path]
  56. try:
  57. i = sys_path.index('')
  58. except ValueError:
  59. pass
  60. else:
  61. sys_path[i] = process.ORIGINAL_DIR
  62. d['sys_path'] = sys_path
  63. # Make sure to pass the information if the multiprocessing logger is active
  64. if util._logger is not None:
  65. d['log_level'] = util._logger.getEffectiveLevel()
  66. if len(util._logger.handlers) > 0:
  67. h = util._logger.handlers[0]
  68. d['log_fmt'] = h.formatter._fmt
  69. # Tell the child how to communicate with the resource_tracker
  70. from .resource_tracker import _resource_tracker
  71. _resource_tracker.ensure_running()
  72. d["tracker_args"] = {"pid": _resource_tracker._pid}
  73. if sys.platform == "win32":
  74. child_w = duplicate(
  75. msvcrt.get_osfhandle(_resource_tracker._fd), inheritable=True)
  76. d["tracker_args"]["fh"] = child_w
  77. else:
  78. d["tracker_args"]["fd"] = _resource_tracker._fd
  79. if sys.version_info >= (3, 8) and os.name == 'posix':
  80. # joblib/loky#242: allow loky processes to retrieve the resource
  81. # tracker of their parent in case the child processes depickles
  82. # shared_memory objects, that are still tracked by multiprocessing's
  83. # resource_tracker by default.
  84. # XXX: this is a workaround that may be error prone: in the future, it
  85. # would be better to have loky subclass multiprocessing's shared_memory
  86. # to force registration of shared_memory segments via loky's
  87. # resource_tracker.
  88. from multiprocessing.resource_tracker import (
  89. _resource_tracker as mp_resource_tracker
  90. )
  91. # multiprocessing's resource_tracker must be running before loky
  92. # process is created (othewise the child won't be able to use it if it
  93. # is created later on)
  94. mp_resource_tracker.ensure_running()
  95. d["mp_tracker_args"] = {
  96. 'fd': mp_resource_tracker._fd, 'pid': mp_resource_tracker._pid
  97. }
  98. # Figure out whether to initialise main in the subprocess as a module
  99. # or through direct execution (or to leave it alone entirely)
  100. if init_main_module:
  101. main_module = sys.modules['__main__']
  102. try:
  103. main_mod_name = getattr(main_module.__spec__, "name", None)
  104. except BaseException:
  105. main_mod_name = None
  106. if main_mod_name is not None:
  107. d['init_main_from_name'] = main_mod_name
  108. elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
  109. main_path = getattr(main_module, '__file__', None)
  110. if main_path is not None:
  111. if (not os.path.isabs(main_path) and
  112. process.ORIGINAL_DIR is not None):
  113. main_path = os.path.join(process.ORIGINAL_DIR, main_path)
  114. d['init_main_from_path'] = os.path.normpath(main_path)
  115. # Compat for python2.7
  116. d['main_path'] = d['init_main_from_path']
  117. return d
  118. #
  119. # Prepare current process
  120. #
  121. old_main_modules = []
  122. def prepare(data):
  123. '''
  124. Try to get current process ready to unpickle process object
  125. '''
  126. if 'name' in data:
  127. process.current_process().name = data['name']
  128. if 'authkey' in data:
  129. process.current_process().authkey = data['authkey']
  130. if 'log_to_stderr' in data and data['log_to_stderr']:
  131. util.log_to_stderr()
  132. if 'log_level' in data:
  133. util.get_logger().setLevel(data['log_level'])
  134. if 'log_fmt' in data:
  135. import logging
  136. util.get_logger().handlers[0].setFormatter(
  137. logging.Formatter(data['log_fmt'])
  138. )
  139. if 'sys_path' in data:
  140. sys.path = data['sys_path']
  141. if 'sys_argv' in data:
  142. sys.argv = data['sys_argv']
  143. if 'dir' in data:
  144. os.chdir(data['dir'])
  145. if 'orig_dir' in data:
  146. process.ORIGINAL_DIR = data['orig_dir']
  147. if 'mp_tracker_args' in data:
  148. from multiprocessing.resource_tracker import (
  149. _resource_tracker as mp_resource_tracker
  150. )
  151. mp_resource_tracker._fd = data['mp_tracker_args']['fd']
  152. mp_resource_tracker._pid = data['mp_tracker_args']['pid']
  153. if 'tracker_args' in data:
  154. from .resource_tracker import _resource_tracker
  155. _resource_tracker._pid = data["tracker_args"]['pid']
  156. if sys.platform == 'win32':
  157. handle = data["tracker_args"]["fh"]
  158. _resource_tracker._fd = msvcrt.open_osfhandle(handle, 0)
  159. else:
  160. _resource_tracker._fd = data["tracker_args"]["fd"]
  161. if 'init_main_from_name' in data:
  162. _fixup_main_from_name(data['init_main_from_name'])
  163. elif 'init_main_from_path' in data:
  164. _fixup_main_from_path(data['init_main_from_path'])
  165. # Multiprocessing module helpers to fix up the main module in
  166. # spawned subprocesses
  167. def _fixup_main_from_name(mod_name):
  168. # __main__.py files for packages, directories, zip archives, etc, run
  169. # their "main only" code unconditionally, so we don't even try to
  170. # populate anything in __main__, nor do we make any changes to
  171. # __main__ attributes
  172. current_main = sys.modules['__main__']
  173. if mod_name == "__main__" or mod_name.endswith(".__main__"):
  174. return
  175. # If this process was forked, __main__ may already be populated
  176. if getattr(current_main.__spec__, "name", None) == mod_name:
  177. return
  178. # Otherwise, __main__ may contain some non-main code where we need to
  179. # support unpickling it properly. We rerun it as __mp_main__ and make
  180. # the normal __main__ an alias to that
  181. old_main_modules.append(current_main)
  182. main_module = types.ModuleType("__mp_main__")
  183. main_content = runpy.run_module(mod_name,
  184. run_name="__mp_main__",
  185. alter_sys=True)
  186. main_module.__dict__.update(main_content)
  187. sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
  188. def _fixup_main_from_path(main_path):
  189. # If this process was forked, __main__ may already be populated
  190. current_main = sys.modules['__main__']
  191. # Unfortunately, the main ipython launch script historically had no
  192. # "if __name__ == '__main__'" guard, so we work around that
  193. # by treating it like a __main__.py file
  194. # See https://github.com/ipython/ipython/issues/4698
  195. main_name = os.path.splitext(os.path.basename(main_path))[0]
  196. if main_name == 'ipython':
  197. return
  198. # Otherwise, if __file__ already has the setting we expect,
  199. # there's nothing more to do
  200. if getattr(current_main, '__file__', None) == main_path:
  201. return
  202. # If the parent process has sent a path through rather than a module
  203. # name we assume it is an executable script that may contain
  204. # non-main code that needs to be executed
  205. old_main_modules.append(current_main)
  206. main_module = types.ModuleType("__mp_main__")
  207. main_content = runpy.run_path(main_path,
  208. run_name="__mp_main__")
  209. main_module.__dict__.update(main_content)
  210. sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
  211. def import_main_path(main_path):
  212. '''
  213. Set sys.modules['__main__'] to module at main_path
  214. '''
  215. _fixup_main_from_path(main_path)