Skip to content

refactor(meta): exclude table change log from time travel metadata (#20948) #21130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl HummockManager {
// `object_sizes` is used to calculate size of stale objects.
let mut object_sizes = object_size_map(&old_checkpoint.version);
// The set of object ids that once exist in any hummock version
let mut versions_object_ids = old_checkpoint.version.get_object_ids();
let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
for (_, version_delta) in versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
Expand Down Expand Up @@ -189,11 +189,11 @@ impl HummockManager {
),
);
}
versions_object_ids.extend(version_delta.newly_added_object_ids());
versions_object_ids.extend(version_delta.newly_added_object_ids(false));
}

// Object ids that once exist in any hummock version but not exist in the latest hummock version
let removed_object_ids = &versions_object_ids - &current_version.get_object_ids();
let removed_object_ids = &versions_object_ids - &current_version.get_object_ids(false);
let total_file_size = removed_object_ids
.iter()
.map(|t| {
Expand Down
22 changes: 11 additions & 11 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl HummockManager {
else {
return Ok(());
};
guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
Ok(())
}

Expand Down Expand Up @@ -115,8 +115,8 @@ impl HummockManager {
) = {
(
latest_valid_version.id,
latest_valid_version.get_sst_ids(),
latest_valid_version.get_object_ids(),
latest_valid_version.get_sst_ids(true),
latest_valid_version.get_object_ids(true),
)
};
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
Expand Down Expand Up @@ -184,7 +184,7 @@ impl HummockManager {
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
&delta_to_delete.version_delta.to_protobuf(),
);
let new_sst_ids = delta_to_delete.newly_added_sst_ids();
let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
// The SST ids added and then deleted by compaction between the 2 versions.
sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
if sst_ids_to_delete.len() >= delete_sst_batch_size {
Expand All @@ -195,7 +195,7 @@ impl HummockManager {
)
.await?;
}
let new_object_ids = delta_to_delete.newly_added_object_ids();
let new_object_ids = delta_to_delete.newly_added_object_ids(true);
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
}
let mut next_version_sst_ids = latest_valid_version_sst_ids;
Expand All @@ -214,7 +214,7 @@ impl HummockManager {
&prev_version.version.to_protobuf(),
)
};
let sst_ids = prev_version.get_sst_ids();
let sst_ids = prev_version.get_sst_ids(true);
// The SST ids deleted by compaction between the 2 versions.
sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
if sst_ids_to_delete.len() >= delete_sst_batch_size {
Expand All @@ -225,7 +225,7 @@ impl HummockManager {
)
.await?;
}
let new_object_ids = prev_version.get_object_ids();
let new_object_ids = prev_version.get_object_ids(true);
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
next_version_sst_ids = sst_ids;
}
Expand Down Expand Up @@ -379,7 +379,7 @@ impl HummockManager {
);

let mut sst_ids = actual_version
.get_sst_ids()
.get_sst_ids(true)
.into_iter()
.collect::<VecDeque<_>>();
let sst_count = sst_ids.len();
Expand Down Expand Up @@ -495,7 +495,7 @@ impl HummockManager {
// `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
version_sst_ids = Some(
version
.get_sst_infos()
.get_sst_infos(true)
.filter_map(|s| {
if s.table_ids
.iter()
Expand All @@ -508,7 +508,7 @@ impl HummockManager {
.collect(),
);
write_sstable_infos(
version.get_sst_infos().filter(|s| {
version.get_sst_infos(true).filter(|s| {
!skip_sst_ids.contains(&s.sst_id)
&& s.table_ids
.iter()
Expand Down Expand Up @@ -537,7 +537,7 @@ impl HummockManager {
return Ok(version_sst_ids);
}
let written = write_sstable_infos(
delta.newly_added_sst_infos().filter(|s| {
delta.newly_added_sst_infos(true).filter(|s| {
!skip_sst_ids.contains(&s.sst_id)
&& s.table_ids
.iter()
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ impl Versioning {
min_pinned_version_id: HummockVersionId,
) -> HashSet<HummockSstableObjectId> {
// object ids in checkpoint version
let mut tracked_object_ids = self.checkpoint.version.get_object_ids();
let mut tracked_object_ids = self.checkpoint.version.get_object_ids(false);
// add object ids added between checkpoint version and current version
for (_, delta) in self.hummock_version_deltas.range((
Excluded(self.checkpoint.version.id),
Included(self.current_version.id),
)) {
tracked_object_ids.extend(delta.newly_added_object_ids());
tracked_object_ids.extend(delta.newly_added_object_ids(false));
}
// add stale object ids before the checkpoint version
tracked_object_ids.extend(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/backup/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl MetaSnapshotMetadata {
Self {
id,
hummock_version_id: v.id,
ssts: v.get_object_ids(),
ssts: v.get_object_ids(false),
format_version,
remarks,
state_table_info: v
Expand Down
48 changes: 32 additions & 16 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,25 +951,41 @@ where
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
}

pub fn get_object_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.object_id()).collect()
pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos(exclude_change_log)
.map(|s| s.object_id())
.collect()
}

pub fn get_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.sst_id()).collect()
pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos(exclude_change_log)
.map(|s| s.sst_id())
.collect()
}

pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
let may_table_change_log = if exclude_change_log {
None
} else {
Some(self.table_change_log.values())
};
self.get_combined_levels()
.flat_map(|level| level.table_infos.iter())
.chain(self.table_change_log.values().flat_map(|change_log| {
change_log.iter().flat_map(|epoch_change_log| {
epoch_change_log
.old_value
.iter()
.chain(epoch_change_log.new_value.iter())
})
}))
.chain(
may_table_change_log
.map(|v| {
v.flat_map(|table_change_log| {
table_change_log.iter().flat_map(|epoch_change_log| {
epoch_change_log
.old_value
.iter()
.chain(epoch_change_log.new_value.iter())
})
})
})
.into_iter()
.flatten(),
)
}
}

Expand Down Expand Up @@ -1581,7 +1597,7 @@ mod tests {
)]),
..Default::default()
};
assert_eq!(version.get_object_ids().len(), 0);
assert_eq!(version.get_object_ids(false).len(), 0);

// Add to sub level
version
Expand All @@ -1599,7 +1615,7 @@ mod tests {
.into()],
..Default::default()
});
assert_eq!(version.get_object_ids().len(), 1);
assert_eq!(version.get_object_ids(false).len(), 1);

// Add to non sub level
version.levels.get_mut(&0).unwrap().levels.push(Level {
Expand All @@ -1611,7 +1627,7 @@ mod tests {
.into()],
..Default::default()
});
assert_eq!(version.get_object_ids().len(), 2);
assert_eq!(version.get_object_ids(false).len(), 2);
}

#[test]
Expand Down
48 changes: 3 additions & 45 deletions src/storage/hummock_sdk/src/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use std::collections::{HashMap, HashSet};

use risingwave_pb::hummock::hummock_version::PbLevels;
use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
use risingwave_pb::hummock::{group_delta, PbEpochNewChangeLog, PbLevel, PbSstableInfo};
use risingwave_pb::hummock::{group_delta, PbLevel, PbSstableInfo};

use crate::change_log::{TableChangeLog, TableChangeLogCommon};
use crate::compaction_group::StateTableId;
use crate::level::Level;
use crate::sstable_info::SstableInfo;
Expand Down Expand Up @@ -50,12 +49,6 @@ pub fn refill_version(
.table_infos
.retain(|t| t.table_ids.contains(&table_id));
}
version
.table_change_log
.retain(|t, _| t.table_id == table_id);
for t in version.table_change_log.values_mut() {
refill_table_change_log(t, sst_id_to_info);
}
}

fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
Expand All @@ -64,20 +57,6 @@ fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, Ss
}
}

fn refill_table_change_log(
table_change_log: &mut TableChangeLog,
sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
) {
for c in table_change_log.iter_mut() {
for s in &mut c.old_value {
refill_sstable_info(s, sst_id_to_info);
}
for s in &mut c.new_value {
refill_sstable_info(s, sst_id_to_info);
}
}
}

/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`.
fn refill_sstable_info(
sstable_info: &mut SstableInfo,
Expand Down Expand Up @@ -106,29 +85,8 @@ impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersio
.collect(),
max_committed_epoch: version.max_committed_epoch,
table_watermarks: version.table_watermarks.clone(),
table_change_log: version
.table_change_log
.iter()
.filter_map(|(table_id, change_log)| {
if !time_travel_table_ids.contains(&table_id.table_id()) {
return None;
}
debug_assert!(change_log.iter().all(|d| {
d.new_value.iter().chain(d.old_value.iter()).all(|s| {
s.table_ids
.iter()
.any(|tid| time_travel_table_ids.contains(tid))
})
}));
let incomplete_table_change_log = change_log
.iter()
.map(|e| PbEpochNewChangeLog::from(e).into());
Some((
*table_id,
TableChangeLogCommon::new(incomplete_table_change_log),
))
})
.collect(),
// time travel metadata doesn't include table change log
table_change_log: HashMap::default(),
state_table_info: version.state_table_info.clone(),
}
}
Expand Down
46 changes: 36 additions & 10 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,19 +538,33 @@ where
///
/// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
/// but it is possible that the object is moved or split from other compaction groups or levels.
pub fn newly_added_object_ids(&self) -> HashSet<HummockSstableObjectId> {
self.newly_added_sst_infos()
pub fn newly_added_object_ids(
&self,
exclude_table_change_log: bool,
) -> HashSet<HummockSstableObjectId> {
self.newly_added_sst_infos(exclude_table_change_log)
.map(|sst| sst.object_id())
.collect()
}

pub fn newly_added_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
self.newly_added_sst_infos()
pub fn newly_added_sst_ids(
&self,
exclude_table_change_log: bool,
) -> HashSet<HummockSstableObjectId> {
self.newly_added_sst_infos(exclude_table_change_log)
.map(|sst| sst.sst_id())
.collect()
}

pub fn newly_added_sst_infos(&self) -> impl Iterator<Item = &'_ T> {
pub fn newly_added_sst_infos(
&self,
exclude_table_change_log: bool,
) -> impl Iterator<Item = &'_ T> {
let may_table_change_delta = if exclude_table_change_log {
None
} else {
Some(self.change_log_delta.values())
};
self.group_deltas
.values()
.flat_map(|group_deltas| {
Expand All @@ -568,11 +582,23 @@ where
sst_slice.into_iter().flatten()
})
})
.chain(self.change_log_delta.values().flat_map(|delta| {
// TODO: optimization: strip table change log
let new_log = delta.new_log.as_ref().unwrap();
new_log.new_value.iter().chain(new_log.old_value.iter())
}))
.chain(
may_table_change_delta
.map(|v| {
v.flat_map(|delta| {
delta
.new_log
.as_ref()
.map(|new_log| {
new_log.new_value.iter().chain(new_log.old_value.iter())
})
.into_iter()
.flatten()
})
})
.into_iter()
.flatten(),
)
}
}

Expand Down