Skip to content

Commit 3a33031

Browse files
authored
fix(meta): fix time travel GC bug (#20107)
1 parent 9e1cd76 commit 3a33031

File tree

3 files changed

+57
-28
lines changed

3 files changed

+57
-28
lines changed

src/meta/src/hummock/manager/gc.rs

+9 -26
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,6 @@
1414

1515
use std::cmp;
1616
use std::collections::HashSet;
17-
use std::ops::Bound::{Excluded, Included};
1817
use std::ops::DerefMut;
1918
use std::sync::atomic::{AtomicBool, Ordering};
2019
use std::time::{Duration, SystemTime};
@@ -223,30 +222,8 @@ impl HummockManager {
223222
) -> Result<Vec<HummockSstableObjectId>> {
224223
// This lock ensures `commit_epoch` and `report_compact_task` can see the latest GC history during sanity check.
225224
let versioning = self.versioning.read().await;
226-
let tracked_object_ids: HashSet<HummockSstableObjectId> = {
227-
let context_info = self.context_info.read().await;
228-
// object ids in checkpoint version
229-
let mut tracked_object_ids = versioning.checkpoint.version.get_object_ids();
230-
// add object ids added between checkpoint version and current version
231-
for (_, delta) in versioning.hummock_version_deltas.range((
232-
Excluded(versioning.checkpoint.version.id),
233-
Included(versioning.current_version.id),
234-
)) {
235-
tracked_object_ids.extend(delta.newly_added_object_ids());
236-
}
237-
// add stale object ids before the checkpoint version
238-
let min_pinned_version_id = context_info.min_pinned_version_id();
239-
tracked_object_ids.extend(
240-
versioning
241-
.checkpoint
242-
.stale_objects
243-
.iter()
244-
.filter(|(version_id, _)| **version_id >= min_pinned_version_id)
245-
.flat_map(|(_, objects)| objects.id.iter())
246-
.cloned(),
247-
);
248-
tracked_object_ids
249-
};
225+
let tracked_object_ids: HashSet<HummockSstableObjectId> = versioning
226+
.get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
250227
let to_delete = object_ids.filter(|object_id| !tracked_object_ids.contains(object_id));
251228
self.write_gc_history(to_delete.clone()).await?;
252229
Ok(to_delete.collect())
@@ -561,9 +538,15 @@ impl HummockManager {
561538
};
562539
// Objects pinned by either meta backup or time travel should be filtered out.
563540
let backup_pinned: HashSet<_> = backup_manager.list_pinned_ssts();
541+
// `version_pinned` is obtained after the candidate `object_ids` for deletion were collected, so it is recent enough to filter them.
542+
let version_pinned = {
543+
let versioning = self.versioning.read().await;
544+
versioning
545+
.get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
546+
};
564547
let object_ids = object_ids
565548
.into_iter()
566-
.filter(|s| !backup_pinned.contains(s));
549+
.filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(s));
567550
let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
568551
// Retry is not necessary. Full GC will handle these objects eventually.
569552
self.delete_objects(object_ids.into_iter().collect())

src/meta/src/hummock/manager/time_travel.rs

+22 -2
Original file line number | Diff line number | Diff line change
@@ -109,8 +109,18 @@ impl HummockManager {
109109
txn.commit().await?;
110110
return Ok(());
111111
};
112-
let (latest_valid_version_id, latest_valid_version_sst_ids) =
113-
{ (latest_valid_version.id, latest_valid_version.get_sst_ids()) };
112+
let (
113+
latest_valid_version_id,
114+
latest_valid_version_sst_ids,
115+
latest_valid_version_object_ids,
116+
) = {
117+
(
118+
latest_valid_version.id,
119+
latest_valid_version.get_sst_ids(),
120+
latest_valid_version.get_object_ids(),
121+
)
122+
};
123+
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
114124
let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
115125
hummock_time_travel_version::Entity::find()
116126
.select_only()
@@ -154,6 +164,8 @@ impl HummockManager {
154164
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
155165
.exec(&txn)
156166
.await?;
167+
let new_object_ids = delta_to_delete.newly_added_object_ids();
168+
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
157169
tracing::debug!(
158170
delta_id = delta_to_delete.id.to_u64(),
159171
"delete {} rows from hummock_sstable_info",
@@ -181,13 +193,21 @@ impl HummockManager {
181193
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
182194
.exec(&txn)
183195
.await?;
196+
let new_object_ids = prev_version.get_object_ids();
197+
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
184198
tracing::debug!(
185199
prev_version_id,
186200
"delete {} rows from hummock_sstable_info",
187201
res.rows_affected
188202
);
189203
next_version_sst_ids = sst_ids;
190204
}
205+
if !object_ids_to_delete.is_empty() {
206+
// IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
207+
// So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
208+
self.gc_manager
209+
.add_may_delete_object_ids(object_ids_to_delete.into_iter());
210+
}
191211

192212
let res = hummock_time_travel_version::Entity::delete_many()
193213
.filter(

src/meta/src/hummock/manager/versioning.rs

+26
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::cmp;
16+
use std::collections::Bound::{Excluded, Included};
1617
use std::collections::{BTreeMap, HashMap, HashSet};
1718

1819
use itertools::Itertools;
@@ -82,6 +83,31 @@ impl Versioning {
8283
pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
8384
self.time_travel_snapshot_interval_counter = u64::MAX;
8485
}
86+
87+
/// Builds the set of object ids that are still tracked by this `Versioning`.
///
/// The returned set is the union of:
/// - the object ids of the checkpoint version,
/// - the object ids newly added by every version delta after the checkpoint
///   version, up to and including the current version,
/// - the stale object ids stored in the checkpoint under a version id that is
///   greater than or equal to `min_pinned_version_id`.
///
/// GC callers use this set to exclude objects from their deletion candidates.
pub fn get_tracked_object_ids(
88+
&self,
89+
min_pinned_version_id: HummockVersionId,
90+
) -> HashSet<HummockSstableObjectId> {
91+
// Start from the object ids referenced by the checkpoint version.
92+
let mut tracked_object_ids = self.checkpoint.version.get_object_ids();
93+
// Add object ids introduced by deltas in the range (checkpoint version id, current version id].
94+
for (_, delta) in self.hummock_version_deltas.range((
95+
Excluded(self.checkpoint.version.id),
96+
Included(self.current_version.id),
97+
)) {
98+
tracked_object_ids.extend(delta.newly_added_object_ids());
99+
}
100+
// Add stale object ids recorded before the checkpoint version, keeping only entries
// whose version id is >= `min_pinned_version_id`; older entries are skipped.
101+
tracked_object_ids.extend(
102+
self.checkpoint
103+
.stale_objects
104+
.iter()
105+
.filter(|(version_id, _)| **version_id >= min_pinned_version_id)
106+
.flat_map(|(_, objects)| objects.id.iter())
107+
.cloned(),
108+
);
109+
tracked_object_ids
110+
}
85111
}
86112

87113
impl HummockManager {

0 commit comments

Comments
 (0)