6 changes: 3 additions & 3 deletions src/meta/src/hummock/manager/checkpoint.rs
@@ -148,7 +148,7 @@ impl HummockManager {
// `object_sizes` is used to calculate size of stale objects.
let mut object_sizes = object_size_map(&old_checkpoint.version);
// The set of object ids that once exist in any hummock version
- let mut versions_object_ids = old_checkpoint.version.get_object_ids();
+ let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
for (_, version_delta) in versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
@@ -189,11 +189,11 @@ impl HummockManager {
),
);
}
- versions_object_ids.extend(version_delta.newly_added_object_ids());
+ versions_object_ids.extend(version_delta.newly_added_object_ids(false));
}

// Object ids that once exist in any hummock version but not exist in the latest hummock version
- let removed_object_ids = &versions_object_ids - &current_version.get_object_ids();
+ let removed_object_ids = &versions_object_ids - &current_version.get_object_ids(false);
let total_file_size = removed_object_ids
.iter()
.map(|t| {
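The stale-object computation in this hunk relies on `HashSet` set difference via the overloaded `-` operator. A minimal standalone sketch of that idiom, using plain `u64` ids in place of the project's `HummockSstableObjectId` (names here are illustrative only):

```rust
use std::collections::HashSet;

fn main() {
    // Object ids that ever appeared in any version, and the ids still
    // referenced by the current version.
    let versions_object_ids: HashSet<u64> = [1, 2, 3, 4].into_iter().collect();
    let current_object_ids: HashSet<u64> = [2, 3].into_iter().collect();

    // `&a - &b` yields a new set with the elements of `a` not present in `b`,
    // i.e. the objects that can be treated as removed/stale.
    let removed_object_ids = &versions_object_ids - &current_object_ids;

    let expected: HashSet<u64> = [1, 4].into_iter().collect();
    assert_eq!(removed_object_ids, expected);
}
```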
22 changes: 11 additions & 11 deletions src/meta/src/hummock/manager/time_travel.rs
@@ -60,7 +60,7 @@ impl HummockManager {
else {
return Ok(());
};
- guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
+ guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
Ok(())
}

@@ -117,8 +117,8 @@ impl HummockManager {
) = {
(
latest_valid_version.id,
- latest_valid_version.get_sst_ids(),
- latest_valid_version.get_object_ids(),
+ latest_valid_version.get_sst_ids(true),
+ latest_valid_version.get_object_ids(true),
)
};
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
@@ -186,7 +186,7 @@ impl HummockManager {
let delta_to_delete = HummockVersionDelta::from_persisted_protobuf(
&delta_to_delete.version_delta.to_protobuf(),
);
- let new_sst_ids = delta_to_delete.newly_added_sst_ids();
+ let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
// The SST ids added and then deleted by compaction between the 2 versions.
sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
if sst_ids_to_delete.len() >= delete_sst_batch_size {
@@ -197,7 +197,7 @@
)
.await?;
}
- let new_object_ids = delta_to_delete.newly_added_object_ids();
+ let new_object_ids = delta_to_delete.newly_added_object_ids(true);
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
}
let mut next_version_sst_ids = latest_valid_version_sst_ids;
@@ -214,7 +214,7 @@
})?;
HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf())
};
- let sst_ids = prev_version.get_sst_ids();
+ let sst_ids = prev_version.get_sst_ids(true);
// The SST ids deleted by compaction between the 2 versions.
sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
if sst_ids_to_delete.len() >= delete_sst_batch_size {
@@ -225,7 +225,7 @@
)
.await?;
}
- let new_object_ids = prev_version.get_object_ids();
+ let new_object_ids = prev_version.get_object_ids(true);
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
next_version_sst_ids = sst_ids;
}
@@ -379,7 +379,7 @@ impl HummockManager {
);

let mut sst_ids = actual_version
- .get_sst_ids()
+ .get_sst_ids(true)
.into_iter()
.collect::<VecDeque<_>>();
let sst_count = sst_ids.len();
@@ -512,13 +512,13 @@ impl HummockManager {
if let Some(version) = version {
version_sst_ids = Some(
version
- .get_sst_infos_from_groups(&select_groups)
+ .get_sst_infos_from_groups_exclude_table_change_log(&select_groups)
.map(|s| s.sst_id)
.collect(),
);
write_sstable_infos(
version
- .get_sst_infos_from_groups(&select_groups)
+ .get_sst_infos_from_groups_exclude_table_change_log(&select_groups)
.filter(|s| !skip_sst_ids.contains(&s.sst_id))
.unique_by(|s| s.sst_id),
txn,
@@ -546,7 +546,7 @@ impl HummockManager {
}
let written = write_sstable_infos(
delta
- .newly_added_sst_infos(Some(&select_groups))
+ .newly_added_sst_infos(Some(&select_groups), true)
.filter(|s| !skip_sst_ids.contains(&s.sst_id))
.unique_by(|s| s.sst_id),
txn,
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/versioning.rs
@@ -89,13 +89,13 @@ impl Versioning {
min_pinned_version_id: HummockVersionId,
) -> HashSet<HummockSstableObjectId> {
// object ids in checkpoint version
- let mut tracked_object_ids = self.checkpoint.version.get_object_ids();
+ let mut tracked_object_ids = self.checkpoint.version.get_object_ids(false);
// add object ids added between checkpoint version and current version
for (_, delta) in self.hummock_version_deltas.range((
Excluded(self.checkpoint.version.id),
Included(self.current_version.id),
)) {
- tracked_object_ids.extend(delta.newly_added_object_ids());
+ tracked_object_ids.extend(delta.newly_added_object_ids(false));
}
// add stale object ids before the checkpoint version
tracked_object_ids.extend(
2 changes: 1 addition & 1 deletion src/storage/backup/src/lib.rs
@@ -70,7 +70,7 @@ impl MetaSnapshotMetadata {
Self {
id,
hummock_version_id: v.id,
- ssts: v.get_object_ids(),
+ ssts: v.get_object_ids(false),
format_version,
remarks,
state_table_info: v
53 changes: 30 additions & 23 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
@@ -79,25 +79,41 @@ impl HummockVersion {
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
}

- pub fn get_object_ids(&self) -> HashSet<HummockSstableObjectId> {
- self.get_sst_infos().map(|s| s.object_id).collect()
+ pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
+ self.get_sst_infos(exclude_change_log)
+ .map(|s| s.object_id)
+ .collect()
}

- pub fn get_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
- self.get_sst_infos().map(|s| s.sst_id).collect()
+ pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
+ self.get_sst_infos(exclude_change_log)
+ .map(|s| s.sst_id)
+ .collect()
}

- pub fn get_sst_infos(&self) -> impl Iterator<Item = &SstableInfo> {
+ pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &SstableInfo> {
+ let may_table_change_log = if exclude_change_log {
+ None
+ } else {
+ Some(self.table_change_log.values())
+ };
self.get_combined_levels()
.flat_map(|level| level.table_infos.iter())
- .chain(self.table_change_log.values().flat_map(|change_log| {
- change_log.0.iter().flat_map(|epoch_change_log| {
- epoch_change_log
- .old_value
- .iter()
- .chain(epoch_change_log.new_value.iter())
- })
- }))
+ .chain(
+ may_table_change_log
+ .map(|v| {
+ v.flat_map(|table_change_log| {
+ table_change_log.0.iter().flat_map(|epoch_change_log| {
+ epoch_change_log
+ .old_value
+ .iter()
+ .chain(epoch_change_log.new_value.iter())
+ })
+ })
+ })
+ .into_iter()
+ .flatten(),
+ )
}

// only scan the sst infos from levels in the specified compaction group (without table change log)
@@ -122,7 +138,7 @@ impl HummockVersion {
/// `get_sst_infos_from_groups` doesn't guarantee that all returned sst info belongs to `select_group`.
/// i.e. `select_group` is just a hint.
/// We separate `get_sst_infos_from_groups` and `get_sst_infos` because `get_sst_infos_from_groups` may be further customized in the future.
- pub fn get_sst_infos_from_groups<'a>(
+ pub fn get_sst_infos_from_groups_exclude_table_change_log<'a>(
&'a self,
select_group: &'a HashSet<CompactionGroupId>,
) -> impl Iterator<Item = &SstableInfo> + 'a {
@@ -137,15 +153,6 @@ impl HummockVersion {
})
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
.flat_map(|level| level.table_infos.iter())
- .chain(self.table_change_log.values().flat_map(|change_log| {
- // TODO: optimization: strip table change log
- change_log.0.iter().flat_map(|epoch_change_log| {
- epoch_change_log
- .old_value
- .iter()
- .chain(epoch_change_log.new_value.iter())
- })
- }))
}

pub fn level_iter<F: FnMut(&Level) -> bool>(
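The new `exclude_change_log` flag is implemented with a small iterator idiom: the optional change-log iterator is wrapped in an `Option`, then appended via `.into_iter().flatten()`, which yields nothing when the option is `None`, so no boxing or duplicated chains are needed. A minimal, self-contained sketch of that pattern with plain `u64` ids standing in for `SstableInfo` (all names below are illustrative, not the project's):

```rust
// `level_ids` stands in for ids coming from the LSM levels and
// `change_log_ids` for ids referenced only by the table change log.
fn collect_ids(level_ids: &[u64], change_log_ids: &[u64], exclude_change_log: bool) -> Vec<u64> {
    // Both branches produce the same concrete iterator type inside the Option.
    let may_change_log = if exclude_change_log {
        None
    } else {
        Some(change_log_ids.iter().copied())
    };
    level_ids
        .iter()
        .copied()
        // A None option contributes no items; a Some contributes all of them.
        .chain(may_change_log.into_iter().flatten())
        .collect()
}

fn main() {
    assert_eq!(collect_ids(&[1, 2], &[7, 8], true), vec![1, 2]);
    assert_eq!(collect_ids(&[1, 2], &[7, 8], false), vec![1, 2, 7, 8]);
}
```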
38 changes: 3 additions & 35 deletions src/storage/hummock_sdk/src/time_travel.rs
@@ -16,9 +16,8 @@ use std::collections::{HashMap, HashSet};

use risingwave_pb::hummock::hummock_version::PbLevels;
use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
- use risingwave_pb::hummock::{PbEpochNewChangeLog, PbSstableInfo};
+ use risingwave_pb::hummock::PbSstableInfo;

- use crate::change_log::{TableChangeLog, TableChangeLogCommon};
use crate::level::Level;
use crate::sstable_info::SstableInfo;
use crate::version::{
@@ -48,12 +47,6 @@ pub fn refill_version(
.table_infos
.retain(|t| t.table_ids.contains(&table_id));
}
- version
- .table_change_log
- .retain(|t, _| t.table_id == table_id);
- for t in version.table_change_log.values_mut() {
- refill_table_change_log(t, sst_id_to_info);
- }
}

fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
@@ -62,20 +55,6 @@ fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, Ss
}
}

- fn refill_table_change_log(
- table_change_log: &mut TableChangeLog,
- sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
- ) {
- for c in &mut table_change_log.0 {
- for s in &mut c.old_value {
- refill_sstable_info(s, sst_id_to_info);
- }
- for s in &mut c.new_value {
- refill_sstable_info(s, sst_id_to_info);
- }
- }
- }

/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`.
fn refill_sstable_info(
sstable_info: &mut SstableInfo,
@@ -110,19 +89,8 @@ impl From<(&HummockVersion, &HashSet<CompactionGroupId>)> for IncompleteHummockV
.collect(),
max_committed_epoch: version.max_committed_epoch,
table_watermarks: version.table_watermarks.clone(),
- // TODO: optimization: strip table change log based on select_group
- table_change_log: version
- .table_change_log
- .iter()
- .map(|(table_id, change_log)| {
- let incomplete_table_change_log = change_log
- .0
- .iter()
- .map(|e| PbEpochNewChangeLog::from(e).into())
- .collect();
- (*table_id, TableChangeLogCommon(incomplete_table_change_log))
- })
- .collect(),
+ // time travel metadata doesn't include table change log
+ table_change_log: HashMap::default(),
state_table_info: version.state_table_info.clone(),
}
}
42 changes: 33 additions & 9 deletions src/storage/hummock_sdk/src/version.rs
@@ -526,22 +526,34 @@ impl HummockVersionDelta {
///
/// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
/// but it is possible that the object is moved or split from other compaction groups or levels.
- pub fn newly_added_object_ids(&self) -> HashSet<HummockSstableObjectId> {
- self.newly_added_sst_infos(None)
+ pub fn newly_added_object_ids(
+ &self,
+ exclude_table_change_log: bool,
+ ) -> HashSet<HummockSstableObjectId> {
+ self.newly_added_sst_infos(None, exclude_table_change_log)
.map(|sst| sst.object_id)
.collect()
}

- pub fn newly_added_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
- self.newly_added_sst_infos(None)
+ pub fn newly_added_sst_ids(
+ &self,
+ exclude_table_change_log: bool,
+ ) -> HashSet<HummockSstableObjectId> {
+ self.newly_added_sst_infos(None, exclude_table_change_log)
.map(|sst| sst.sst_id)
.collect()
}

pub fn newly_added_sst_infos<'a>(
&'a self,
select_group: Option<&'a HashSet<CompactionGroupId>>,
+ exclude_table_change_log: bool,
) -> impl Iterator<Item = &SstableInfo> + 'a {
+ let may_table_change_delta = if exclude_table_change_log {
+ None
+ } else {
+ Some(self.change_log_delta.values())
+ };
self.group_deltas
.iter()
.filter_map(move |(cg_id, group_deltas)| {
@@ -568,11 +580,23 @@ impl HummockVersionDelta {
sst_slice.into_iter().flatten()
})
})
- .chain(self.change_log_delta.values().flat_map(|delta| {
- // TODO: optimization: strip table change log
- let new_log = delta.new_log.as_ref().unwrap();
- new_log.new_value.iter().chain(new_log.old_value.iter())
- }))
+ .chain(
+ may_table_change_delta
+ .map(|v| {
+ v.flat_map(|delta| {
+ delta
+ .new_log
+ .as_ref()
+ .map(|new_log| {
+ new_log.new_value.iter().chain(new_log.old_value.iter())
+ })
+ .into_iter()
+ .flatten()
+ })
+ })
+ .into_iter()
+ .flatten(),
+ )
}

#[expect(deprecated)]
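Besides honoring the new flag, the rewritten chain also drops the `unwrap()` on `delta.new_log`: a delta whose `new_log` is absent now contributes no SSTs instead of panicking. A simplified, self-contained sketch of that `Option`-flattening shape (the struct, field, and function names below are illustrative, not the project's types):

```rust
struct ChangeLogDelta {
    new_log: Option<Vec<u64>>, // stand-in for the optional new change-log entry
}

fn newly_logged_ids(deltas: &[ChangeLogDelta]) -> Vec<u64> {
    deltas
        .iter()
        .flat_map(|delta| {
            delta
                .new_log
                .as_ref()
                // Map a present log to an iterator over its ids...
                .map(|log| log.iter().copied())
                // ...and flatten the Option so a missing log yields nothing.
                .into_iter()
                .flatten()
        })
        .collect()
}

fn main() {
    let deltas = [
        ChangeLogDelta { new_log: Some(vec![10, 11]) },
        ChangeLogDelta { new_log: None },
    ];
    assert_eq!(newly_logged_ids(&deltas), vec![10, 11]);
}
```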