Skip to content

Commit 194f8bb

Browse files
authored
refactor(meta): unify code for different kinds of source change (#19991)
1 parent bef8772 commit 194f8bb

File tree

7 files changed

+229
-169
lines changed

7 files changed

+229
-169
lines changed

src/meta/src/barrier/command.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ pub enum Command {
307307

308308
/// `ReplaceStreamJob` command generates a `Update` barrier with the given `merge_updates`. This is
309309
/// essentially switching the downstream of the old job fragments to the new ones, and
310-
/// dropping the old job fragments. Used for table schema change.
310+
/// dropping the old job fragments. Used for schema change.
311311
///
312312
/// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment
313313
/// of the Merge executors are changed additionally.

src/meta/src/barrier/context/context_impl.rs

+30-31
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::barrier::{
3333
Scheduled,
3434
};
3535
use crate::hummock::CommitEpochInfo;
36+
use crate::stream::SourceChange;
3637
use crate::{MetaError, MetaResult};
3738

3839
impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
@@ -166,7 +167,7 @@ impl CommandContext {
166167
.await?;
167168
barrier_manager_context
168169
.source_manager
169-
.apply_source_change(None, None, Some(split_assignment.clone()), None, None)
170+
.apply_source_change(SourceChange::SplitChange(split_assignment.clone()))
170171
.await;
171172
}
172173

@@ -180,17 +181,18 @@ impl CommandContext {
180181
.await?;
181182
}
182183
Command::CreateStreamingJob { info, job_type } => {
183-
let mut fragment_replacements = None;
184-
let mut dropped_actors = None;
184+
let mut is_sink_into_table = false;
185185
match job_type {
186186
CreateStreamingJobType::SinkIntoTable(
187187
replace_plan @ ReplaceStreamJobPlan {
188+
old_fragments,
188189
new_fragments,
189190
dispatchers,
190191
init_split_assignment,
191192
..
192193
},
193194
) => {
195+
is_sink_into_table = true;
194196
barrier_manager_context
195197
.metadata_manager
196198
.catalog_controller
@@ -201,8 +203,15 @@ impl CommandContext {
201203
init_split_assignment,
202204
)
203205
.await?;
204-
fragment_replacements = Some(replace_plan.fragment_replacements());
205-
dropped_actors = Some(replace_plan.dropped_actors());
206+
barrier_manager_context
207+
.source_manager
208+
.handle_replace_job(
209+
old_fragments,
210+
new_fragments.stream_source_fragments(),
211+
init_split_assignment.clone(),
212+
replace_plan.fragment_replacements(),
213+
)
214+
.await;
206215
}
207216
CreateStreamingJobType::Normal => {}
208217
CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
@@ -247,19 +256,17 @@ impl CommandContext {
247256
)
248257
.await?;
249258

250-
// Extract the fragments that include source operators.
251-
let source_fragments = stream_job_fragments.stream_source_fragments();
252-
let backfill_fragments = stream_job_fragments.source_backfill_fragments()?;
253-
barrier_manager_context
254-
.source_manager
255-
.apply_source_change(
256-
Some(source_fragments),
257-
Some(backfill_fragments),
258-
Some(init_split_assignment.clone()),
259-
dropped_actors,
260-
fragment_replacements,
261-
)
262-
.await;
259+
if !is_sink_into_table {
260+
barrier_manager_context
261+
.source_manager
262+
.apply_source_change(SourceChange::CreateJob {
263+
added_source_fragments: stream_job_fragments.stream_source_fragments(),
264+
added_backfill_fragments: stream_job_fragments
265+
.source_backfill_fragments()?,
266+
split_assignment: init_split_assignment.clone(),
267+
})
268+
.await;
269+
}
263270
}
264271
Command::RescheduleFragment {
265272
reschedules,
@@ -296,19 +303,11 @@ impl CommandContext {
296303
// Apply the split changes in source manager.
297304
barrier_manager_context
298305
.source_manager
299-
.drop_source_fragments_vec(std::slice::from_ref(old_fragments))
300-
.await;
301-
let source_fragments = new_fragments.stream_source_fragments();
302-
// XXX: is it possible to have backfill fragments here?
303-
let backfill_fragments = new_fragments.source_backfill_fragments()?;
304-
barrier_manager_context
305-
.source_manager
306-
.apply_source_change(
307-
Some(source_fragments),
308-
Some(backfill_fragments),
309-
Some(init_split_assignment.clone()),
310-
Some(replace_plan.dropped_actors()),
311-
Some(replace_plan.fragment_replacements()),
306+
.handle_replace_job(
307+
old_fragments,
308+
new_fragments.stream_source_fragments(),
309+
init_split_assignment.clone(),
310+
replace_plan.fragment_replacements(),
312311
)
313312
.await;
314313
}

src/meta/src/barrier/context/recovery.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
3535
use crate::controller::fragment::InflightFragmentInfo;
3636
use crate::manager::ActiveStreamingWorkerNodes;
3737
use crate::model::{ActorId, StreamJobFragments, TableParallelism};
38-
use crate::stream::{RescheduleOptions, TableResizePolicy};
38+
use crate::stream::{RescheduleOptions, SourceChange, TableResizePolicy};
3939
use crate::{model, MetaResult};
4040

4141
impl GlobalBarrierWorkerContextImpl {
@@ -46,14 +46,18 @@ impl GlobalBarrierWorkerContextImpl {
4646
.catalog_controller
4747
.clean_dirty_subscription(database_id)
4848
.await?;
49-
let source_ids = self
49+
let dirty_associated_source_ids = self
5050
.metadata_manager
5151
.catalog_controller
5252
.clean_dirty_creating_jobs(database_id)
5353
.await?;
5454

5555
// unregister cleaned sources.
56-
self.source_manager.unregister_sources(source_ids).await;
56+
self.source_manager
57+
.apply_source_change(SourceChange::DropSource {
58+
dropped_source_ids: dirty_associated_source_ids,
59+
})
60+
.await;
5761

5862
Ok(())
5963
}

src/meta/src/controller/catalog/mod.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,9 @@ impl CatalogController {
364364
})
365365
.collect_vec();
366366

367-
let associated_source_ids: Vec<SourceId> = Table::find()
367+
// The source ids for dirty tables with connector.
368+
// FIXME: we should also clean dirty sources.
369+
let dirty_associated_source_ids: Vec<SourceId> = Table::find()
368370
.select_only()
369371
.column(table::Column::OptionalAssociatedSourceId)
370372
.filter(
@@ -402,7 +404,7 @@ impl CatalogController {
402404
.clone()
403405
.into_iter()
404406
.chain(dirty_state_table_ids.into_iter())
405-
.chain(associated_source_ids.clone().into_iter())
407+
.chain(dirty_associated_source_ids.clone().into_iter())
406408
.collect();
407409

408410
let res = Object::delete_many()
@@ -424,7 +426,7 @@ impl CatalogController {
424426
.notify_frontend(NotificationOperation::Delete, relation_group)
425427
.await;
426428

427-
Ok(associated_source_ids)
429+
Ok(dirty_associated_source_ids)
428430
}
429431

430432
pub async fn comment_on(&self, comment: PbComment) -> MetaResult<NotificationVersion> {

src/meta/src/rpc/ddl_controller.rs

+23-15
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ use crate::model::{StreamContext, StreamJobFragments, TableParallelism};
7777
use crate::stream::{
7878
create_source_worker_handle, validate_sink, ActorGraphBuildResult, ActorGraphBuilder,
7979
CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
80-
GlobalStreamManagerRef, ReplaceStreamJobContext, SourceManagerRef, StreamFragmentGraph,
80+
GlobalStreamManagerRef, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
81+
StreamFragmentGraph,
8182
};
8283
use crate::{MetaError, MetaResult};
8384

@@ -974,7 +975,9 @@ impl DdlController {
974975
tracing::warn!(id = job_id, "aborted streaming job");
975976
if let Some(source_id) = source_id {
976977
self.source_manager
977-
.unregister_sources(vec![source_id as SourceId])
978+
.apply_source_change(SourceChange::DropSource {
979+
dropped_source_ids: vec![source_id as SourceId],
980+
})
978981
.await;
979982
}
980983
}
@@ -1263,23 +1266,28 @@ impl DdlController {
12631266

12641267
// unregister sources.
12651268
self.source_manager
1266-
.unregister_sources(removed_source_ids.into_iter().map(|id| id as _).collect())
1269+
.apply_source_change(SourceChange::DropSource {
1270+
dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1271+
})
12671272
.await;
12681273

12691274
// unregister fragments and actors from source manager.
1275+
// FIXME: need also unregister source backfill fragments.
1276+
let dropped_source_fragments = removed_source_fragments
1277+
.into_iter()
1278+
.map(|(source_id, fragments)| {
1279+
(
1280+
source_id,
1281+
fragments.into_iter().map(|id| id as u32).collect(),
1282+
)
1283+
})
1284+
.collect();
1285+
let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
12701286
self.source_manager
1271-
.drop_source_fragments(
1272-
removed_source_fragments
1273-
.into_iter()
1274-
.map(|(source_id, fragments)| {
1275-
(
1276-
source_id,
1277-
fragments.into_iter().map(|id| id as u32).collect(),
1278-
)
1279-
})
1280-
.collect(),
1281-
removed_actors.iter().map(|id| *id as _).collect(),
1282-
)
1287+
.apply_source_change(SourceChange::DropMv {
1288+
dropped_source_fragments,
1289+
dropped_actors,
1290+
})
12831291
.await;
12841292

12851293
// drop streaming jobs.

src/meta/src/stream/scale.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ impl CustomFragmentInfo {
140140

141141
use educe::Educe;
142142

143+
use super::SourceChange;
143144
use crate::controller::id::IdCategory;
144145

145146
// The debug implementation is arbitrary. Just used in debug logs.
@@ -1769,13 +1770,10 @@ impl ScaleController {
17691770

17701771
if !stream_source_actor_splits.is_empty() {
17711772
self.source_manager
1772-
.apply_source_change(
1773-
None,
1774-
None,
1775-
Some(stream_source_actor_splits),
1776-
Some(stream_source_dropped_actors),
1777-
None,
1778-
)
1773+
.apply_source_change(SourceChange::Reschedule {
1774+
split_assignment: stream_source_actor_splits,
1775+
dropped_actors: stream_source_dropped_actors,
1776+
})
17791777
.await;
17801778
}
17811779

0 commit comments

Comments
 (0)