Skip to content

Commit 0c27a8b

Browse files
authored
fix: select channel for clear barrier worker with higher priority (#19700)
1 parent 2a6aa41 commit 0c27a8b

File tree

1 file changed

+17
-17
lines changed

1 file changed

+17
-17
lines changed

src/stream/src/task/barrier_manager.rs

+17-17
Original file line numberDiff line numberDiff line change
@@ -311,23 +311,6 @@ impl LocalBarrierWorker {
311311
loop {
312312
select! {
313313
biased;
314-
(partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => {
315-
let result = self.on_epoch_completed(partial_graph_id, completed_epoch);
316-
if let Err(err) = result {
317-
self.notify_other_failure(err, "failed to complete epoch").await;
318-
}
319-
},
320-
event = self.barrier_event_rx.recv() => {
321-
// event should not be None because the LocalBarrierManager holds a copy of tx
322-
let result = self.handle_barrier_event(event.expect("should not be none"));
323-
if let Err((actor_id, err)) = result {
324-
self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await;
325-
}
326-
},
327-
failure = self.actor_failure_rx.recv() => {
328-
let (actor_id, err) = failure.unwrap();
329-
self.notify_actor_failure(actor_id, err, "recv actor failure").await;
330-
},
331314
actor_op = actor_op_rx.recv() => {
332315
if let Some(actor_op) = actor_op {
333316
match actor_op {
@@ -358,6 +341,23 @@ impl LocalBarrierWorker {
358341
break;
359342
}
360343
},
344+
(partial_graph_id, completed_epoch) = self.state.next_completed_epoch() => {
345+
let result = self.on_epoch_completed(partial_graph_id, completed_epoch);
346+
if let Err(err) = result {
347+
self.notify_other_failure(err, "failed to complete epoch").await;
348+
}
349+
},
350+
event = self.barrier_event_rx.recv() => {
351+
// event should not be None because the LocalBarrierManager holds a copy of tx
352+
let result = self.handle_barrier_event(event.expect("should not be none"));
353+
if let Err((actor_id, err)) = result {
354+
self.notify_actor_failure(actor_id, err, "failed to handle barrier event").await;
355+
}
356+
},
357+
failure = self.actor_failure_rx.recv() => {
358+
let (actor_id, err) = failure.unwrap();
359+
self.notify_actor_failure(actor_id, err, "recv actor failure").await;
360+
},
361361
request = self.control_stream_handle.next_request() => {
362362
let result = self.handle_streaming_control_request(request);
363363
if let Err(err) = result {

0 commit comments

Comments
 (0)