| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- ###############################################################################
# Extra reducers for Windows system and connection objects
- #
- # author: Thomas Moreau and Olivier Grisel
- #
- # adapted from multiprocessing/reduction.py (17/02/2017)
- # * Add adapted reduction for LokyProcesses and socket/PipeConnection
- #
- import os
- import sys
- import socket
- from .reduction import register
# Windows only: PipeConnection is imported from a different module depending
# on the interpreter version.
if sys.platform == 'win32':
    if sys.version_info[:2] >= (3, 3):
        # _winapi is needed by the handle-duplication code further down.
        import _winapi
        from multiprocessing.connection import PipeConnection
    else:
        # Before Python 3.3 the class is taken from the _multiprocessing
        # extension module instead.
        from _multiprocessing import PipeConnection
# Reduction of PipeConnection objects (Windows only).
if sys.platform == 'win32':
    if sys.version_info[:2] >= (3, 4):

        class DupHandle(object):
            """Picklable holder for a handle duplicated into a target process.

            The handle is duplicated for the process identified by ``pid``
            (the current process when ``pid`` is None) at construction time;
            ``detach`` later retrieves a usable handle on the receiving side.
            """

            def __init__(self, handle, access, pid=None):
                # Duplicate the handle for the process with the given pid,
                # defaulting to the current process.
                target_pid = os.getpid() if pid is None else pid
                target_proc = _winapi.OpenProcess(
                    _winapi.PROCESS_DUP_HANDLE, False, target_pid)
                try:
                    self._handle = _winapi.DuplicateHandle(
                        _winapi.GetCurrentProcess(),
                        handle, target_proc, access, False, 0)
                finally:
                    # The process handle is only needed for the duplication.
                    _winapi.CloseHandle(target_proc)
                self._access = access
                self._pid = target_pid

            def detach(self):
                """Return the handle, fetching it from its owner if needed."""
                owner_pid = self._pid
                if owner_pid == os.getpid():
                    # The duplicate already belongs to this process.
                    return self._handle
                # Otherwise pull the handle out of the owning process;
                # DUPLICATE_CLOSE_SOURCE is passed so the source handle is
                # closed as part of the transfer.
                owner_proc = _winapi.OpenProcess(
                    _winapi.PROCESS_DUP_HANDLE, False, owner_pid)
                try:
                    return _winapi.DuplicateHandle(
                        owner_proc, self._handle,
                        _winapi.GetCurrentProcess(),
                        self._access, False,
                        _winapi.DUPLICATE_CLOSE_SOURCE)
                finally:
                    _winapi.CloseHandle(owner_proc)

        def reduce_pipe_connection(conn):
            """Reduce ``conn`` to ``(rebuild_pipe_connection, args)``."""
            # Build the access mask from the directions the connection uses.
            access = 0
            if conn.readable:
                access |= _winapi.FILE_GENERIC_READ
            if conn.writable:
                access |= _winapi.FILE_GENERIC_WRITE
            dup = DupHandle(conn.fileno(), access)
            return rebuild_pipe_connection, (dup, conn.readable, conn.writable)

        def rebuild_pipe_connection(dh, readable, writable):
            """Recreate a PipeConnection from a DupHandle and its flags."""
            from multiprocessing.connection import PipeConnection
            return PipeConnection(dh.detach(), readable, writable)

    else:
        # Older Python versions
        from multiprocessing.reduction import reduce_pipe_connection

    # Whichever implementation was selected above becomes the reducer.
    register(PipeConnection, reduce_pipe_connection)
# Reduction of socket objects: pick the reducer matching the interpreter
# version and platform, then register it for socket.socket.
if sys.version_info[:2] >= (3, 4):
    from multiprocessing.reduction import _reduce_socket
    register(socket.socket, _reduce_socket)
elif sys.version_info[:2] >= (3, 3) or sys.platform != 'win32':
    from multiprocessing.reduction import reduce_socket
    register(socket.socket, reduce_socket)
else:
    # Python < 3.3 on Windows: build the reducer from the handle helpers.
    from _multiprocessing import win32
    from multiprocessing.reduction import reduce_handle, rebuild_handle
    close = win32.CloseHandle

    def fromfd(handle, family, type_, proto=0):
        """Create a ``socket.socket`` object on top of ``handle``."""
        sock = socket.socket(family, type_, proto, fileno=handle)
        if sock.__class__ is socket.socket:
            return sock
        # Wrap anything else so callers always get a socket.socket.
        return socket.socket(_sock=sock)

    def reduce_socket(s):
        """Reduce socket ``s`` to ``(_rebuild_socket, args)``.

        Raises TypeError when the socket module has no ``fromfd``.
        """
        if hasattr(socket, "fromfd"):
            reduced_handle = reduce_handle(s.fileno())
            return _rebuild_socket, (reduced_handle, s.family, s.type,
                                     s.proto)
        raise TypeError("sockets cannot be pickled on this system.")

    def _rebuild_socket(reduced_handle, family, type_, proto):
        """Rebuild a socket from a reduced handle and its parameters."""
        raw_handle = rebuild_handle(reduced_handle)
        # NOTE(review): the handle is not closed if fromfd raises -- confirm
        # whether that matters for this legacy path.
        sock = fromfd(raw_handle, family, type_, proto)
        close(raw_handle)
        return sock

    register(socket.socket, reduce_socket)
|