reduction.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. ###############################################################################
  2. # Customizable Pickler with some basic reducers
  3. #
  4. # author: Thomas Moreau
  5. #
  6. # adapted from multiprocessing/reduction.py (17/02/2017)
  7. # * Replace the ForkingPickler with a similar _LokyPickler,
# * Add CustomizablePickler to allow customizing the pickling process
#   on the fly.
  10. #
  11. import io
  12. import os
  13. import sys
  14. import functools
  15. from multiprocessing import util
  16. try:
  17. # Python 2 compat
  18. from cPickle import loads as pickle_loads
  19. except ImportError:
  20. from pickle import loads as pickle_loads
  21. import copyreg
  22. from pickle import HIGHEST_PROTOCOL
  23. if sys.platform == "win32":
  24. if sys.version_info[:2] > (3, 3):
  25. from multiprocessing.reduction import duplicate
  26. else:
  27. from multiprocessing.forking import duplicate
  28. ###############################################################################
  29. # Enable custom pickling in Loky.
  30. # To allow instance customization of the pickling process, we use 2 classes.
# _ReducerRegistry provides module-level customization, while
# CustomizablePickler allows instance-based custom reducers. Only
# CustomizablePickler should be used.
  34. class _ReducerRegistry(object):
  35. """Registry for custom reducers.
  36. HIGHEST_PROTOCOL is selected by default as this pickler is used
  37. to pickle ephemeral datastructures for interprocess communication
  38. hence no backward compatibility is required.
  39. """
  40. # We override the pure Python pickler as its the only way to be able to
  41. # customize the dispatch table without side effects in Python 2.6
  42. # to 3.2. For Python 3.3+ leverage the new dispatch_table
  43. # feature from http://bugs.python.org/issue14166 that makes it possible
  44. # to use the C implementation of the Pickler which is faster.
  45. dispatch_table = {}
  46. @classmethod
  47. def register(cls, type, reduce_func):
  48. """Attach a reducer function to a given type in the dispatch table."""
  49. if sys.version_info < (3,):
  50. # Python 2 pickler dispatching is not explicitly customizable.
  51. # Let us use a closure to workaround this limitation.
  52. def dispatcher(cls, obj):
  53. reduced = reduce_func(obj)
  54. cls.save_reduce(obj=obj, *reduced)
  55. cls.dispatch_table[type] = dispatcher
  56. else:
  57. cls.dispatch_table[type] = reduce_func
  58. ###############################################################################
  59. # Registers extra pickling routines to improve picklization for loky
  60. register = _ReducerRegistry.register
  61. # make methods picklable
  62. def _reduce_method(m):
  63. if m.__self__ is None:
  64. return getattr, (m.__class__, m.__func__.__name__)
  65. else:
  66. return getattr, (m.__self__, m.__func__.__name__)
  67. class _C:
  68. def f(self):
  69. pass
  70. @classmethod
  71. def h(cls):
  72. pass
  73. register(type(_C().f), _reduce_method)
  74. register(type(_C.h), _reduce_method)
  75. if not hasattr(sys, "pypy_version_info"):
  76. # PyPy uses functions instead of method_descriptors and wrapper_descriptors
  77. def _reduce_method_descriptor(m):
  78. return getattr, (m.__objclass__, m.__name__)
  79. register(type(list.append), _reduce_method_descriptor)
  80. register(type(int.__add__), _reduce_method_descriptor)
  81. # Make partial func pickable
  82. def _reduce_partial(p):
  83. return _rebuild_partial, (p.func, p.args, p.keywords or {})
  84. def _rebuild_partial(func, args, keywords):
  85. return functools.partial(func, *args, **keywords)
  86. register(functools.partial, _reduce_partial)
  87. if sys.platform != "win32":
  88. from ._posix_reduction import _mk_inheritable # noqa: F401
  89. else:
  90. from . import _win_reduction # noqa: F401
  91. # global variable to change the pickler behavior
  92. try:
  93. from joblib.externals import cloudpickle # noqa: F401
  94. DEFAULT_ENV = "cloudpickle"
  95. except ImportError:
  96. # If cloudpickle is not present, fallback to pickle
  97. DEFAULT_ENV = "pickle"
  98. ENV_LOKY_PICKLER = os.environ.get("LOKY_PICKLER", DEFAULT_ENV)
  99. _LokyPickler = None
  100. _loky_pickler_name = None
  101. def set_loky_pickler(loky_pickler=None):
  102. global _LokyPickler, _loky_pickler_name
  103. if loky_pickler is None:
  104. loky_pickler = ENV_LOKY_PICKLER
  105. loky_pickler_cls = None
  106. # The default loky_pickler is cloudpickle
  107. if loky_pickler in ["", None]:
  108. loky_pickler = "cloudpickle"
  109. if loky_pickler == _loky_pickler_name:
  110. return
  111. if loky_pickler == "cloudpickle":
  112. from joblib.externals.cloudpickle import CloudPickler as loky_pickler_cls
  113. else:
  114. try:
  115. from importlib import import_module
  116. module_pickle = import_module(loky_pickler)
  117. loky_pickler_cls = module_pickle.Pickler
  118. except (ImportError, AttributeError) as e:
  119. extra_info = ("\nThis error occurred while setting loky_pickler to"
  120. " '{}', as required by the env variable LOKY_PICKLER"
  121. " or the function set_loky_pickler."
  122. .format(loky_pickler))
  123. e.args = (e.args[0] + extra_info,) + e.args[1:]
  124. e.msg = e.args[0]
  125. raise e
  126. util.debug("Using '{}' for serialization."
  127. .format(loky_pickler if loky_pickler else "cloudpickle"))
  128. class CustomizablePickler(loky_pickler_cls):
  129. _loky_pickler_cls = loky_pickler_cls
  130. if sys.version_info < (3,):
  131. # Make the dispatch registry an instance level attribute instead of
  132. # a reference to the class dictionary under Python 2
  133. _dispatch = loky_pickler_cls.dispatch.copy()
  134. _dispatch.update(_ReducerRegistry.dispatch_table)
  135. else:
  136. # Under Python 3 initialize the dispatch table with a copy of the
  137. # default registry
  138. _dispatch_table = copyreg.dispatch_table.copy()
  139. _dispatch_table.update(_ReducerRegistry.dispatch_table)
  140. def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL):
  141. loky_pickler_cls.__init__(self, writer, protocol=protocol)
  142. if reducers is None:
  143. reducers = {}
  144. if sys.version_info < (3,):
  145. self.dispatch = self._dispatch.copy()
  146. else:
  147. if getattr(self, "dispatch_table", None) is not None:
  148. self.dispatch_table.update(self._dispatch_table.copy())
  149. else:
  150. self.dispatch_table = self._dispatch_table.copy()
  151. for type, reduce_func in reducers.items():
  152. self.register(type, reduce_func)
  153. def register(self, type, reduce_func):
  154. """Attach a reducer function to a given type in the dispatch table.
  155. """
  156. if sys.version_info < (3,):
  157. # Python 2 pickler dispatching is not explicitly customizable.
  158. # Let us use a closure to workaround this limitation.
  159. def dispatcher(self, obj):
  160. reduced = reduce_func(obj)
  161. self.save_reduce(obj=obj, *reduced)
  162. self.dispatch[type] = dispatcher
  163. else:
  164. self.dispatch_table[type] = reduce_func
  165. _LokyPickler = CustomizablePickler
  166. _loky_pickler_name = loky_pickler
  167. def get_loky_pickler_name():
  168. global _loky_pickler_name
  169. return _loky_pickler_name
  170. def get_loky_pickler():
  171. global _LokyPickler
  172. return _LokyPickler
  173. # Set it to its default value
  174. set_loky_pickler()
  175. def loads(buf):
  176. # Compat for python2.7 version
  177. if sys.version_info < (3, 3) and isinstance(buf, io.BytesIO):
  178. buf = buf.getvalue()
  179. return pickle_loads(buf)
  180. def dump(obj, file, reducers=None, protocol=None):
  181. '''Replacement for pickle.dump() using _LokyPickler.'''
  182. global _LokyPickler
  183. _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  184. def dumps(obj, reducers=None, protocol=None):
  185. global _LokyPickler
  186. buf = io.BytesIO()
  187. dump(obj, buf, reducers=reducers, protocol=protocol)
  188. if sys.version_info < (3, 3):
  189. return buf.getvalue()
  190. return buf.getbuffer()
  191. __all__ = ["dump", "dumps", "loads", "register", "set_loky_pickler"]
  192. if sys.platform == "win32":
  193. __all__ += ["duplicate"]