_win_reduction.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. ###############################################################################
  2. # Extra reducers for Windows system and connection objects
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from multiprocessing/reduction.py (17/02/2017)
  7. # * Add adapted reduction for LokyProcesses and socket/PipeConnection
  8. #
  9. import os
  10. import sys
  11. import socket
  12. from .reduction import register
  13. if sys.platform == 'win32':
  14. if sys.version_info[:2] < (3, 3):
  15. from _multiprocessing import PipeConnection
  16. else:
  17. import _winapi
  18. from multiprocessing.connection import PipeConnection
  19. if sys.version_info[:2] >= (3, 4) and sys.platform == 'win32':
  20. class DupHandle(object):
  21. def __init__(self, handle, access, pid=None):
  22. # duplicate handle for process with given pid
  23. if pid is None:
  24. pid = os.getpid()
  25. proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
  26. try:
  27. self._handle = _winapi.DuplicateHandle(
  28. _winapi.GetCurrentProcess(),
  29. handle, proc, access, False, 0)
  30. finally:
  31. _winapi.CloseHandle(proc)
  32. self._access = access
  33. self._pid = pid
  34. def detach(self):
  35. # retrieve handle from process which currently owns it
  36. if self._pid == os.getpid():
  37. return self._handle
  38. proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
  39. self._pid)
  40. try:
  41. return _winapi.DuplicateHandle(
  42. proc, self._handle, _winapi.GetCurrentProcess(),
  43. self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
  44. finally:
  45. _winapi.CloseHandle(proc)
  46. def reduce_pipe_connection(conn):
  47. access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
  48. (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
  49. dh = DupHandle(conn.fileno(), access)
  50. return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
  51. def rebuild_pipe_connection(dh, readable, writable):
  52. from multiprocessing.connection import PipeConnection
  53. handle = dh.detach()
  54. return PipeConnection(handle, readable, writable)
  55. register(PipeConnection, reduce_pipe_connection)
  56. elif sys.platform == 'win32':
  57. # Older Python versions
  58. from multiprocessing.reduction import reduce_pipe_connection
  59. register(PipeConnection, reduce_pipe_connection)
  60. if sys.version_info[:2] < (3, 3) and sys.platform == 'win32':
  61. from _multiprocessing import win32
  62. from multiprocessing.reduction import reduce_handle, rebuild_handle
  63. close = win32.CloseHandle
  64. def fromfd(handle, family, type_, proto=0):
  65. s = socket.socket(family, type_, proto, fileno=handle)
  66. if s.__class__ is not socket.socket:
  67. s = socket.socket(_sock=s)
  68. return s
  69. def reduce_socket(s):
  70. if not hasattr(socket, "fromfd"):
  71. raise TypeError("sockets cannot be pickled on this system.")
  72. reduced_handle = reduce_handle(s.fileno())
  73. return _rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
  74. def _rebuild_socket(reduced_handle, family, type_, proto):
  75. handle = rebuild_handle(reduced_handle)
  76. s = fromfd(handle, family, type_, proto)
  77. close(handle)
  78. return s
  79. register(socket.socket, reduce_socket)
  80. elif sys.version_info[:2] < (3, 4):
  81. from multiprocessing.reduction import reduce_socket
  82. register(socket.socket, reduce_socket)
  83. else:
  84. from multiprocessing.reduction import _reduce_socket
  85. register(socket.socket, _reduce_socket)