Skip to content

Commit b9a39de

Browse files
authored
refactor(meta): exclude table change log from time travel metadata (#20948) (#21130)
1 parent 40a92bc commit b9a39de

File tree

7 files changed

+88
-88
lines changed

7 files changed

+88
-88
lines changed

src/meta/src/hummock/manager/checkpoint.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl HummockManager {
148148
// `object_sizes` is used to calculate size of stale objects.
149149
let mut object_sizes = object_size_map(&old_checkpoint.version);
150150
// The set of object ids that once exist in any hummock version
151-
let mut versions_object_ids = old_checkpoint.version.get_object_ids();
151+
let mut versions_object_ids = old_checkpoint.version.get_object_ids(false);
152152
for (_, version_delta) in versioning
153153
.hummock_version_deltas
154154
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
@@ -189,11 +189,11 @@ impl HummockManager {
189189
),
190190
);
191191
}
192-
versions_object_ids.extend(version_delta.newly_added_object_ids());
192+
versions_object_ids.extend(version_delta.newly_added_object_ids(false));
193193
}
194194

195195
// Object ids that once exist in any hummock version but not exist in the latest hummock version
196-
let removed_object_ids = &versions_object_ids - &current_version.get_object_ids();
196+
let removed_object_ids = &versions_object_ids - &current_version.get_object_ids(false);
197197
let total_file_size = removed_object_ids
198198
.iter()
199199
.map(|t| {

src/meta/src/hummock/manager/time_travel.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl HummockManager {
5858
else {
5959
return Ok(());
6060
};
61-
guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids();
61+
guard.last_time_travel_snapshot_sst_ids = version.get_sst_ids(true);
6262
Ok(())
6363
}
6464

@@ -115,8 +115,8 @@ impl HummockManager {
115115
) = {
116116
(
117117
latest_valid_version.id,
118-
latest_valid_version.get_sst_ids(),
119-
latest_valid_version.get_object_ids(),
118+
latest_valid_version.get_sst_ids(true),
119+
latest_valid_version.get_object_ids(true),
120120
)
121121
};
122122
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
@@ -184,7 +184,7 @@ impl HummockManager {
184184
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
185185
&delta_to_delete.version_delta.to_protobuf(),
186186
);
187-
let new_sst_ids = delta_to_delete.newly_added_sst_ids();
187+
let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
188188
// The SST ids added and then deleted by compaction between the 2 versions.
189189
sst_ids_to_delete.extend(&new_sst_ids - &latest_valid_version_sst_ids);
190190
if sst_ids_to_delete.len() >= delete_sst_batch_size {
@@ -195,7 +195,7 @@ impl HummockManager {
195195
)
196196
.await?;
197197
}
198-
let new_object_ids = delta_to_delete.newly_added_object_ids();
198+
let new_object_ids = delta_to_delete.newly_added_object_ids(true);
199199
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
200200
}
201201
let mut next_version_sst_ids = latest_valid_version_sst_ids;
@@ -214,7 +214,7 @@ impl HummockManager {
214214
&prev_version.version.to_protobuf(),
215215
)
216216
};
217-
let sst_ids = prev_version.get_sst_ids();
217+
let sst_ids = prev_version.get_sst_ids(true);
218218
// The SST ids deleted by compaction between the 2 versions.
219219
sst_ids_to_delete.extend(&sst_ids - &next_version_sst_ids);
220220
if sst_ids_to_delete.len() >= delete_sst_batch_size {
@@ -225,7 +225,7 @@ impl HummockManager {
225225
)
226226
.await?;
227227
}
228-
let new_object_ids = prev_version.get_object_ids();
228+
let new_object_ids = prev_version.get_object_ids(true);
229229
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
230230
next_version_sst_ids = sst_ids;
231231
}
@@ -379,7 +379,7 @@ impl HummockManager {
379379
);
380380

381381
let mut sst_ids = actual_version
382-
.get_sst_ids()
382+
.get_sst_ids(true)
383383
.into_iter()
384384
.collect::<VecDeque<_>>();
385385
let sst_count = sst_ids.len();
@@ -495,7 +495,7 @@ impl HummockManager {
495495
// `version_sst_ids` is used to update `last_time_travel_snapshot_sst_ids`.
496496
version_sst_ids = Some(
497497
version
498-
.get_sst_infos()
498+
.get_sst_infos(true)
499499
.filter_map(|s| {
500500
if s.table_ids
501501
.iter()
@@ -508,7 +508,7 @@ impl HummockManager {
508508
.collect(),
509509
);
510510
write_sstable_infos(
511-
version.get_sst_infos().filter(|s| {
511+
version.get_sst_infos(true).filter(|s| {
512512
!skip_sst_ids.contains(&s.sst_id)
513513
&& s.table_ids
514514
.iter()
@@ -537,7 +537,7 @@ impl HummockManager {
537537
return Ok(version_sst_ids);
538538
}
539539
let written = write_sstable_infos(
540-
delta.newly_added_sst_infos().filter(|s| {
540+
delta.newly_added_sst_infos(true).filter(|s| {
541541
!skip_sst_ids.contains(&s.sst_id)
542542
&& s.table_ids
543543
.iter()

src/meta/src/hummock/manager/versioning.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,13 @@ impl Versioning {
8989
min_pinned_version_id: HummockVersionId,
9090
) -> HashSet<HummockSstableObjectId> {
9191
// object ids in checkpoint version
92-
let mut tracked_object_ids = self.checkpoint.version.get_object_ids();
92+
let mut tracked_object_ids = self.checkpoint.version.get_object_ids(false);
9393
// add object ids added between checkpoint version and current version
9494
for (_, delta) in self.hummock_version_deltas.range((
9595
Excluded(self.checkpoint.version.id),
9696
Included(self.current_version.id),
9797
)) {
98-
tracked_object_ids.extend(delta.newly_added_object_ids());
98+
tracked_object_ids.extend(delta.newly_added_object_ids(false));
9999
}
100100
// add stale object ids before the checkpoint version
101101
tracked_object_ids.extend(

src/storage/backup/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl MetaSnapshotMetadata {
7070
Self {
7171
id,
7272
hummock_version_id: v.id,
73-
ssts: v.get_object_ids(),
73+
ssts: v.get_object_ids(false),
7474
format_version,
7575
remarks,
7676
state_table_info: v

src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -951,25 +951,41 @@ where
951951
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
952952
}
953953

954-
pub fn get_object_ids(&self) -> HashSet<HummockSstableObjectId> {
955-
self.get_sst_infos().map(|s| s.object_id()).collect()
954+
pub fn get_object_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
955+
self.get_sst_infos(exclude_change_log)
956+
.map(|s| s.object_id())
957+
.collect()
956958
}
957959

958-
pub fn get_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
959-
self.get_sst_infos().map(|s| s.sst_id()).collect()
960+
pub fn get_sst_ids(&self, exclude_change_log: bool) -> HashSet<HummockSstableObjectId> {
961+
self.get_sst_infos(exclude_change_log)
962+
.map(|s| s.sst_id())
963+
.collect()
960964
}
961965

962-
pub fn get_sst_infos(&self) -> impl Iterator<Item = &T> {
966+
pub fn get_sst_infos(&self, exclude_change_log: bool) -> impl Iterator<Item = &T> {
967+
let may_table_change_log = if exclude_change_log {
968+
None
969+
} else {
970+
Some(self.table_change_log.values())
971+
};
963972
self.get_combined_levels()
964973
.flat_map(|level| level.table_infos.iter())
965-
.chain(self.table_change_log.values().flat_map(|change_log| {
966-
change_log.iter().flat_map(|epoch_change_log| {
967-
epoch_change_log
968-
.old_value
969-
.iter()
970-
.chain(epoch_change_log.new_value.iter())
971-
})
972-
}))
974+
.chain(
975+
may_table_change_log
976+
.map(|v| {
977+
v.flat_map(|table_change_log| {
978+
table_change_log.iter().flat_map(|epoch_change_log| {
979+
epoch_change_log
980+
.old_value
981+
.iter()
982+
.chain(epoch_change_log.new_value.iter())
983+
})
984+
})
985+
})
986+
.into_iter()
987+
.flatten(),
988+
)
973989
}
974990
}
975991

@@ -1581,7 +1597,7 @@ mod tests {
15811597
)]),
15821598
..Default::default()
15831599
};
1584-
assert_eq!(version.get_object_ids().len(), 0);
1600+
assert_eq!(version.get_object_ids(false).len(), 0);
15851601

15861602
// Add to sub level
15871603
version
@@ -1599,7 +1615,7 @@ mod tests {
15991615
.into()],
16001616
..Default::default()
16011617
});
1602-
assert_eq!(version.get_object_ids().len(), 1);
1618+
assert_eq!(version.get_object_ids(false).len(), 1);
16031619

16041620
// Add to non sub level
16051621
version.levels.get_mut(&0).unwrap().levels.push(Level {
@@ -1611,7 +1627,7 @@ mod tests {
16111627
.into()],
16121628
..Default::default()
16131629
});
1614-
assert_eq!(version.get_object_ids().len(), 2);
1630+
assert_eq!(version.get_object_ids(false).len(), 2);
16151631
}
16161632

16171633
#[test]

src/storage/hummock_sdk/src/time_travel.rs

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ use std::collections::{HashMap, HashSet};
1616

1717
use risingwave_pb::hummock::hummock_version::PbLevels;
1818
use risingwave_pb::hummock::hummock_version_delta::{PbChangeLogDelta, PbGroupDeltas};
19-
use risingwave_pb::hummock::{group_delta, PbEpochNewChangeLog, PbLevel, PbSstableInfo};
19+
use risingwave_pb::hummock::{group_delta, PbLevel, PbSstableInfo};
2020

21-
use crate::change_log::{TableChangeLog, TableChangeLogCommon};
2221
use crate::compaction_group::StateTableId;
2322
use crate::level::Level;
2423
use crate::sstable_info::SstableInfo;
@@ -50,12 +49,6 @@ pub fn refill_version(
5049
.table_infos
5150
.retain(|t| t.table_ids.contains(&table_id));
5251
}
53-
version
54-
.table_change_log
55-
.retain(|t, _| t.table_id == table_id);
56-
for t in version.table_change_log.values_mut() {
57-
refill_table_change_log(t, sst_id_to_info);
58-
}
5952
}
6053

6154
fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>) {
@@ -64,20 +57,6 @@ fn refill_level(level: &mut Level, sst_id_to_info: &HashMap<HummockSstableId, Ss
6457
}
6558
}
6659

67-
fn refill_table_change_log(
68-
table_change_log: &mut TableChangeLog,
69-
sst_id_to_info: &HashMap<HummockSstableId, SstableInfo>,
70-
) {
71-
for c in table_change_log.iter_mut() {
72-
for s in &mut c.old_value {
73-
refill_sstable_info(s, sst_id_to_info);
74-
}
75-
for s in &mut c.new_value {
76-
refill_sstable_info(s, sst_id_to_info);
77-
}
78-
}
79-
}
80-
8160
/// Caller should ensure `sst_id_to_info` includes an entry corresponding to `sstable_info`.
8261
fn refill_sstable_info(
8362
sstable_info: &mut SstableInfo,
@@ -106,29 +85,8 @@ impl From<(&HummockVersion, &HashSet<StateTableId>)> for IncompleteHummockVersio
10685
.collect(),
10786
max_committed_epoch: version.max_committed_epoch,
10887
table_watermarks: version.table_watermarks.clone(),
109-
table_change_log: version
110-
.table_change_log
111-
.iter()
112-
.filter_map(|(table_id, change_log)| {
113-
if !time_travel_table_ids.contains(&table_id.table_id()) {
114-
return None;
115-
}
116-
debug_assert!(change_log.iter().all(|d| {
117-
d.new_value.iter().chain(d.old_value.iter()).all(|s| {
118-
s.table_ids
119-
.iter()
120-
.any(|tid| time_travel_table_ids.contains(tid))
121-
})
122-
}));
123-
let incomplete_table_change_log = change_log
124-
.iter()
125-
.map(|e| PbEpochNewChangeLog::from(e).into());
126-
Some((
127-
*table_id,
128-
TableChangeLogCommon::new(incomplete_table_change_log),
129-
))
130-
})
131-
.collect(),
88+
// time travel metadata doesn't include table change log
89+
table_change_log: HashMap::default(),
13290
state_table_info: version.state_table_info.clone(),
13391
}
13492
}

src/storage/hummock_sdk/src/version.rs

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -538,19 +538,33 @@ where
538538
///
539539
/// Note: the result can be false positive because we only collect the set of sst object ids in the `inserted_table_infos`,
540540
/// but it is possible that the object is moved or split from other compaction groups or levels.
541-
pub fn newly_added_object_ids(&self) -> HashSet<HummockSstableObjectId> {
542-
self.newly_added_sst_infos()
541+
pub fn newly_added_object_ids(
542+
&self,
543+
exclude_table_change_log: bool,
544+
) -> HashSet<HummockSstableObjectId> {
545+
self.newly_added_sst_infos(exclude_table_change_log)
543546
.map(|sst| sst.object_id())
544547
.collect()
545548
}
546549

547-
pub fn newly_added_sst_ids(&self) -> HashSet<HummockSstableObjectId> {
548-
self.newly_added_sst_infos()
550+
pub fn newly_added_sst_ids(
551+
&self,
552+
exclude_table_change_log: bool,
553+
) -> HashSet<HummockSstableObjectId> {
554+
self.newly_added_sst_infos(exclude_table_change_log)
549555
.map(|sst| sst.sst_id())
550556
.collect()
551557
}
552558

553-
pub fn newly_added_sst_infos(&self) -> impl Iterator<Item = &'_ T> {
559+
pub fn newly_added_sst_infos(
560+
&self,
561+
exclude_table_change_log: bool,
562+
) -> impl Iterator<Item = &'_ T> {
563+
let may_table_change_delta = if exclude_table_change_log {
564+
None
565+
} else {
566+
Some(self.change_log_delta.values())
567+
};
554568
self.group_deltas
555569
.values()
556570
.flat_map(|group_deltas| {
@@ -568,11 +582,23 @@ where
568582
sst_slice.into_iter().flatten()
569583
})
570584
})
571-
.chain(self.change_log_delta.values().flat_map(|delta| {
572-
// TODO: optimization: strip table change log
573-
let new_log = delta.new_log.as_ref().unwrap();
574-
new_log.new_value.iter().chain(new_log.old_value.iter())
575-
}))
585+
.chain(
586+
may_table_change_delta
587+
.map(|v| {
588+
v.flat_map(|delta| {
589+
delta
590+
.new_log
591+
.as_ref()
592+
.map(|new_log| {
593+
new_log.new_value.iter().chain(new_log.old_value.iter())
594+
})
595+
.into_iter()
596+
.flatten()
597+
})
598+
})
599+
.into_iter()
600+
.flatten(),
601+
)
576602
}
577603
}
578604

0 commit comments

Comments
 (0)