Skip to content

Commit dc0ea2d

Browse files
fix: avoid flush panic when recovery encountered (#15851) (#16126)
Co-authored-by: August <[email protected]>
1 parent f0a05f8 commit dc0ea2d

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

src/meta/src/barrier/mod.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -745,12 +745,19 @@ impl GlobalBarrierManager {
745745

746746
send_latency_timer.observe_duration();
747747

748-
let node_to_collect = self
748+
let node_to_collect = match self
749749
.control_stream_manager
750750
.inject_barrier(command_ctx.clone())
751-
.inspect_err(|_| {
751+
{
752+
Ok(node_to_collect) => node_to_collect,
753+
Err(err) => {
754+
for notifier in notifiers {
755+
notifier.notify_failed(err.clone());
756+
}
752757
fail_point!("inject_barrier_err_success");
753-
})?;
758+
return Err(err);
759+
}
760+
};
754761

755762
// Notify about the injection.
756763
let prev_paused_reason = self.state.paused_reason();

src/meta/src/barrier/schedule.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,9 @@ impl BarrierScheduler {
241241
..Default::default()
242242
};
243243
self.attach_notifiers(vec![notifier], checkpoint)?;
244-
rx.await.unwrap()
244+
rx.await
245+
.ok()
246+
.context("failed to wait for barrier collect")?
245247
}
246248

247249
/// Run multiple commands and return when they're all completely finished. It's ensured that

0 commit comments

Comments
 (0)