|
| 1 | +""" |
| 2 | +This module vendors the Dispatcher from bluesky.run_engine, in order |
| 3 | +to avoid a bluesky dependency, since bluesky-tiled-plugins is frequently |
| 4 | +used in data analysis environments where a bluesky dependency would be |
| 5 | +superfluous. |
| 6 | +
|
| 7 | +This dispatcher could in the future be move upstream to event_model |
| 8 | +where it could be shared by bluesky and bluesky-tiled-plugins. |
| 9 | +
|
| 10 | +That code has been extremely stable for about ten years, so divergence |
| 11 | +is not a pressing concern. |
| 12 | +""" |
| 13 | +import sys |
| 14 | +import types |
| 15 | +from itertools import count |
| 16 | +from warnings import warn |
| 17 | +from weakref import WeakKeyDictionary, ref |
| 18 | + |
| 19 | +from event_model import DocumentNames |
| 20 | + |
| 21 | + |
| 22 | +class Dispatcher: |
| 23 | + """Dispatch documents to user-defined consumers on the main thread.""" |
| 24 | + |
| 25 | + def __init__(self): |
| 26 | + self.cb_registry = CallbackRegistry(allowed_sigs=DocumentNames) |
| 27 | + self._counter = count() |
| 28 | + self._token_mapping = dict() # noqa: C408 |
| 29 | + |
| 30 | + def process(self, name, doc): |
| 31 | + """ |
| 32 | + Dispatch document ``doc`` of type ``name`` to the callback registry. |
| 33 | +
|
| 34 | + Parameters |
| 35 | + ---------- |
| 36 | + name : {'start', 'descriptor', 'event', 'stop'} |
| 37 | + doc : dict |
| 38 | + """ |
| 39 | + exceptions = self.cb_registry.process(name, name.name, doc) |
| 40 | + for exc, traceback in exceptions: # noqa: B007 |
| 41 | + warn( # noqa: B028 |
| 42 | + "A %r was raised during the processing of a %s " # noqa: UP031 |
| 43 | + "Document. The error will be ignored to avoid " |
| 44 | + "interrupting data collection. To investigate, " |
| 45 | + "set RunEngine.ignore_callback_exceptions = False " |
| 46 | + "and run again." % (exc, name.name) |
| 47 | + ) |
| 48 | + |
| 49 | + def subscribe(self, func, name="all"): |
| 50 | + """ |
| 51 | + Register a callback function to consume documents. |
| 52 | +
|
| 53 | + .. versionchanged :: 0.10.0 |
| 54 | + The order of the arguments was swapped and the ``name`` |
| 55 | + argument has been given a default value, ``'all'``. Because the |
| 56 | + meaning of the arguments is unambiguous (they must be a callable |
| 57 | + and a string, respectively) the old order will be supported |
| 58 | + indefinitely, with a warning. |
| 59 | +
|
| 60 | + .. versionchanged :: 0.10.0 |
| 61 | + The order of the arguments was swapped and the ``name`` |
| 62 | + argument has been given a default value, ``'all'``. Because the |
| 63 | + meaning of the arguments is unambiguous (they must be a callable |
| 64 | + and a string, respectively) the old order will be supported |
| 65 | + indefinitely, with a warning. |
| 66 | +
|
| 67 | + Parameters |
| 68 | + ---------- |
| 69 | + func: callable |
| 70 | + expecting signature like ``f(name, document)`` |
| 71 | + where name is a string and document is a dict |
| 72 | + name : {'all', 'start', 'descriptor', 'event', 'stop'}, optional |
| 73 | + the type of document this function should receive ('all' by |
| 74 | + default). |
| 75 | +
|
| 76 | + Returns |
| 77 | + ------- |
| 78 | + token : int |
| 79 | + an integer ID that can be used to unsubscribe |
| 80 | +
|
| 81 | + See Also |
| 82 | + -------- |
| 83 | + :meth:`Dispatcher.unsubscribe` |
| 84 | + an integer token that can be used to unsubscribe |
| 85 | + """ |
| 86 | + if callable(name) and isinstance(func, str): |
| 87 | + name, func = func, name |
| 88 | + warn( # noqa: B028 |
| 89 | + "The order of the arguments has been changed. Because the " |
| 90 | + "meaning of the arguments is unambiguous, the old usage will " |
| 91 | + "continue to work indefinitely, but the new usage is " |
| 92 | + "encouraged: call subscribe(func, name) instead of " |
| 93 | + "subscribe(name, func). Additionally, the 'name' argument " |
| 94 | + "has become optional. Its default value is 'all'." |
| 95 | + ) |
| 96 | + if name == "all": |
| 97 | + private_tokens = [] |
| 98 | + for key in DocumentNames: |
| 99 | + private_tokens.append(self.cb_registry.connect(key, func)) |
| 100 | + public_token = next(self._counter) |
| 101 | + self._token_mapping[public_token] = private_tokens |
| 102 | + return public_token |
| 103 | + |
| 104 | + name = DocumentNames[name] |
| 105 | + private_token = self.cb_registry.connect(name, func) |
| 106 | + public_token = next(self._counter) |
| 107 | + self._token_mapping[public_token] = [private_token] |
| 108 | + return public_token |
| 109 | + |
| 110 | + def unsubscribe(self, token): |
| 111 | + """ |
| 112 | + Unregister a callback function using its integer ID. |
| 113 | +
|
| 114 | + Parameters |
| 115 | + ---------- |
| 116 | + token : int |
| 117 | + the integer ID issued by :meth:`Dispatcher.subscribe` |
| 118 | +
|
| 119 | + See Also |
| 120 | + -------- |
| 121 | + :meth:`Dispatcher.subscribe` |
| 122 | + """ |
| 123 | + for private_token in self._token_mapping.pop(token, []): |
| 124 | + self.cb_registry.disconnect(private_token) |
| 125 | + |
| 126 | + def unsubscribe_all(self): |
| 127 | + """Unregister all callbacks from the dispatcher.""" |
| 128 | + for public_token in list(self._token_mapping.keys()): |
| 129 | + self.unsubscribe(public_token) |
| 130 | + |
| 131 | + @property |
| 132 | + def ignore_exceptions(self): |
| 133 | + return self.cb_registry.ignore_exceptions |
| 134 | + |
| 135 | + @ignore_exceptions.setter |
| 136 | + def ignore_exceptions(self, val): |
| 137 | + self.cb_registry.ignore_exceptions = val |
| 138 | + |
| 139 | + |
| 140 | +class CallbackRegistry: |
| 141 | + """ |
| 142 | + See matplotlib.cbook.CallbackRegistry. This is a simplified since |
| 143 | + ``bluesky`` is python3.4+ only! |
| 144 | + """ |
| 145 | + |
| 146 | + def __init__(self, ignore_exceptions=False, allowed_sigs=None): |
| 147 | + self.ignore_exceptions = ignore_exceptions |
| 148 | + self.allowed_sigs = allowed_sigs |
| 149 | + self.callbacks = dict() # noqa: C408 |
| 150 | + self._cid = 0 |
| 151 | + self._func_cid_map = {} |
| 152 | + |
| 153 | + def __getstate__(self): |
| 154 | + # We cannot currently pickle the callables in the registry, so |
| 155 | + # return an empty dictionary. |
| 156 | + return {} |
| 157 | + |
| 158 | + def __setstate__(self, state): |
| 159 | + # re-initialise an empty callback registry |
| 160 | + self.__init__() |
| 161 | + |
| 162 | + def connect(self, sig, func): |
| 163 | + """Register ``func`` to be called when ``sig`` is generated |
| 164 | +
|
| 165 | + Parameters |
| 166 | + ---------- |
| 167 | + sig |
| 168 | + func |
| 169 | +
|
| 170 | + Returns |
| 171 | + ------- |
| 172 | + cid : int |
| 173 | + The callback index. To be used with ``disconnect`` to deregister |
| 174 | + ``func`` so that it will no longer be called when ``sig`` is |
| 175 | + generated |
| 176 | + """ |
| 177 | + if self.allowed_sigs is not None: |
| 178 | + if sig not in self.allowed_sigs: |
| 179 | + raise ValueError(f"Allowed signals are {self.allowed_sigs}") |
| 180 | + self._func_cid_map.setdefault(sig, WeakKeyDictionary()) |
| 181 | + # Note proxy not needed in python 3. |
| 182 | + # TODO rewrite this when support for python2.x gets dropped. |
| 183 | + # Following discussion with TC: weakref.WeakMethod can not be used to |
| 184 | + # replace the custom 'BoundMethodProxy', because it does not accept |
| 185 | + # the 'destroy callback' as a parameter. The 'destroy callback' is |
| 186 | + # necessary to automatically unsubscribe CB registry from the callback |
| 187 | + # when the class object is destroyed and this is the main purpose of |
| 188 | + # BoundMethodProxy. |
| 189 | + proxy = _BoundMethodProxy(func) |
| 190 | + if proxy in self._func_cid_map[sig]: |
| 191 | + return self._func_cid_map[sig][proxy] |
| 192 | + |
| 193 | + proxy.add_destroy_callback(self._remove_proxy) |
| 194 | + self._cid += 1 |
| 195 | + cid = self._cid |
| 196 | + self._func_cid_map[sig][proxy] = cid |
| 197 | + self.callbacks.setdefault(sig, dict()) # noqa: C408 |
| 198 | + self.callbacks[sig][cid] = proxy |
| 199 | + return cid |
| 200 | + |
| 201 | + def _remove_proxy(self, proxy): |
| 202 | + # need the list because `del self._func_cid_map[sig]` mutates the dict |
| 203 | + for sig, proxies in list(self._func_cid_map.items()): |
| 204 | + try: |
| 205 | + # Here we need to delete the last reference to proxy (in 'self.callbacks[sig]') |
| 206 | + # The respective entries in 'self._func_cid_map' are deleted automatically, |
| 207 | + # since 'self._func_cid_map[sig]' entries are WeakKeyDictionary objects. |
| 208 | + del self.callbacks[sig][proxies[proxy]] |
| 209 | + except KeyError: |
| 210 | + pass |
| 211 | + |
| 212 | + # Remove dictionary items for signals with no assigned callbacks |
| 213 | + if len(self.callbacks[sig]) == 0: |
| 214 | + del self.callbacks[sig] |
| 215 | + del self._func_cid_map[sig] |
| 216 | + |
| 217 | + def disconnect(self, cid): |
| 218 | + """Disconnect the callback registered with callback id *cid* |
| 219 | +
|
| 220 | + Parameters |
| 221 | + ---------- |
| 222 | + cid : int |
| 223 | + The callback index and return value from ``connect`` |
| 224 | + """ |
| 225 | + for eventname, callbackd in self.callbacks.items(): # noqa: B007 |
| 226 | + try: |
| 227 | + # This may or may not remove entries in 'self._func_cid_map'. |
| 228 | + del callbackd[cid] |
| 229 | + except KeyError: |
| 230 | + continue |
| 231 | + else: |
| 232 | + # Look for cid in 'self._func_cid_map' as well. It may still be there. |
| 233 | + for sig, functions in self._func_cid_map.items(): # noqa: B007 |
| 234 | + for function, value in list(functions.items()): |
| 235 | + if value == cid: |
| 236 | + del functions[function] |
| 237 | + return |
| 238 | + |
| 239 | + def process(self, sig, *args, **kwargs): |
| 240 | + """Process ``sig`` |
| 241 | +
|
| 242 | + All of the functions registered to receive callbacks on ``sig`` |
| 243 | + will be called with ``args`` and ``kwargs`` |
| 244 | +
|
| 245 | + Parameters |
| 246 | + ---------- |
| 247 | + sig |
| 248 | + args |
| 249 | + kwargs |
| 250 | + """ |
| 251 | + if self.allowed_sigs is not None: |
| 252 | + if sig not in self.allowed_sigs: |
| 253 | + raise ValueError(f"Allowed signals are {self.allowed_sigs}") |
| 254 | + exceptions = [] |
| 255 | + if sig in self.callbacks: |
| 256 | + for cid, func in list(self.callbacks[sig].items()): # noqa: B007 |
| 257 | + try: |
| 258 | + func(*args, **kwargs) |
| 259 | + except ReferenceError: |
| 260 | + self._remove_proxy(func) |
| 261 | + except Exception as e: |
| 262 | + if self.ignore_exceptions: |
| 263 | + exceptions.append((e, sys.exc_info()[2])) |
| 264 | + else: |
| 265 | + raise |
| 266 | + return exceptions |
| 267 | + |
| 268 | + |
| 269 | +class _BoundMethodProxy: |
| 270 | + """ |
| 271 | + Our own proxy object which enables weak references to bound and unbound |
| 272 | + methods and arbitrary callables. Pulls information about the function, |
| 273 | + class, and instance out of a bound method. Stores a weak reference to the |
| 274 | + instance to support garbage collection. |
| 275 | + @organization: IBM Corporation |
| 276 | + @copyright: Copyright (c) 2005, 2006 IBM Corporation |
| 277 | + @license: The BSD License |
| 278 | + Minor bugfixes by Michael Droettboom |
| 279 | + """ |
| 280 | + |
| 281 | + def __init__(self, cb): |
| 282 | + self._hash = hash(cb) |
| 283 | + self._destroy_callbacks = [] |
| 284 | + try: |
| 285 | + # This branch is successful if 'cb' bound method and class method, |
| 286 | + # but destroy_callback mechanism works only for bound methods, |
| 287 | + # since cb.__self__ points to class instance only for |
| 288 | + # bound methods, not for class methods. Therefore destroy_callback |
| 289 | + # will not be called for class methods. |
| 290 | + try: |
| 291 | + self.inst = ref(cb.__self__, self._destroy) |
| 292 | + except TypeError: |
| 293 | + self.inst = None |
| 294 | + self.func = cb.__func__ |
| 295 | + self.klass = cb.__self__.__class__ |
| 296 | + |
| 297 | + except AttributeError: |
| 298 | + # 'cb' is a function, callable object or static method. |
| 299 | + # No weak reference is created, strong reference is stored instead. |
| 300 | + self.inst = None |
| 301 | + self.func = cb |
| 302 | + self.klass = None |
| 303 | + |
| 304 | + def add_destroy_callback(self, callback): |
| 305 | + self._destroy_callbacks.append(_BoundMethodProxy(callback)) |
| 306 | + |
| 307 | + def _destroy(self, wk): |
| 308 | + for callback in self._destroy_callbacks: |
| 309 | + try: |
| 310 | + callback(self) |
| 311 | + except ReferenceError: |
| 312 | + pass |
| 313 | + |
| 314 | + def __getstate__(self): |
| 315 | + d = self.__dict__.copy() |
| 316 | + # de-weak reference inst |
| 317 | + inst = d["inst"] |
| 318 | + if inst is not None: |
| 319 | + d["inst"] = inst() |
| 320 | + return d |
| 321 | + |
| 322 | + def __setstate__(self, statedict): |
| 323 | + self.__dict__ = statedict |
| 324 | + inst = statedict["inst"] |
| 325 | + # turn inst back into a weakref |
| 326 | + if inst is not None: |
| 327 | + self.inst = ref(inst) |
| 328 | + |
| 329 | + def __call__(self, *args, **kwargs): |
| 330 | + """ |
| 331 | + Proxy for a call to the weak referenced object. Take |
| 332 | + arbitrary params to pass to the callable. |
| 333 | + Raises `ReferenceError`: When the weak reference refers to |
| 334 | + a dead object |
| 335 | + """ |
| 336 | + if self.inst is not None and self.inst() is None: |
| 337 | + raise ReferenceError |
| 338 | + elif self.inst is not None: |
| 339 | + # build a new instance method with a strong reference to the |
| 340 | + # instance |
| 341 | + |
| 342 | + mtd = types.MethodType(self.func, self.inst()) |
| 343 | + |
| 344 | + else: |
| 345 | + # not a bound method, just return the func |
| 346 | + mtd = self.func |
| 347 | + # invoke the callable and return the result |
| 348 | + return mtd(*args, **kwargs) |
| 349 | + |
| 350 | + def __eq__(self, other): |
| 351 | + """ |
| 352 | + Compare the held function and instance with that held by |
| 353 | + another proxy. |
| 354 | + """ |
| 355 | + try: |
| 356 | + if self.inst is None: |
| 357 | + return self.func == other.func and other.inst is None |
| 358 | + else: |
| 359 | + return self.func == other.func and self.inst() == other.inst() |
| 360 | + except Exception: |
| 361 | + return False |
| 362 | + |
| 363 | + def __ne__(self, other): |
| 364 | + """ |
| 365 | + Inverse of __eq__. |
| 366 | + """ |
| 367 | + return not self.__eq__(other) |
| 368 | + |
| 369 | + def __hash__(self): |
| 370 | + return self._hash |
0 commit comments