Skip to content

Commit 340d80c

Browse files
committed
minor refactor
Signed-off-by: xxchan <[email protected]>
1 parent 1f9a4c6 commit 340d80c

File tree

8 files changed

+64
-96
lines changed

8 files changed

+64
-96
lines changed

.git-blame-ignore-revs

+3
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,6 @@ c583e2c6c054764249acf484438c7bf7197765f4
4545

4646
# chore: cleanup v2 naming for sql metastore (#18941)
4747
9a6a7f9052d5679165ff57cc01417c742c95351c
48+
49+
# refactor: split catalog to smaller files (#19870)
50+
d6341b74be3f1913cc93993a95c147999df1ff74

src/meta/src/barrier/command.rs

+14-10
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ pub struct Reschedule {
8585
}
8686

8787
/// Replacing an old job with a new one. All actors in the job will be rebuilt.
88-
/// Used for `ALTER TABLE` ([`Command::ReplaceStreamJob`]) and sink into table ([`Command::CreateStreamingJob`]).
88+
///
89+
/// Current use cases:
90+
/// - `ALTER SOURCE` (via [`Command::ReplaceStreamJob`]) will replace a source job's plan.
91+
/// - `ALTER TABLE` (via [`Command::ReplaceStreamJob`]) and `CREATE SINK INTO table` ([`Command::CreateStreamingJob`])
92+
/// will replace a table job's plan.
8993
#[derive(Debug, Clone)]
9094
pub struct ReplaceStreamJobPlan {
9195
pub old_fragments: StreamJobFragments,
@@ -102,7 +106,7 @@ pub struct ReplaceStreamJobPlan {
102106
pub init_split_assignment: SplitAssignment,
103107
/// The `StreamingJob` info of the table to be replaced. Must be `StreamingJob::Table`
104108
pub streaming_job: StreamingJob,
105-
/// The temporary dummy table fragments id of new table fragment
109+
/// The temporary dummy job fragments id of new table fragment
106110
pub tmp_id: u32,
107111
}
108112

@@ -264,7 +268,7 @@ pub enum Command {
264268
///
265269
/// Barriers from the actors to be dropped will STILL be collected.
266270
/// After the barrier is collected, it notifies the local stream manager of compute nodes to
267-
/// drop actors, and then delete the table fragments info from meta store.
271+
/// drop actors, and then delete the job fragments info from meta store.
268272
DropStreamingJobs {
269273
table_fragments_ids: HashSet<TableId>,
270274
actors: Vec<ActorId>,
@@ -278,7 +282,7 @@ pub enum Command {
278282
/// be collected since the barrier should be passthrough.
279283
///
280284
/// After the barrier is collected, these newly created actors will be marked as `Running`. And
281-
/// it adds the table fragments info to meta store. However, the creating progress will **last
285+
/// it adds the job fragments info to meta store. However, the creating progress will **last
282286
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
283287
/// will be set to `Created`.
284288
CreateStreamingJob {
@@ -302,16 +306,16 @@ pub enum Command {
302306
},
303307

304308
/// `ReplaceStreamJob` command generates a `Update` barrier with the given `merge_updates`. This is
305-
/// essentially switching the downstream of the old table fragments to the new ones, and
306-
/// dropping the old table fragments. Used for table schema change.
309+
/// essentially switching the downstream of the old job fragments to the new ones, and
310+
/// dropping the old job fragments. Used for table schema change.
307311
///
308312
/// This can be treated as a special case of `RescheduleFragment`, while the upstream fragment
309313
/// of the Merge executors are changed additionally.
310314
ReplaceStreamJob(ReplaceStreamJobPlan),
311315

312-
/// `SourceSplitAssignment` generates a `Splits` barrier for pushing initialized splits or
316+
/// `SourceChangeSplit` generates a `Splits` barrier for pushing initialized splits or
313317
/// changed splits.
314-
SourceSplitAssignment(SplitAssignment),
318+
SourceChangeSplit(SplitAssignment),
315319

316320
/// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
317321
/// the `rate_limit` of `FlowControl` Executor after `StreamScan` or Source.
@@ -416,7 +420,7 @@ impl Command {
416420
),
417421
Command::ReplaceStreamJob(plan) => Some(plan.fragment_changes()),
418422
Command::MergeSnapshotBackfillStreamingJobs(_) => None,
419-
Command::SourceSplitAssignment(_) => None,
423+
Command::SourceChangeSplit(_) => None,
420424
Command::Throttle(_) => None,
421425
Command::CreateSubscription { .. } => None,
422426
Command::DropSubscription { .. } => None,
@@ -640,7 +644,7 @@ impl Command {
640644
}
641645
}
642646

643-
Command::SourceSplitAssignment(change) => {
647+
Command::SourceChangeSplit(change) => {
644648
let mut diff = HashMap::new();
645649

646650
for actor_splits in change.values() {

src/meta/src/barrier/context/context_impl.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl CommandContext {
159159

160160
Command::Resume(_) => {}
161161

162-
Command::SourceSplitAssignment(split_assignment) => {
162+
Command::SourceChangeSplit(split_assignment) => {
163163
barrier_manager_context
164164
.metadata_manager
165165
.update_actor_splits_by_split_assignment(split_assignment)

src/meta/src/controller/catalog/drop_op.rs

+15-28
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,8 @@ impl CatalogController {
207207
}
208208
});
209209

210-
let (source_fragments, removed_actors, removed_fragments) =
211-
resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
212-
213-
let fragment_ids = get_fragment_ids_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
210+
let (removed_source_fragments, removed_actors, removed_fragments) =
211+
get_fragments_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?;
214212

215213
// Find affect users with privileges on all this objects.
216214
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
@@ -245,10 +243,10 @@ impl CatalogController {
245243
.notify_frontend(NotificationOperation::Delete, relation_group)
246244
.await;
247245

248-
let fragment_mappings = fragment_ids
249-
.into_iter()
246+
let fragment_mappings = removed_fragments
247+
.iter()
250248
.map(|fragment_id| PbFragmentWorkerSlotMapping {
251-
fragment_id: fragment_id as _,
249+
fragment_id: *fragment_id as _,
252250
mapping: None,
253251
})
254252
.collect();
@@ -259,11 +257,10 @@ impl CatalogController {
259257
Ok((
260258
ReleaseContext {
261259
database_id,
262-
streaming_job_ids: to_drop_streaming_jobs,
263-
state_table_ids: to_drop_state_table_ids,
264-
source_ids: to_drop_source_ids,
265-
connections: vec![],
266-
source_fragments,
260+
removed_streaming_job_ids: to_drop_streaming_jobs,
261+
removed_state_table_ids: to_drop_state_table_ids,
262+
removed_source_ids: to_drop_source_ids,
263+
removed_source_fragments,
267264
removed_actors,
268265
removed_fragments,
269266
},
@@ -418,8 +415,8 @@ impl CatalogController {
418415
.all(&txn)
419416
.await?;
420417

421-
let (source_fragments, removed_actors, removed_fragments) =
422-
resolve_source_register_info_for_jobs(&txn, streaming_jobs.clone()).await?;
418+
let (removed_source_fragments, removed_actors, removed_fragments) =
419+
get_fragments_for_jobs(&txn, streaming_jobs.clone()).await?;
423420

424421
let state_table_ids: Vec<TableId> = Table::find()
425422
.select_only()
@@ -445,15 +442,6 @@ impl CatalogController {
445442
.all(&txn)
446443
.await?;
447444

448-
let connections = Connection::find()
449-
.inner_join(Object)
450-
.filter(object::Column::DatabaseId.eq(Some(database_id)))
451-
.all(&txn)
452-
.await?
453-
.into_iter()
454-
.map(|conn| conn.connection_id)
455-
.collect_vec();
456-
457445
// Find affect users with privileges on the database and the objects in the database.
458446
let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
459447
.select_only()
@@ -503,11 +491,10 @@ impl CatalogController {
503491
Ok((
504492
ReleaseContext {
505493
database_id,
506-
streaming_job_ids: streaming_jobs,
507-
state_table_ids,
508-
source_ids,
509-
connections,
510-
source_fragments,
494+
removed_streaming_job_ids: streaming_jobs,
495+
removed_state_table_ids: state_table_ids,
496+
removed_source_ids: source_ids,
497+
removed_source_fragments,
511498
removed_actors,
512499
removed_fragments,
513500
},

src/meta/src/controller/catalog/mod.rs

+10-21
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,7 @@ use super::utils::{
7878
rename_relation, rename_relation_refer,
7979
};
8080
use crate::controller::catalog::util::update_internal_tables;
81-
use crate::controller::utils::{
82-
build_relation_group_for_delete, check_connection_name_duplicate,
83-
check_database_name_duplicate, check_function_signature_duplicate,
84-
check_relation_name_duplicate, check_schema_name_duplicate, check_secret_name_duplicate,
85-
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
86-
extract_external_table_name_from_definition, get_referring_objects,
87-
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
88-
resolve_source_register_info_for_jobs, PartialObject,
89-
};
81+
use crate::controller::utils::*;
9082
use crate::controller::ObjectModel;
9183
use crate::manager::{
9284
get_referred_connection_ids_from_source, get_referred_secret_ids_from_source, MetaSrvEnv,
@@ -121,20 +113,17 @@ pub struct CatalogController {
121113
#[derive(Clone, Default)]
122114
pub struct ReleaseContext {
123115
pub(crate) database_id: DatabaseId,
124-
pub(crate) streaming_job_ids: Vec<ObjectId>,
116+
pub(crate) removed_streaming_job_ids: Vec<ObjectId>,
125117
/// Dropped state table list, need to unregister from hummock.
126-
pub(crate) state_table_ids: Vec<TableId>,
127-
/// Dropped source list, need to unregister from source manager.
128-
pub(crate) source_ids: Vec<SourceId>,
129-
/// Dropped connection list, need to delete from vpc endpoints.
130-
#[allow(dead_code)]
131-
pub(crate) connections: Vec<ConnectionId>,
132-
133-
/// Dropped fragments that are fetching data from the target source.
134-
pub(crate) source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
135-
/// Dropped actors.
136-
pub(crate) removed_actors: HashSet<ActorId>,
118+
pub(crate) removed_state_table_ids: Vec<TableId>,
137119

120+
/// Dropped sources (when `DROP SOURCE`), need to unregister from source manager.
121+
pub(crate) removed_source_ids: Vec<SourceId>,
122+
/// Dropped Source fragments (when `DROP MATERIALIZED VIEW` referencing sources),
123+
/// need to unregister from source manager.
124+
pub(crate) removed_source_fragments: HashMap<SourceId, BTreeSet<FragmentId>>,
125+
126+
pub(crate) removed_actors: HashSet<ActorId>,
138127
pub(crate) removed_fragments: HashSet<FragmentId>,
139128
}
140129

src/meta/src/controller/utils.rs

+12-27
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ use risingwave_pb::meta::subscribe_response::Info as NotificationInfo;
3939
use risingwave_pb::meta::{
4040
FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbRelation, PbRelationGroup,
4141
};
42-
use risingwave_pb::stream_plan::stream_node::NodeBody;
43-
use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource};
42+
use risingwave_pb::stream_plan::PbFragmentTypeFlag;
4443
use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject};
4544
use risingwave_pb::user::{PbGrantPrivilege, PbUserInfo};
4645
use risingwave_sqlparser::ast::Statement as SqlStatement;
@@ -1059,25 +1058,11 @@ where
10591058
Ok(fragment_actors.into_iter().into_group_map())
10601059
}
10611060

1062-
/// Find the external stream source info inside the stream node, if any.
1063-
pub fn find_stream_source(stream_node: &PbStreamNode) -> Option<&StreamSource> {
1064-
if let Some(NodeBody::Source(source)) = &stream_node.node_body {
1065-
if let Some(inner) = &source.source_inner {
1066-
return Some(inner);
1067-
}
1068-
}
1069-
1070-
for child in &stream_node.input {
1071-
if let Some(source) = find_stream_source(child) {
1072-
return Some(source);
1073-
}
1074-
}
1075-
1076-
None
1077-
}
1078-
1079-
/// Resolve fragment list that are subscribing to sources and actor lists.
1080-
pub async fn resolve_source_register_info_for_jobs<C>(
1061+
/// For the given streaming jobs, returns
1062+
/// - All source fragments
1063+
/// - All actors
1064+
/// - All fragments
1065+
pub async fn get_fragments_for_jobs<C>(
10811066
db: &C,
10821067
streaming_jobs: Vec<ObjectId>,
10831068
) -> MetaResult<(
@@ -1113,28 +1098,28 @@ where
11131098
.all(db)
11141099
.await?;
11151100

1116-
let removed_fragments = fragments
1101+
let fragment_ids = fragments
11171102
.iter()
11181103
.map(|(fragment_id, _, _)| *fragment_id)
11191104
.collect();
11201105

1121-
let mut source_fragment_ids = HashMap::new();
1106+
let mut source_fragment_ids: HashMap<SourceId, BTreeSet<FragmentId>> = HashMap::new();
11221107
for (fragment_id, mask, stream_node) in fragments {
11231108
if mask & PbFragmentTypeFlag::Source as i32 == 0 {
11241109
continue;
11251110
}
1126-
if let Some(source) = find_stream_source(&stream_node.to_protobuf()) {
1111+
if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
11271112
source_fragment_ids
1128-
.entry(source.source_id as SourceId)
1129-
.or_insert_with(BTreeSet::new)
1113+
.entry(source_id as _)
1114+
.or_default()
11301115
.insert(fragment_id);
11311116
}
11321117
}
11331118

11341119
Ok((
11351120
source_fragment_ids,
11361121
actors.into_iter().collect(),
1137-
removed_fragments,
1122+
fragment_ids,
11381123
))
11391124
}
11401125

src/meta/src/rpc/ddl_controller.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -1252,24 +1252,24 @@ impl DdlController {
12521252

12531253
let ReleaseContext {
12541254
database_id,
1255-
streaming_job_ids,
1256-
state_table_ids,
1257-
source_ids,
1258-
source_fragments,
1255+
removed_streaming_job_ids,
1256+
removed_state_table_ids,
1257+
removed_source_ids,
1258+
removed_source_fragments,
12591259
removed_actors,
12601260
removed_fragments,
12611261
..
12621262
} = release_ctx;
12631263

12641264
// unregister sources.
12651265
self.source_manager
1266-
.unregister_sources(source_ids.into_iter().map(|id| id as _).collect())
1266+
.unregister_sources(removed_source_ids.into_iter().map(|id| id as _).collect())
12671267
.await;
12681268

12691269
// unregister fragments and actors from source manager.
12701270
self.source_manager
12711271
.drop_source_fragments(
1272-
source_fragments
1272+
removed_source_fragments
12731273
.into_iter()
12741274
.map(|(source_id, fragments)| {
12751275
(
@@ -1287,8 +1287,8 @@ impl DdlController {
12871287
.drop_streaming_jobs(
12881288
risingwave_common::catalog::DatabaseId::new(database_id as _),
12891289
removed_actors.into_iter().map(|id| id as _).collect(),
1290-
streaming_job_ids,
1291-
state_table_ids,
1290+
removed_streaming_job_ids,
1291+
removed_state_table_ids,
12921292
removed_fragments.iter().map(|id| *id as _).collect(),
12931293
)
12941294
.await;

src/meta/src/stream/source_manager.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1276,7 +1276,7 @@ impl SourceManager {
12761276

12771277
for (database_id, split_assignment) in split_assignment {
12781278
if !split_assignment.is_empty() {
1279-
let command = Command::SourceSplitAssignment(split_assignment);
1279+
let command = Command::SourceChangeSplit(split_assignment);
12801280
tracing::info!(command = ?command, "pushing down split assignment command");
12811281
self.barrier_scheduler
12821282
.run_command(database_id, command)

0 commit comments

Comments
 (0)