Skip to content

Commit 92c9bff

Browse files
authored
Implement hub callback priorities (#2530)
* Implement hub callback priorities * Update glue/core/hub.py * Address review comments * Move yield loop out of subscriber loop * Add test * Update docstrings --------- Co-authored-by: Derek Homeier <[email protected]> Co-authored-by: Thomas Robitaille <[email protected]>
1 parent d59f708 commit 92c9bff

File tree

3 files changed

+65
-8
lines changed

3 files changed

+65
-8
lines changed

glue/core/hub.py

+15-5
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ def __init__(self, *args):
6868

6969
def subscribe(self, subscriber, message_class,
7070
handler=None,
71-
filter=lambda x: True):
71+
filter=lambda x: True,
72+
priority=10):
7273
"""Subscribe an object to a type of message class.
7374
7475
:param subscriber: The subscribing object
@@ -88,6 +89,11 @@ def subscribe(self, subscriber, message_class,
8889
are only passed to the subscriber if filter(message) == True.
8990
The default is to always pass messages.
9091
92+
:param priority:
93+
An optional integer to set the priority of the handler. Handlers
94+
are sorted such that higher priority handlers get called first
95+
when broadcasting a message.
96+
9197
9298
Raises:
9399
InvalidMessage: If the input class isn't a
@@ -113,7 +119,7 @@ def subscribe(self, subscriber, message_class,
113119
if subscriber not in self._subscriptions:
114120
self._subscriptions[subscriber] = HubCallbackContainer()
115121

116-
self._subscriptions[subscriber][message_class] = handler, filter
122+
self._subscriptions[subscriber][message_class] = handler, filter, priority
117123

118124
def is_subscribed(self, subscriber, message):
119125
"""
@@ -160,9 +166,10 @@ def _find_handlers(self, message):
160166
"""Yields all (subscriber, handler) pairs that should receive a message
161167
"""
162168
# self._subscriptions:
163-
# subscriber => { message type => (filter, handler)}
169+
# subscriber => { message type => (filter, handler, priority)}
164170

165171
# loop over subscribed objects
172+
prioritized_handlers = []
166173
for subscriber, subscriptions in list(self._subscriptions.items()):
167174

168175
# subscriptions to message or its superclasses
@@ -175,9 +182,12 @@ def _find_handlers(self, message):
175182
# narrow to the most-specific message
176183
candidate = max(messages, key=_mro_count)
177184

178-
handler, test = subscriptions[candidate]
185+
handler, test, priority = subscriptions[candidate]
179186
if test(message):
180-
yield subscriber, handler
187+
prioritized_handlers.append((subscriber, handler, priority))
188+
189+
for subscriber, handler, _ in sorted(prioritized_handlers, key=lambda x: x[2], reverse=True):
190+
yield subscriber, handler
181191

182192
@contextmanager
183193
def ignore_callbacks(self, ignore_type):

glue/core/hub_callback_container.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class HubCallbackContainer(object):
1919
def __init__(self):
2020
self.callbacks = {}
2121

22-
def _wrap(self, handler, filter):
22+
def _wrap(self, handler, filter, priority):
2323
"""
2424
Given a function/method, this will automatically wrap a method using
2525
weakref to avoid circular references.
@@ -57,6 +57,8 @@ def _wrap(self, handler, filter):
5757

5858
value += (filter, None)
5959

60+
value += (priority,)
61+
6062
return value
6163

6264
def _auto_remove(self, method_instance):
@@ -91,6 +93,9 @@ def __getitem__(self, message_class):
9193
inst = callback[3]()
9294
result += (partial(func, inst),)
9395

96+
# Add priority
97+
result += (callback[4],)
98+
9499
return result
95100

96101
def __iter__(self):
@@ -108,8 +113,8 @@ def is_bound_method(func):
108113
return hasattr(func, '__func__') and getattr(func, '__self__', None) is not None
109114

110115
def __setitem__(self, message_class, value):
111-
handler, filter = value
112-
self.callbacks[message_class] = self._wrap(handler, filter)
116+
handler, filter, priority = value
117+
self.callbacks[message_class] = self._wrap(handler, filter, priority)
113118

114119
def pop(self, message_class):
115120
return self.callbacks.pop(message_class)

glue/core/tests/test_hub.py

+42
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,48 @@ def test_invalid_init(self):
165165
assert exc.value.args[0] == ("Inputs must be HubListener, data, "
166166
"subset, or data collection objects")
167167

168+
@pytest.mark.parametrize('priorities', (False, True))
169+
def test_handler_priorities(self, priorities):
170+
"""
171+
Test that handlers are called in order of descending priority, if set,
172+
in order subscribed to otherwise.
173+
"""
174+
msg, _, subscriber1 = self.get_subscription()
175+
_, _, subscriber2 = self.get_subscription()
176+
_, _, subscriber3 = self.get_subscription()
177+
178+
class Handlers:
179+
180+
def __init__(self):
181+
self.priority_test_val = 0
182+
183+
def handler1(self, msg):
184+
self.priority_test_val += 10
185+
186+
def handler2(self, msg):
187+
global priority_test_val
188+
self.priority_test_val *= 2
189+
190+
def handler3(self, msg):
191+
self.priority_test_val = 18
192+
193+
handlers = Handlers()
194+
195+
if priorities:
196+
self.hub.subscribe(subscriber1, msg, handlers.handler1, priority=100)
197+
self.hub.subscribe(subscriber2, msg, handlers.handler2)
198+
self.hub.subscribe(subscriber3, msg, handlers.handler3, priority=200)
199+
else:
200+
self.hub.subscribe(subscriber1, msg, handlers.handler1)
201+
self.hub.subscribe(subscriber2, msg, handlers.handler2)
202+
self.hub.subscribe(subscriber3, msg, handlers.handler3)
203+
204+
msg_instance = msg("Test")
205+
206+
self.hub.broadcast(msg_instance)
207+
208+
assert handlers.priority_test_val == (56 if priorities else 18)
209+
168210

169211
class TestHubListener(object):
170212
"""This is a dumb test, I know. Fixated on code coverage"""

0 commit comments

Comments
 (0)