diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f7a6352bf5076..53ff3d4351b81 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -488,13 +488,23 @@ pub struct MetaDeveloperConfig { #[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")] pub actor_cnt_per_worker_parallelism_hard_limit: usize, - #[serde(default = "default::developer::hummock_time_travel_sst_info_fetch_batch_size")] /// Max number of SSTs fetched from meta store per SELECT, during time travel Hummock version replay. + #[serde(default = "default::developer::hummock_time_travel_sst_info_fetch_batch_size")] pub hummock_time_travel_sst_info_fetch_batch_size: usize, - #[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")] /// Max number of SSTs inserted into meta store per INSERT, during time travel metadata writing. + #[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")] pub hummock_time_travel_sst_info_insert_batch_size: usize, + + #[serde(default = "default::developer::hummock_delta_log_delete_batch_size")] + pub hummock_delta_log_delete_batch_size: usize, + + #[serde(default = "default::developer::time_travel_vacuum_interval_sec")] + pub time_travel_vacuum_interval_sec: u64, + + /// Max number of epoch-to-version inserted into meta store per INSERT, during time travel metadata writing. + #[serde(default = "default::developer::hummock_time_travel_epoch_version_insert_batch_size")] + pub hummock_time_travel_epoch_version_insert_batch_size: usize, } /// The section `[server]` in `risingwave.toml`. @@ -1412,7 +1422,7 @@ pub mod default { } pub fn vacuum_spin_interval_ms() -> u64 { - 200 + 100 } pub fn hummock_version_checkpoint_interval_sec() -> u64 { @@ -1964,6 +1974,16 @@ pub mod default { pub fn hummock_time_travel_sst_info_insert_batch_size() -> usize { 100 } + pub fn hummock_delta_log_delete_batch_size() -> usize { + 512 + } + + pub fn time_travel_vacuum_interval_sec() -> u64 { + 30 + } + pub fn hummock_time_travel_epoch_version_insert_batch_size() -> usize { + 1000 + } pub fn memory_controller_threshold_aggressive() -> f64 { 0.9 diff --git a/src/config/docs.md b/src/config/docs.md index 5fc1211983981..2b2800e83f608 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -64,7 +64,7 @@ This page is automatically generated by `./risedev generate-example-config` | table_write_throughput_threshold | The threshold of write throughput to trigger a group split. Increase this configuration value to avoid split too many groups with few data write. | 16777216 | | unrecognized | | | | vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 | -| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 200 | +| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 100 | ## meta.compaction_config diff --git a/src/config/example.toml b/src/config/example.toml index 0c5bb6e67d180..9983041b1692d 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -21,7 +21,7 @@ gc_history_retention_time_sec = 21600 max_inflight_time_travel_query = 1000 periodic_compaction_interval_sec = 60 vacuum_interval_sec = 30 -vacuum_spin_interval_ms = 200 +vacuum_spin_interval_ms = 100 hummock_version_checkpoint_interval_sec = 30 enable_hummock_data_archive = false hummock_time_travel_snapshot_interval = 100 @@ -87,6 +87,9 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100 meta_actor_cnt_per_worker_parallelism_hard_limit = 400 meta_hummock_time_travel_sst_info_fetch_batch_size = 10000 meta_hummock_time_travel_sst_info_insert_batch_size = 100 +meta_hummock_delta_log_delete_batch_size = 512 +meta_time_travel_vacuum_interval_sec = 30 +meta_hummock_time_travel_epoch_version_insert_batch_size = 1000 [meta.meta_store_config] max_connections = 10 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 269b0eb6b8fb7..66a672a326df1 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -354,6 +354,10 @@ pub fn start( compaction_deterministic_test: config.meta.enable_compaction_deterministic, default_parallelism: config.meta.default_parallelism, vacuum_interval_sec: config.meta.vacuum_interval_sec, + time_travel_vacuum_interval_sec: config + .meta + .developer + .time_travel_vacuum_interval_sec, vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms, hummock_version_checkpoint_interval_sec: config .meta @@ -370,6 +374,14 @@ pub fn start( .meta .developer .hummock_time_travel_sst_info_insert_batch_size, + hummock_delta_log_delete_batch_size: config + .meta + .developer + .hummock_delta_log_delete_batch_size, + hummock_time_travel_epoch_version_insert_batch_size: config + .meta + .developer + .hummock_time_travel_epoch_version_insert_batch_size, min_delta_log_num_for_hummock_version_checkpoint: config .meta .min_delta_log_num_for_hummock_version_checkpoint, diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 0ff0dd67ab743..20772153d44b1 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -182,23 +182,23 @@ impl HummockManager { let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let context_info = self.context_info.read().await; - let deltas_to_delete = versioning + let deltas_to_delete_count = versioning .hummock_version_deltas .range(..=versioning.checkpoint.version.id) - .map(|(k, _)| *k) - .collect_vec(); + .count(); // If there is any safe point, skip this to ensure meta backup has required delta logs to // replay version. if !context_info.version_safe_points.is_empty() { - return Ok((0, deltas_to_delete.len())); + return Ok((0, deltas_to_delete_count)); } - let mut hummock_version_deltas = - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); - let batch = deltas_to_delete - .iter() + let batch = versioning + .hummock_version_deltas + .range(..=versioning.checkpoint.version.id) + .map(|(k, _)| *k) .take(batch_size) - .cloned() .collect_vec(); + let mut hummock_version_deltas = + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); if batch.is_empty() { return Ok((0, 0)); } @@ -212,7 +212,7 @@ impl HummockManager { drop(versioning_guard); self.check_state_consistency().await; } - Ok((batch.len(), deltas_to_delete.len() - batch.len())) + Ok((batch.len(), deltas_to_delete_count - batch.len())) } /// Filters by Hummock version and Writes GC history. @@ -469,7 +469,7 @@ impl HummockManager { /// /// Returns number of deleted deltas pub async fn delete_metadata(&self) -> MetaResult { - let batch_size = 64usize; + let batch_size = self.env.opts.hummock_delta_log_delete_batch_size; let mut total_deleted = 0; loop { if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 { @@ -482,7 +482,10 @@ impl HummockManager { break; } } + Ok(total_deleted) + } + pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> { let current_epoch_time = Epoch::now().physical_time(); let epoch_watermark = Epoch::from_physical_time( current_epoch_time.saturating_sub( @@ -494,8 +497,7 @@ impl HummockManager { ) .0; self.truncate_time_travel_metadata(epoch_watermark).await?; - - Ok(total_deleted) + Ok(()) } /// Deletes stale SST objects from object store. diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 2d16fc31c1049..f6c28e9237bf4 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -381,11 +381,29 @@ impl HummockManager { self.write_checkpoint(&versioning_guard.checkpoint).await?; checkpoint_version }; - for version_delta in hummock_version_deltas.values() { - if version_delta.prev_id == redo_state.id { - redo_state.apply_version_delta(version_delta); + let mut applied_delta_count = 0; + let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count(); + tracing::info!( + total_delta = hummock_version_deltas.len(), + total_to_apply, + "Start redo Hummock version." + ); + for version_delta in hummock_version_deltas + .range(redo_state.id + 1..) + .map(|(_, v)| v) + { + assert_eq!( + version_delta.prev_id, redo_state.id, + "delta prev_id {}, redo state id {}", + version_delta.prev_id, redo_state.id + ); + redo_state.apply_version_delta(version_delta); + applied_delta_count += 1; + if applied_delta_count % 1000 == 0 { + tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}."); } } + tracing::info!("Finish redo Hummock version."); versioning_guard.version_stats = hummock_version_stats::Entity::find() .one(&meta_store.conn) .await diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index af5a3d2d7dc44..d4e6ed34e68ab 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -456,8 +456,12 @@ impl HummockManager { version_id: Set(version_id.try_into().unwrap()), }; batch.push(m); - // Use the same batch size as hummock_time_travel_sst_info_insert_batch_size. - if batch.len() >= self.env.opts.hummock_time_travel_sst_info_insert_batch_size { + if batch.len() + >= self + .env + .opts + .hummock_time_travel_epoch_version_insert_batch_size + { // There should be no conflict rows. hummock_epoch_to_version::Entity::insert_many(std::mem::take(&mut batch)) .do_nothing() diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index cb4be256a4551..7802c2de7bb36 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -56,6 +56,10 @@ pub fn start_hummock_workers( hummock_manager.clone(), Duration::from_secs(meta_opts.vacuum_interval_sec), ), + start_vacuum_time_travel_metadata_loop( + hummock_manager.clone(), + Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec), + ), ]; workers } @@ -87,6 +91,32 @@ pub fn start_vacuum_metadata_loop( (join_handle, shutdown_tx) } +pub fn start_vacuum_time_travel_metadata_loop( + hummock_manager: HummockManagerRef, + interval: Duration, +) -> (JoinHandle<()>, Sender<()>) { + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + let join_handle = tokio::spawn(async move { + let mut min_trigger_interval = tokio::time::interval(interval); + min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + tokio::select! { + // Wait for interval + _ = min_trigger_interval.tick() => {}, + // Shutdown vacuum + _ = &mut shutdown_rx => { + tracing::info!("Vacuum time travel metadata loop is stopped"); + return; + } + } + if let Err(err) = hummock_manager.delete_time_travel_metadata().await { + tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error"); + } + } + }); + (join_handle, shutdown_tx) +} + pub fn start_checkpoint_loop( hummock_manager: HummockManagerRef, backup_manager: BackupManagerRef, diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 46e34d51e5ab8..9a4eeadc645ab 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -109,12 +109,15 @@ pub struct MetaOpts { /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of /// meta node. pub vacuum_spin_interval_ms: u64, + pub time_travel_vacuum_interval_sec: u64, /// Interval of hummock version checkpoint. pub hummock_version_checkpoint_interval_sec: u64, pub enable_hummock_data_archive: bool, pub hummock_time_travel_snapshot_interval: u64, pub hummock_time_travel_sst_info_fetch_batch_size: usize, pub hummock_time_travel_sst_info_insert_batch_size: usize, + pub hummock_delta_log_delete_batch_size: usize, + pub hummock_time_travel_epoch_version_insert_batch_size: usize, /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is @@ -256,12 +259,15 @@ impl MetaOpts { compaction_deterministic_test: false, default_parallelism: DefaultParallelism::Full, vacuum_interval_sec: 30, + time_travel_vacuum_interval_sec: 30, vacuum_spin_interval_ms: 0, hummock_version_checkpoint_interval_sec: 30, enable_hummock_data_archive: false, hummock_time_travel_snapshot_interval: 0, hummock_time_travel_sst_info_fetch_batch_size: 10_000, hummock_time_travel_sst_info_insert_batch_size: 10, + hummock_delta_log_delete_batch_size: 1000, + hummock_time_travel_epoch_version_insert_batch_size: 1000, min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, full_gc_interval_sec: 3600 * 24 * 7,