_posix_reduction.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. ###############################################################################
  2. # Extra reducers for Unix based system and connections objects
  3. #
  4. # author: Thomas Moreau and Olivier Grisel
  5. #
  6. # adapted from multiprocessing/reduction.py (17/02/2017)
  7. # * Add adapted reduction for LokyProcesses and socket/Connection
  8. #
  9. import os
  10. import sys
  11. import socket
  12. import _socket
  13. from .reduction import register
  14. from .context import get_spawning_popen
  15. if sys.version_info >= (3, 3):
  16. from multiprocessing.connection import Connection
  17. else:
  18. from _multiprocessing import Connection
  19. HAVE_SEND_HANDLE = (hasattr(socket, 'CMSG_LEN') and
  20. hasattr(socket, 'SCM_RIGHTS') and
  21. hasattr(socket.socket, 'sendmsg'))
  22. def _mk_inheritable(fd):
  23. if sys.version_info[:2] > (3, 3):
  24. os.set_inheritable(fd, True)
  25. return fd
  26. def DupFd(fd):
  27. '''Return a wrapper for an fd.'''
  28. popen_obj = get_spawning_popen()
  29. if popen_obj is not None:
  30. return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
  31. elif HAVE_SEND_HANDLE and sys.version_info[:2] > (3, 3):
  32. from multiprocessing import resource_sharer
  33. return resource_sharer.DupFd(fd)
  34. else:
  35. raise TypeError(
  36. 'Cannot pickle connection object. This object can only be '
  37. 'passed when spawning a new process'
  38. )
  39. if sys.version_info[:2] != (3, 3):
  40. def _reduce_socket(s):
  41. df = DupFd(s.fileno())
  42. return _rebuild_socket, (df, s.family, s.type, s.proto)
  43. def _rebuild_socket(df, family, type, proto):
  44. fd = df.detach()
  45. return socket.fromfd(fd, family, type, proto)
  46. else:
  47. from multiprocessing.reduction import reduce_socket as _reduce_socket
  48. register(socket.socket, _reduce_socket)
  49. register(_socket.socket, _reduce_socket)
  50. if sys.version_info[:2] != (3, 3):
  51. def reduce_connection(conn):
  52. df = DupFd(conn.fileno())
  53. return rebuild_connection, (df, conn.readable, conn.writable)
  54. def rebuild_connection(df, readable, writable):
  55. fd = df.detach()
  56. return Connection(fd, readable, writable)
  57. else:
  58. from multiprocessing.reduction import reduce_connection
  59. register(Connection, reduce_connection)