-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Batch subscriber #148
base: main
Are you sure you want to change the base?
Batch subscriber #148
Conversation
f09f842
to
6fdf427
Compare
6fdf427
to
4335922
Compare
2505843
to
0046e5a
Compare
Upon reset_node and apply_update, we want to be able to react to a batch of changes rather than individual changes. In quickwit for instance, we want to be able to react to the reception of a batch of deleted shard and group this reaction into a single metastore call.
0046e5a
to
58cecb6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i still have to read through half of state.rs. In the meantime, here are my first comments
let node_state = self.cluster_state.node_state_mut(chitchat_id); | ||
) -> anyhow::Result<()> { | ||
// We validate the version is compatible with what is in key_values. | ||
if !key_values.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check is redundant (if key_values is empty, the for will not iterate)
listener_idx: usize, | ||
// Callbacks is a hashmap because as we delete listeners, we create "holes" in the | ||
// callback_id -> callback mapping | ||
callbacks: HashMap<usize, CallbackEntry>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you make this a HashMap<CallbackId, CallbackEntry>
to make clear it goes hand in hand with listeners
NodeState { | ||
chitchat_id, | ||
heartbeat: Heartbeat(0), | ||
key_values: Default::default(), | ||
max_version: 0u64, | ||
listeners, | ||
// listeners, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// listeners, |
impl<'a> Deref for NodeStateMut<'a> { | ||
type Target = NodeState; | ||
|
||
fn deref(&self) -> &NodeState { | ||
self.node_state_mut | ||
} | ||
} | ||
|
||
impl<'a> DerefMut for NodeStateMut<'a> { | ||
fn deref_mut(&mut self) -> &mut NodeState { | ||
self.node_state_mut | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think deref makes sense, but deref_mut is bound to cause mutations which don't notify listeners
key.to_string(), | ||
VersionedValue { | ||
value: value.to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these to_string() look redundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(see also first batch)
// 0 stale values | ||
{ | ||
cluster_state.node_state_mut(&node5); | ||
} | ||
|
||
// 3 stale values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand where the stale value count come from. Before node5 and node6 had respectively 3 and 0 (instead of 0 and 3) stale values. I don't see an obvious reason this changed
Upon reset_node and apply_update, we want to be
able to react to a batch of changes rather than individual changes.
In quickwit for instance, we want to be able to react to the reception of a batch of deleted shard and group this reaction into a single metastore call.