Skip to content

rust: Add subscription manager #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 19, 2025
Merged

rust: Add subscription manager #281

merged 10 commits into from
Mar 19, 2025

Conversation

gasmith
Copy link
Contributor

@gasmith gasmith commented Mar 11, 2025

Changelog

None

Related changes

Description

Updates:

  • Removed synchronization, we'll do that in the Context with an RwLock around channels, sinks, and subs together.
  • Removed the ArcSwap cache - pre-computing the SmallVec doesn't make much of a difference since it's small and we have to clone it anyway.
  • Moved the module to crate::context::subscriptions.

This change adds a new type for tracking subscriptions of sinks to channels, but doesn't make use of it yet. That will come in a later PR.

We're planning to support two kinds of subscriptions: static subscriptions to all channels (e.g., for mcap), and dynamic subscriptions to particular channels (e.g., for websockets). We maintain these subscriptions as two separate sets. When subscriptions change, the Context will be responsible for querying the subscription manager in order to regenerate the LogSinkSet for affected channels.

Part of: FG-10743

@gasmith gasmith self-assigned this Mar 11, 2025
Copy link

linear bot commented Mar 11, 2025

@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch from 1cec7f5 to 83dbb3c Compare March 11, 2025 23:39
@gasmith gasmith force-pushed the gasmith/fg-10745-ns-arc branch 2 times, most recently from 3ee1893 to a10dd95 Compare March 12, 2025 18:42
@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch 4 times, most recently from 38136ba to 66663f1 Compare March 14, 2025 15:25
@gasmith gasmith force-pushed the gasmith/fg-10745-ns-arc branch from a10dd95 to 763aeba Compare March 14, 2025 15:33
@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch from 66663f1 to 5939b94 Compare March 14, 2025 15:46
@gasmith gasmith force-pushed the gasmith/fg-10745-ns-arc branch from 763aeba to 1f40c6b Compare March 14, 2025 15:56
@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch from 5939b94 to 9bd1af0 Compare March 14, 2025 15:57
@gasmith gasmith requested review from bryfox and eloff March 14, 2025 16:08
@gasmith gasmith marked this pull request as ready for review March 14, 2025 16:08
@gasmith gasmith changed the title rust: Add subscription module rust: Add subscription manager Mar 14, 2025
Copy link
Member

@jtbandes jtbandes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little hard to fully grasp how this is going to play out as we use it in practice, but seems probably ok to move forward and evolve this over time. None of my comments are really blocking but I will be curious to read your rebuttals 😄 Overall I felt like the doc comments could use a bit more fleshing out to make it clearer what problems the classes are solving (e.g. "tracking and querying channel subscriptions" feels a bit light on context)

Comment on lines 28 to 40
/// Current subscriptions.
subscriptions: Mutex<SubscriptionMap<K, V>>,
/// Cached map from channel to interested subscribers.
subscribers: ArcSwap<SubscriberMap<V>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it would be incorrect/racy to ever perform a store on subscribers when not holding the subscriptions mutex. Is there a way to encode that better in the type system?

Relatedly, I am a bit skeptical of where we may end up using has_subscribers() and whether (because of the lack of locking) it will be easy to accidentally introduce races by assuming that a subscriber doesn't sneak in between calls to has_subscribers() and other functions...

Copy link
Contributor Author

@gasmith gasmith Mar 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it would be incorrect/racy to ever perform a store on subscribers when not holding the subscriptions mutex. Is there a way to encode that better in the type system?

Nothing springs to mind.

Relatedly, I am a bit skeptical of where we may end up using has_subscribers() and whether (because of the lack of locking) it will be easy to accidentally introduce races by assuming that a subscriber doesn't sneak in between calls to has_subscribers() and other functions...

I'll add some comments to describe the intended use for has_subscribers(). The basic idea is to use it as a guard around channel.log, so that you don't waste time serializing messages when nobody is listening. In that situation, if has_subscribers() returns false, and a subscriber sneaks in on another thread, it's not a big deal. Those things weren't synchronized to begin with.

There's perhaps a bit more to think about in the other direction. Consider the proposed changes to Channel::log, and suppose that a sink is unregistered between the call to get_subscribers() and log_to_sinks(). We'll deliver a message to a sink that isn't expecting one.

Base automatically changed from gasmith/fg-10745-ns-arc to main March 17, 2025 16:29
@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch 2 times, most recently from 5a973fa to 03557f1 Compare March 17, 2025 18:04
@gasmith gasmith changed the base branch from main to gasmith/fg-10742-sink-ctx March 17, 2025 18:04
@gasmith gasmith force-pushed the gasmith/fg-10742-sink-ctx branch from 4b8d8de to 39471b2 Compare March 17, 2025 18:54
Base automatically changed from gasmith/fg-10742-sink-ctx to main March 17, 2025 19:09
@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch from 03557f1 to 4033ec7 Compare March 17, 2025 19:09
@gasmith gasmith force-pushed the gasmith/fg-10743-subscr branch from 0be7ee2 to 2901071 Compare March 18, 2025 00:37
@gasmith
Copy link
Contributor Author

gasmith commented Mar 18, 2025

I've simplified this change quite a bit. I realized that we probably want to do our synchronization elsewhere (like on Context), and that the ArcSwap cache isn't actually buying us any performance wins compared to an implementation that constructs the SubscriberVec dynamically. I also got rid of the generics, now that SinkId has merged.

@gasmith gasmith requested a review from jtbandes March 18, 2025 00:38
/// Has no effect if the subscriber has a global subscription.
pub fn subscribe_channels(
&mut self,
sink: Arc<dyn Sink>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

micro-nit: curious if there are common conventions here - it seems like taking the arc by value is requiring an extra clone on the caller's side, when we are already planning to clone internally. Would taking &Arc be a common alternative? On the other hand I think clone should be quite cheap for Arc – which raises the question of why Arc isn't Copy 🤔 🤷 These are just musings from a rust noob – feel free to ignore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that could totally be an &Arc. Let's do that.

which raises the question of why Arc isn't Copy

My guess: Even though the clone is cheap, it's not a trivial copy, because it involves incrementing a refcount. And likewise, there's a non-trivial Drop to decrement the refcount.

}

/// Remove all global and per-channel subscriptions for a particular subscriber.
pub fn remove_subscriber(&mut self, sink_id: SinkId) -> bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nit: maybe consider whether this should be called something like unsubscribe_all? I'm also fine with remove_subscriber if you prefer it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I mildly prefer remove_subscriber; I feel like unsubscribe_all might be confused for "remove a global subscription".

Comment on lines 105 to 108
self.by_channel.retain(|_, subs| {
removed |= subs.remove(&sink_id).is_some();
!subs.is_empty()
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small optimization: if removed is already true, you can probably skip this, right? (since a sink with a global subscription shouldn't also have channel subscriptions)

Copy link
Contributor

@eloff eloff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nits only, this looks solid

///
/// We use a [`SmallVec`] to avoid heap allocations for lookups, when the number of concurrent
/// subscribers is reasonably small.
pub(crate) type SubscriberVec = SmallVec<[Arc<dyn Sink>; 6]>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice idea

#[derive(Default)]
pub(crate) struct Subscriptions {
/// Global subscriptions (all channels).
global: HashMap<SinkId, Arc<dyn Sink>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea splitting these out

@gasmith gasmith merged commit 2effe12 into main Mar 19, 2025
38 checks passed
@gasmith gasmith deleted the gasmith/fg-10743-subscr branch March 19, 2025 18:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants