
Commit cab31b7

Authored by github-actions[bot], yezizp2012, and huangjw806
fix: introduce blocked status for scheduled queue when recovering (#11543) (#11555)
Co-authored-by: August <[email protected]>
Co-authored-by: Huangjw <[email protected]>
1 parent 5e10fcb commit cab31b7

File tree: 3 files changed (+85, -25 lines)

src/meta/src/barrier/recovery.rs (+5, -5)

@@ -110,11 +110,10 @@ where
     ///
     /// Returns the new epoch after recovery.
     pub(crate) async fn recovery(&self, prev_epoch: TracedEpoch) -> TracedEpoch {
-        // pause discovery of all connector split changes and trigger config change.
-        let _source_pause_guard = self.source_manager.paused.lock().await;
-
-        // Abort buffered schedules, they might be dirty already.
-        self.scheduled_barriers.abort().await;
+        // Mark blocked and abort buffered schedules, they might be dirty already.
+        self.scheduled_barriers
+            .abort_and_mark_blocked("cluster is under recovering")
+            .await;
 
         tracing::info!("recovery start!");
         self.clean_dirty_fragments()
@@ -223,6 +222,7 @@ where
         .await
         .expect("Retry until recovery success.");
         recovery_timer.observe_duration();
+        self.scheduled_barriers.mark_ready().await;
         tracing::info!("recovery success");
 
         new_epoch
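In effect, recovery now brackets its entire retry loop with a blocked/ready transition on the barrier scheduler: commands that arrive while the cluster is recovering are rejected immediately instead of piling up against state that is about to be rebuilt. The sketch below illustrates that bracketing pattern only; `Scheduler` is a placeholder for the real `ScheduledBarriers`, the body of `recovery` is elided, and the only assumed dependency is the `tokio` crate.

```rust
use tokio::sync::Mutex;

/// Placeholder standing in for `ScheduledBarriers`; not the real risingwave-meta type.
#[derive(Default)]
struct Scheduler {
    /// `Some(reason)` while the cluster is recovering, `None` once it is ready again.
    blocked: Mutex<Option<String>>,
}

impl Scheduler {
    async fn abort_and_mark_blocked(&self, reason: &str) {
        // The real method also drains queued commands and fails their notifiers with `reason`.
        *self.blocked.lock().await = Some(reason.to_owned());
    }

    async fn mark_ready(&self) {
        *self.blocked.lock().await = None;
    }
}

/// Recovery marks the scheduler blocked up front and ready only after the retry loop succeeds.
async fn recovery(scheduler: &Scheduler) {
    scheduler
        .abort_and_mark_blocked("cluster is under recovering")
        .await;
    // ... clean dirty fragments, reset compute nodes, retry until success ...
    scheduler.mark_ready().await;
}

#[tokio::main]
async fn main() {
    let scheduler = Scheduler::default();
    recovery(&scheduler).await;
    // Once recovery returns, the scheduler accepts commands again.
    assert!(scheduler.blocked.lock().await.is_none());
}
```

How rejection actually happens lives in the queue itself, shown in the schedule.rs diff below.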

src/meta/src/barrier/schedule.rs (+75, -20)

@@ -28,14 +28,14 @@ use super::{Command, Scheduled};
 use crate::hummock::HummockManagerRef;
 use crate::rpc::metrics::MetaMetrics;
 use crate::storage::MetaStore;
-use crate::MetaResult;
+use crate::{MetaError, MetaResult};
 
 /// A queue for scheduling barriers.
 ///
 /// We manually implement one here instead of using channels since we may need to update the front
 /// of the queue to add some notifiers for instant flushes.
 struct Inner {
-    queue: RwLock<VecDeque<Scheduled>>,
+    queue: RwLock<ScheduledQueue>,
 
     /// When `queue` is not empty anymore, all subscribers of this watcher will be notified.
     changed_tx: watch::Sender<()>,
@@ -52,6 +52,47 @@ struct Inner {
     metrics: Arc<MetaMetrics>,
 }
 
+enum QueueStatus {
+    /// The queue is ready to accept new command.
+    Ready,
+    /// The queue is blocked to accept new command with the given reason.
+    Blocked(String),
+}
+
+struct ScheduledQueue {
+    queue: VecDeque<Scheduled>,
+    status: QueueStatus,
+}
+
+impl ScheduledQueue {
+    fn new() -> Self {
+        Self {
+            queue: VecDeque::new(),
+            status: QueueStatus::Ready,
+        }
+    }
+
+    fn mark_blocked(&mut self, reason: String) {
+        self.status = QueueStatus::Blocked(reason);
+    }
+
+    fn mark_ready(&mut self) {
+        self.status = QueueStatus::Ready;
+    }
+
+    fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    fn push_back(&mut self, scheduled: Scheduled) -> MetaResult<()> {
+        if let QueueStatus::Blocked(reason) = &self.status {
+            return Err(MetaError::unavailable(reason.clone()));
+        }
+        self.queue.push_back(scheduled);
+        Ok(())
+    }
+}
+
 impl Inner {
     /// Create a new scheduled barrier with the given `checkpoint`, `command` and `notifiers`.
     fn new_scheduled(
@@ -100,7 +141,7 @@ impl<S: MetaStore> BarrierScheduler<S> {
             checkpoint_frequency,
         );
         let inner = Arc::new(Inner {
-            queue: RwLock::new(VecDeque::new()),
+            queue: RwLock::new(ScheduledQueue::new()),
             changed_tx: watch::channel(()).0,
             num_uncheckpointed_barrier: AtomicUsize::new(0),
             checkpoint_frequency: AtomicUsize::new(checkpoint_frequency),
@@ -118,20 +159,21 @@ impl<S: MetaStore> BarrierScheduler<S> {
     }
 
     /// Push a scheduled barrier into the queue.
-    async fn push(&self, scheduleds: impl IntoIterator<Item = Scheduled>) {
+    async fn push(&self, scheduleds: impl IntoIterator<Item = Scheduled>) -> MetaResult<()> {
         let mut queue = self.inner.queue.write().await;
         for scheduled in scheduleds {
-            queue.push_back(scheduled);
+            queue.push_back(scheduled)?;
             if queue.len() == 1 {
                 self.inner.changed_tx.send(()).ok();
             }
         }
+        Ok(())
     }
 
     /// Try to cancel scheduled cmd for create streaming job, return true if cancelled.
     pub async fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool {
-        let mut queue = self.inner.queue.write().await;
-        if let Some(idx) = queue.iter().position(|scheduled| {
+        let queue = &mut self.inner.queue.write().await;
+        if let Some(idx) = queue.queue.iter().position(|scheduled| {
             if let Command::CreateStreamingJob {
                 table_fragments, ..
             } = &scheduled.command
@@ -142,7 +184,7 @@ impl<S: MetaStore> BarrierScheduler<S> {
                 false
             }
         }) {
-            queue.remove(idx).unwrap();
+            queue.queue.remove(idx).unwrap();
             true
         } else {
             false
@@ -152,9 +194,13 @@ impl<S: MetaStore> BarrierScheduler<S> {
     /// Attach `new_notifiers` to the very first scheduled barrier. If there's no one scheduled, a
     /// default barrier will be created. If `new_checkpoint` is true, the barrier will become a
     /// checkpoint.
-    async fn attach_notifiers(&self, new_notifiers: Vec<Notifier>, new_checkpoint: bool) {
+    async fn attach_notifiers(
+        &self,
+        new_notifiers: Vec<Notifier>,
+        new_checkpoint: bool,
+    ) -> MetaResult<()> {
         let mut queue = self.inner.queue.write().await;
-        match queue.front_mut() {
+        match queue.queue.front_mut() {
             Some(Scheduled {
                 notifiers,
                 checkpoint,
@@ -169,10 +215,11 @@ impl<S: MetaStore> BarrierScheduler<S> {
                     new_checkpoint,
                     Command::barrier(),
                     new_notifiers,
-                ));
+                ))?;
                 self.inner.changed_tx.send(()).ok();
             }
         }
+        Ok(())
     }
 
     /// Wait for the next barrier to collect. Note that the barrier flowing in our stream graph is
@@ -183,7 +230,7 @@ impl<S: MetaStore> BarrierScheduler<S> {
             collected: Some(tx),
             ..Default::default()
         };
-        self.attach_notifiers(vec![notifier], checkpoint).await;
+        self.attach_notifiers(vec![notifier], checkpoint).await?;
         rx.await.unwrap()
     }
 
@@ -219,7 +266,7 @@ impl<S: MetaStore> BarrierScheduler<S> {
             ));
         }
 
-        self.push(scheduleds).await;
+        self.push(scheduleds).await?;
 
         for Context {
             collect_rx,
@@ -278,7 +325,7 @@ impl ScheduledBarriers {
     pub(super) async fn pop_or_default(&self) -> Scheduled {
         let mut queue = self.inner.queue.write().await;
        let checkpoint = self.try_get_checkpoint();
-        let scheduled = match queue.pop_front() {
+        let scheduled = match queue.queue.pop_front() {
             Some(mut scheduled) => {
                 scheduled.checkpoint = scheduled.checkpoint || checkpoint;
                 scheduled
@@ -305,16 +352,24 @@ impl ScheduledBarriers {
         rx.changed().await.unwrap();
     }
 
-    /// Clear all queued scheduled barriers, and notify their subscribers with failed as aborted.
-    pub(super) async fn abort(&self) {
+    /// Mark command scheduler as blocked and abort all queued scheduled command and notify with
+    /// specific reason.
+    pub(super) async fn abort_and_mark_blocked(&self, reason: impl Into<String> + Copy) {
         let mut queue = self.inner.queue.write().await;
-        while let Some(Scheduled { notifiers, .. }) = queue.pop_front() {
-            notifiers.into_iter().for_each(|notify| {
-                notify.notify_collection_failed(anyhow!("Scheduled barrier abort.").into())
-            })
+        queue.mark_blocked(reason.into());
+        while let Some(Scheduled { notifiers, .. }) = queue.queue.pop_front() {
+            notifiers
+                .into_iter()
+                .for_each(|notify| notify.notify_collection_failed(anyhow!(reason.into()).into()))
         }
     }
 
+    /// Mark command scheduler as ready to accept new command.
+    pub(super) async fn mark_ready(&self) {
+        let mut queue = self.inner.queue.write().await;
+        queue.mark_ready();
+    }
+
     /// Whether the barrier(checkpoint = true) should be injected.
     fn try_get_checkpoint(&self) -> bool {
         self.inner
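The core of this file's change is that the queue now carries a status and `push_back` becomes fallible: while the status is `Blocked`, pushing returns `MetaError::unavailable(reason)` instead of enqueueing. Below is a self-contained approximation of that behavior for readers who want to run it in isolation; a plain `String` stands in for `Scheduled` and `Result<(), String>` for `MetaResult`, so none of this is the real risingwave-meta API.

```rust
use std::collections::VecDeque;

enum QueueStatus {
    /// The queue accepts new commands.
    Ready,
    /// The queue rejects new commands, carrying the reason reported back to callers.
    Blocked(String),
}

struct ScheduledQueue {
    queue: VecDeque<String>, // stand-in for `VecDeque<Scheduled>`
    status: QueueStatus,
}

impl ScheduledQueue {
    fn new() -> Self {
        Self {
            queue: VecDeque::new(),
            status: QueueStatus::Ready,
        }
    }

    fn mark_blocked(&mut self, reason: String) {
        self.status = QueueStatus::Blocked(reason);
    }

    fn mark_ready(&mut self) {
        self.status = QueueStatus::Ready;
    }

    /// Mirrors the fallible `push_back` in the diff: fail with the blocked reason
    /// instead of enqueueing (the real code wraps it in `MetaError::unavailable`).
    fn push_back(&mut self, cmd: String) -> Result<(), String> {
        if let QueueStatus::Blocked(reason) = &self.status {
            return Err(reason.clone());
        }
        self.queue.push_back(cmd);
        Ok(())
    }
}

fn main() {
    let mut queue = ScheduledQueue::new();

    // While recovery is in progress, scheduling fails fast with the reason.
    queue.mark_blocked("cluster is under recovering".to_owned());
    assert!(queue.push_back("Flush".to_owned()).is_err());

    // After recovery succeeds, the queue accepts commands again.
    queue.mark_ready();
    assert!(queue.push_back("Flush".to_owned()).is_ok());
    assert_eq!(queue.queue.len(), 1);
}
```

Because `push_back` now returns a result, `push` and `attach_notifiers` change their return types to `MetaResult<()>`, and their callers simply propagate the error with `?`, which is what the remaining hunks above adjust.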

src/meta/src/stream/stream_manager.rs (+5, -0)

@@ -656,6 +656,10 @@ mod tests {
             &self,
             _request: Request<ForceStopActorsRequest>,
         ) -> std::result::Result<Response<ForceStopActorsResponse>, Status> {
+            self.inner.actor_streams.lock().unwrap().clear();
+            self.inner.actor_ids.lock().unwrap().clear();
+            self.inner.actor_infos.lock().unwrap().clear();
+
             Ok(Response::new(ForceStopActorsResponse::default()))
         }
 
@@ -1029,6 +1033,7 @@ mod tests {
         assert_eq!(table_fragments.actor_ids(), (0..=3).collect_vec());
 
         // test drop materialized_view
+        tokio::time::sleep(Duration::from_secs(2)).await;
         services
             .drop_materialized_views(vec![table_id])
             .await