Skip to content

Commit 307f915

Browse files
codesomeaknuds1
andauthoredMay 30, 2024··
Add a db.CompactHeadWithoutTruncation function (#638)
* Add a db.CompactHeadWithoutTruncation function Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> * Apply suggestions from code review Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> --------- Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
1 parent e5b85c1 commit 307f915

File tree

2 files changed

+74
-3
lines changed

2 files changed

+74
-3
lines changed
 

‎tsdb/db.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -1249,7 +1249,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) {
12491249
// We do need to wait for any overlapping appenders that started previously to finish.
12501250
db.head.WaitForAppendersOverlapping(rh.MaxTime())
12511251

1252-
if err := db.compactHead(rh); err != nil {
1252+
if err := db.compactHead(rh, true); err != nil {
12531253
return fmt.Errorf("compact head: %w", err)
12541254
}
12551255
// Consider only successful compactions for WAL truncation.
@@ -1286,7 +1286,7 @@ func (db *DB) CompactHead(head *RangeHead) error {
12861286
db.cmtx.Lock()
12871287
defer db.cmtx.Unlock()
12881288

1289-
if err := db.compactHead(head); err != nil {
1289+
if err := db.compactHead(head, true); err != nil {
12901290
return fmt.Errorf("compact head: %w", err)
12911291
}
12921292

@@ -1296,6 +1296,18 @@ func (db *DB) CompactHead(head *RangeHead) error {
12961296
return nil
12971297
}
12981298

1299+
// CompactHeadWithoutTruncation compacts the given RangeHead but does not truncate the
1300+
// in-memory data and the WAL related to this compaction.
1301+
func (db *DB) CompactHeadWithoutTruncation(head *RangeHead) error {
1302+
db.cmtx.Lock()
1303+
defer db.cmtx.Unlock()
1304+
1305+
if err := db.compactHead(head, false); err != nil {
1306+
return fmt.Errorf("compact head without truncation: %w", err)
1307+
}
1308+
return nil
1309+
}
1310+
12991311
// CompactOOOHead compacts the OOO Head.
13001312
func (db *DB) CompactOOOHead(ctx context.Context) error {
13011313
db.cmtx.Lock()
@@ -1400,7 +1412,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
14001412

14011413
// compactHead compacts the given RangeHead.
14021414
// The compaction mutex should be held before calling this method.
1403-
func (db *DB) compactHead(head *RangeHead) error {
1415+
func (db *DB) compactHead(head *RangeHead, truncateMemory bool) error {
14041416
uid, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
14051417
if err != nil {
14061418
return fmt.Errorf("persist head block: %w", err)
@@ -1415,6 +1427,9 @@ func (db *DB) compactHead(head *RangeHead) error {
14151427
}
14161428
return fmt.Errorf("reloadBlocks blocks: %w", err)
14171429
}
1430+
if !truncateMemory {
1431+
return nil
1432+
}
14181433
if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
14191434
return fmt.Errorf("head memory truncate: %w", err)
14201435
}

‎tsdb/db_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -7057,3 +7057,59 @@ func TestAbortBlockCompactions(t *testing.T) {
70577057
require.True(t, db.head.compactable(), "head should be compactable")
70587058
require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
70597059
}
7060+
7061+
func TestCompactHeadWithoutTruncation(t *testing.T) {
7062+
setupDB := func() *DB {
7063+
db := openTestDB(t, nil, nil)
7064+
t.Cleanup(func() {
7065+
require.NoError(t, db.Close())
7066+
})
7067+
db.DisableCompactions()
7068+
7069+
// Add samples to the head.
7070+
lbls := labels.FromStrings("foo", "bar")
7071+
app := db.Appender(context.Background())
7072+
_, err := app.Append(0, lbls, 0, 0)
7073+
require.NoError(t, err)
7074+
_, err = app.Append(0, lbls, DefaultBlockDuration/2, float64(DefaultBlockDuration/2))
7075+
require.NoError(t, err)
7076+
_, err = app.Append(0, lbls, DefaultBlockDuration-1, float64(DefaultBlockDuration-1))
7077+
require.NoError(t, err)
7078+
_, err = app.Append(0, lbls, 2*DefaultBlockDuration, float64(2*DefaultBlockDuration))
7079+
require.NoError(t, err)
7080+
require.NoError(t, app.Commit())
7081+
7082+
return db
7083+
}
7084+
7085+
testQuery := func(db *DB, expSamples []chunks.Sample) {
7086+
rh := NewRangeHead(db.Head(), math.MinInt64, math.MaxInt64)
7087+
q, err := NewBlockQuerier(rh, math.MinInt64, math.MaxInt64)
7088+
require.NoError(t, err)
7089+
ss := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
7090+
require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expSamples}, ss)
7091+
}
7092+
7093+
{ // Compact the head with truncation.
7094+
db := setupDB()
7095+
rh := NewRangeHead(db.Head(), 0, DefaultBlockDuration-1)
7096+
require.NoError(t, db.CompactHead(rh))
7097+
// Samples got truncated from the head.
7098+
testQuery(db, []chunks.Sample{
7099+
sample{t: 2 * DefaultBlockDuration, f: float64(2 * DefaultBlockDuration)},
7100+
})
7101+
}
7102+
7103+
{ // Compact the head without truncation.
7104+
db := setupDB()
7105+
rh := NewRangeHead(db.Head(), 0, DefaultBlockDuration-1)
7106+
require.NoError(t, db.CompactHeadWithoutTruncation(rh))
7107+
// All samples still exist in the head.
7108+
testQuery(db, []chunks.Sample{
7109+
sample{t: 0, f: 0},
7110+
sample{t: DefaultBlockDuration / 2, f: float64(DefaultBlockDuration / 2)},
7111+
sample{t: DefaultBlockDuration - 1, f: float64(DefaultBlockDuration - 1)},
7112+
sample{t: 2 * DefaultBlockDuration, f: float64(2 * DefaultBlockDuration)},
7113+
})
7114+
}
7115+
}

0 commit comments

Comments
 (0)
Please sign in to comment.