Skip to content

Conversation

@vshekar
Copy link
Contributor

@vshekar vshekar commented Nov 6, 2025

Checklist

  • Add a Changelog entry
  • Add the ticket number which this PR closes to the comment section

)
self._pubsub = PubSub()

async def incr_seq(self, node_id: str) -> int:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a potential race here. Consider using itertools.count with collections.defaultdict.

)


class PubSub:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe reuse the CallbackHandler? It's defined under tiled.client.streams but could be moved to tiled.calback_handler for use by client- or server-side modules.

class CallbackRegistry(Generic[T]):
"""
Distribute updates to user-provided callback functions.
Parameters
----------
executor : concurrent.futures.Executor
Launches tasks asynchronously, in response to updates
"""
def __init__(self, executor: concurrent.futures.Executor):
self._executor = executor
self._callbacks: set[T] = set()
@property
def executor(self):
return self._executor
def process(self, update: T):
"Fan an update out to all registered callbacks."
for ref in self._callbacks:
callback = ref()
if callback is not None:
self.executor.submit(callback, update)
def add_callback(self, callback: Callback[T]) -> Self:
"""
Register a callback to be run when the Subscription receives an update.
The callback registry only holds a weak reference to the callback. If
no hard references are held elsewhere in the program, the callback will
be silently removed.
Parameters
----------
callback : Callback
Returns
-------
Subscription
Examples
--------
Simply subscribe the print function.
>>> sub.add_callback(print)
Subscribe a custom function.
>>> def f(sub, data):
...
>>> sub.add_callback(f)
Start receiving updates, beginning with the next one.
>>> sub.start()
Or start receiving updates beginning as far back as the server has
available.
>>> sub.start(0)
Or start receiving updates beginning with a specific sequence number.
>>> sub.start(3)
The method calls can be chained like:
>>> sub.add_callback(f).add_callback(g).start()
"""
def cleanup(ref: weakref.ref) -> None:
# When an object is garbage collected, remove its entry
# from the set of callbacks.
self._callbacks.remove(ref)
if inspect.ismethod(callback):
# This holds the reference to the method until the object it is
# bound to is garbage collected.
ref = weakref.WeakMethod(callback, cleanup)
else:
ref = weakref.ref(callback, cleanup)
self._callbacks.add(ref)
def remove_callback(self, callback: Callback[T]) -> Self:
"""
Unregister a callback.
Parameters
----------
callback : Callback
Returns
-------
Subscription
"""
self._callbacks.remove(callback)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure this is a good idea. It's nice that your version is async. (This one uses threads.) Maybe just borrowing patterns from this rather than copy/pasting it would be better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants