@@ -26,10 +26,11 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
2626use risingwave_hummock_sdk:: compaction_group:: { StateTableId , StaticCompactionGroupId } ;
2727use risingwave_hummock_sdk:: CompactionGroupId ;
2828use risingwave_pb:: hummock:: group_delta:: DeltaType ;
29+ use risingwave_pb:: hummock:: hummock_version_delta:: GroupDeltas ;
2930use risingwave_pb:: hummock:: rise_ctl_update_compaction_config_request:: mutable_config:: MutableConfig ;
3031use risingwave_pb:: hummock:: {
31- CompactionConfig , CompactionGroupInfo , GroupConstruct , GroupDelta , GroupDestroy ,
32- GroupMetaChange ,
32+ compact_task , CompactionConfig , CompactionGroupInfo , GroupConstruct , GroupDelta , GroupDestroy ,
33+ GroupMetaChange , GroupTableChange ,
3334} ;
3435use tokio:: sync:: { OnceCell , RwLock } ;
3536
@@ -61,7 +62,7 @@ impl<S: MetaStore> HummockManager<S> {
6162 ) -> Result < RwLock < CompactionGroupManager > > {
6263 let compaction_group_manager = RwLock :: new ( CompactionGroupManager {
6364 compaction_groups : BTreeMap :: new ( ) ,
64- provided_default_config_for_test : config,
65+ default_config : config,
6566 } ) ;
6667 compaction_group_manager
6768 . write ( )
@@ -421,11 +422,24 @@ impl<S: MetaStore> HummockManager<S> {
421422
422423 /// Splits a compaction group into two. The new one will contain `table_ids`.
423424 /// Returns the newly created compaction group id.
424- #[ named]
425425 pub async fn split_compaction_group (
426426 & self ,
427427 parent_group_id : CompactionGroupId ,
428428 table_ids : & [ StateTableId ] ,
429+ ) -> Result < CompactionGroupId > {
430+ self . move_state_table_to_compaction_group ( parent_group_id, table_ids, None , false )
431+ . await
432+ }
433+
434+ /// move some table to another compaction-group. Create a new compaction group if it does not
435+ /// exist.
436+ #[ named]
437+ pub async fn move_state_table_to_compaction_group (
438+ & self ,
439+ parent_group_id : CompactionGroupId ,
440+ table_ids : & [ StateTableId ] ,
441+ target_group_id : Option < CompactionGroupId > ,
442+ allow_split_by_table : bool ,
429443 ) -> Result < CompactionGroupId > {
430444 if table_ids. is_empty ( ) {
431445 return Ok ( parent_group_id) ;
@@ -453,120 +467,176 @@ impl<S: MetaStore> HummockManager<S> {
453467 parent_group_id
454468 ) ) ) ;
455469 }
470+ if let Some ( compaction_group_id) = target_group_id {
471+ if !versioning. check_branched_sst_in_target_group (
472+ & table_ids,
473+ & parent_group_id,
474+ & compaction_group_id,
475+ ) {
476+ return Err ( Error :: CompactionGroup ( format ! (
477+ "invalid split attempt for group {}: we shall wait some time for parent group and target group could compact stale sst files" ,
478+ parent_group_id
479+ ) ) ) ;
480+ }
481+ }
456482
457483 let mut new_version_delta = BTreeMapEntryTransaction :: new_insert (
458484 & mut versioning. hummock_version_deltas ,
459485 current_version. id + 1 ,
460486 build_version_delta_after_version ( current_version) ,
461487 ) ;
462-
463- // Remove tables from parent group.
464- for table_id in & table_ids {
465- let group_deltas = & mut new_version_delta
466- . group_deltas
467- . entry ( parent_group_id)
468- . or_default ( )
469- . group_deltas ;
470- group_deltas. push ( GroupDelta {
471- delta_type : Some ( DeltaType :: GroupMetaChange ( GroupMetaChange {
472- table_ids_remove : vec ! [ * table_id] ,
473- ..Default :: default ( )
474- } ) ) ,
475- } ) ;
476- }
477-
478- // Add tables to new group.
479- let new_group_id = self
480- . env
481- . id_gen_manager ( )
482- . generate :: < { IdCategory :: CompactionGroup } > ( )
483- . await ?;
484488 let new_sst_start_id = self
485489 . env
486490 . id_gen_manager ( )
487491 . generate_interval :: < { IdCategory :: HummockSstableId } > (
488- versioning . current_version . count_new_ssts_in_group_split (
492+ current_version. count_new_ssts_in_group_split (
489493 parent_group_id,
490- & HashSet :: from_iter ( table_ids. iter ( ) . cloned ( ) ) ,
494+ HashSet :: from_iter ( table_ids. clone ( ) ) ,
491495 ) ,
492496 )
493497 . await ?;
494- let group_deltas = & mut new_version_delta
495- . group_deltas
496- . entry ( new_group_id)
497- . or_default ( )
498- . group_deltas ;
499- let config = self
500- . compaction_group_manager
501- . read ( )
502- . await
503- . get_compaction_group_config ( new_group_id)
504- . compaction_config
505- . as_ref ( )
506- . clone ( ) ;
507- group_deltas. push ( GroupDelta {
508- delta_type : Some ( DeltaType :: GroupConstruct ( GroupConstruct {
509- group_config : Some ( config) ,
510- group_id : new_group_id,
511- parent_group_id,
512- table_ids,
513- new_sst_start_id,
514- } ) ) ,
515- } ) ;
498+ let mut new_group = None ;
499+ let target_compaction_group_id = match target_group_id {
500+ Some ( compaction_group_id) => {
501+ match current_version. levels . get ( & compaction_group_id) {
502+ Some ( group) => {
503+ for table_id in & table_ids {
504+ if group. member_table_ids . contains ( table_id) {
505+ return Err ( Error :: CompactionGroup ( format ! (
506+ "table {} already exist in group {}" ,
507+ * table_id, compaction_group_id,
508+ ) ) ) ;
509+ }
510+ }
511+ }
512+ None => {
513+ return Err ( Error :: CompactionGroup ( format ! (
514+ "target group {} does not exist" ,
515+ compaction_group_id,
516+ ) ) ) ;
517+ }
518+ }
519+ let group_deltas = & mut new_version_delta
520+ . group_deltas
521+ . entry ( compaction_group_id)
522+ . or_default ( )
523+ . group_deltas ;
524+ group_deltas. push ( GroupDelta {
525+ delta_type : Some ( DeltaType :: GroupTableChange ( GroupTableChange {
526+ table_ids : table_ids. to_vec ( ) ,
527+ origin_group_id : parent_group_id,
528+ target_group_id : compaction_group_id,
529+ new_sst_start_id,
530+ } ) ) ,
531+ } ) ;
532+ compaction_group_id
533+ }
534+ None => {
535+ // All NewCompactionGroup pairs are mapped to one new compaction group.
536+ let new_compaction_group_id = self
537+ . env
538+ . id_gen_manager ( )
539+ . generate :: < { IdCategory :: CompactionGroup } > ( )
540+ . await ?;
541+ let mut config = self
542+ . compaction_group_manager
543+ . read ( )
544+ . await
545+ . get_default_compaction_group_config ( ) ;
546+ config. split_by_state_table = allow_split_by_table;
547+
548+ new_version_delta. group_deltas . insert (
549+ new_compaction_group_id,
550+ GroupDeltas {
551+ group_deltas : vec ! [ GroupDelta {
552+ delta_type: Some ( DeltaType :: GroupConstruct ( GroupConstruct {
553+ group_config: Some ( config. clone( ) ) ,
554+ group_id: new_compaction_group_id,
555+ parent_group_id,
556+ new_sst_start_id,
557+ table_ids: table_ids. to_vec( ) ,
558+ } ) ) ,
559+ } ] ,
560+ } ,
561+ ) ;
516562
563+ new_group = Some ( ( new_compaction_group_id, config) ) ;
564+ new_version_delta. group_deltas . insert (
565+ parent_group_id,
566+ GroupDeltas {
567+ group_deltas : vec ! [ GroupDelta {
568+ delta_type: Some ( DeltaType :: GroupMetaChange ( GroupMetaChange {
569+ table_ids_remove: table_ids. to_vec( ) ,
570+ ..Default :: default ( )
571+ } ) ) ,
572+ } ] ,
573+ } ,
574+ ) ;
575+ new_compaction_group_id
576+ }
577+ } ;
517578 let mut branched_ssts = BTreeMapTransaction :: new ( & mut versioning. branched_ssts ) ;
518579 let mut trx = Transaction :: default ( ) ;
519580 new_version_delta. apply_to_txn ( & mut trx) ?;
520- self . env . meta_store ( ) . txn ( trx) . await ?;
581+ if let Some ( ( new_compaction_group_id, config) ) = new_group {
582+ let mut compaction_group_manager = self . compaction_group_manager . write ( ) . await ;
583+ let insert = BTreeMapEntryTransaction :: new_insert (
584+ & mut compaction_group_manager. compaction_groups ,
585+ new_compaction_group_id,
586+ CompactionGroup {
587+ group_id : new_compaction_group_id,
588+ compaction_config : Arc :: new ( config) ,
589+ } ,
590+ ) ;
591+ insert. apply_to_txn ( & mut trx) ?;
592+ self . env . meta_store ( ) . txn ( trx) . await ?;
593+ insert. commit ( ) ;
594+ } else {
595+ self . env . meta_store ( ) . txn ( trx) . await ?;
596+ }
521597 let sst_split_info = versioning
522598 . current_version
523599 . apply_version_delta ( & new_version_delta) ;
524600 // Updates SST split info
525- for ( object_id, sst_id, parent_old_sst_id , parent_new_sst_id) in sst_split_info {
601+ for ( object_id, sst_id, _parent_old_sst_id , parent_new_sst_id) in sst_split_info {
526602 match branched_ssts. get_mut ( object_id) {
527603 Some ( mut entry) => {
528- let p = entry. get_mut ( & parent_group_id) . unwrap ( ) ;
529- let parent_pos = p. iter ( ) . position ( |id| * id == parent_old_sst_id) . unwrap ( ) ;
530604 if let Some ( parent_new_sst_id) = parent_new_sst_id {
531- p [ parent_pos ] = parent_new_sst_id;
605+ entry . insert ( parent_group_id , parent_new_sst_id) ;
532606 } else {
533- p. remove ( parent_pos) ;
534- if p. is_empty ( ) {
535- entry. remove ( & parent_group_id) ;
536- }
607+ entry. remove ( & parent_group_id) ;
537608 }
538- entry. entry ( new_group_id ) . or_default ( ) . push ( sst_id) ;
609+ entry. insert ( target_compaction_group_id , sst_id) ;
539610 }
540611 None => {
541- branched_ssts. insert (
542- object_id,
543- if let Some ( parent_new_sst_id) = parent_new_sst_id {
544- [
545- ( parent_group_id, vec ! [ parent_new_sst_id] ) ,
546- ( new_group_id, vec ! [ sst_id] ) ,
547- ]
548- . into_iter ( )
549- . collect ( )
550- } else {
551- [ ( new_group_id, vec ! [ sst_id] ) ] . into_iter ( ) . collect ( )
552- } ,
553- ) ;
612+ let mut groups = HashMap :: from_iter ( [ ( target_compaction_group_id, sst_id) ] ) ;
613+ if let Some ( parent_new_sst_id) = parent_new_sst_id {
614+ groups. insert ( parent_group_id, parent_new_sst_id) ;
615+ }
616+ branched_ssts. insert ( object_id, groups) ;
554617 }
555618 }
556619 }
557620 new_version_delta. commit ( ) ;
558621 branched_ssts. commit_memory ( ) ;
559622 self . notify_last_version_delta ( versioning) ;
560-
561- Ok ( new_group_id)
623+ // Don't trigger compactions if we enable deterministic compaction
624+ if !self . env . opts . compaction_deterministic_test {
625+ // commit_epoch may contains SSTs from any compaction group
626+ self . try_send_compaction_request ( parent_group_id, compact_task:: TaskType :: SpaceReclaim ) ;
627+ self . try_send_compaction_request (
628+ target_compaction_group_id,
629+ compact_task:: TaskType :: SpaceReclaim ,
630+ ) ;
631+ }
632+ Ok ( target_compaction_group_id)
562633 }
563634}
564635
565636#[ derive( Default ) ]
566637pub ( super ) struct CompactionGroupManager {
567638 compaction_groups : BTreeMap < CompactionGroupId , CompactionGroup > ,
568- /// Provided default config, only used in test.
569- provided_default_config_for_test : CompactionConfig ,
639+ default_config : CompactionConfig ,
570640}
571641
572642impl CompactionGroupManager {
@@ -602,14 +672,20 @@ impl CompactionGroupManager {
602672 compaction_group_ids
603673 . iter ( )
604674 . map ( |id| {
605- let group = self . compaction_groups . get ( id) . cloned ( ) . unwrap_or_else ( || {
606- CompactionGroup :: new ( * id, self . provided_default_config_for_test . clone ( ) )
607- } ) ;
675+ let group = self
676+ . compaction_groups
677+ . get ( id)
678+ . cloned ( )
679+ . unwrap_or_else ( || CompactionGroup :: new ( * id, self . default_config . clone ( ) ) ) ;
608680 ( * id, group)
609681 } )
610682 . collect ( )
611683 }
612684
685+ fn get_default_compaction_group_config ( & self ) -> CompactionConfig {
686+ self . default_config . clone ( )
687+ }
688+
613689 async fn update_compaction_config < S : MetaStore > (
614690 & mut self ,
615691 compaction_group_ids : & [ CompactionGroupId ] ,
@@ -621,10 +697,7 @@ impl CompactionGroupManager {
621697 if !compaction_groups. contains_key ( compaction_group_id) {
622698 compaction_groups. insert (
623699 * compaction_group_id,
624- CompactionGroup :: new (
625- * compaction_group_id,
626- self . provided_default_config_for_test . clone ( ) ,
627- ) ,
700+ CompactionGroup :: new ( * compaction_group_id, self . default_config . clone ( ) ) ,
628701 ) ;
629702 }
630703 let group = compaction_groups. get ( compaction_group_id) . unwrap ( ) ;
0 commit comments