Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit c7c4484

Browse files
committed
Move tombstones to it's own package.
Signed-off-by: Callum Styan <[email protected]>
1 parent 2f88210 commit c7c4484

18 files changed

+174
-179
lines changed

block.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
"github.com/prometheus/tsdb/fileutil"
3333
"github.com/prometheus/tsdb/index"
3434
"github.com/prometheus/tsdb/labels"
35-
"github.com/prometheus/tsdb/record"
35+
"github.com/prometheus/tsdb/tombstones"
3636
)
3737

3838
// IndexWriter serializes the index for a block of series data.
@@ -137,7 +137,7 @@ type BlockReader interface {
137137
Chunks() (ChunkReader, error)
138138

139139
// Tombstones returns a TombstoneReader over the block's deleted data.
140-
Tombstones() (record.TombstoneReader, error)
140+
Tombstones() (tombstones.TombstoneReader, error)
141141

142142
// MinTime returns the min time of the block.
143143
MinTime() int64
@@ -282,7 +282,7 @@ type Block struct {
282282

283283
chunkr ChunkReader
284284
indexr IndexReader
285-
tombstones record.TombstoneReader
285+
tombstones tombstones.TombstoneReader
286286

287287
logger log.Logger
288288

@@ -324,7 +324,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
324324
}
325325
closers = append(closers, ir)
326326

327-
tr, sizeTomb, err := record.ReadTombstones(dir)
327+
tr, sizeTomb, err := tombstones.ReadTombstones(dir)
328328
if err != nil {
329329
return nil, err
330330
}
@@ -415,7 +415,7 @@ func (pb *Block) Chunks() (ChunkReader, error) {
415415
}
416416

417417
// Tombstones returns a new TombstoneReader against the block data.
418-
func (pb *Block) Tombstones() (record.TombstoneReader, error) {
418+
func (pb *Block) Tombstones() (tombstones.TombstoneReader, error) {
419419
if err := pb.startRead(); err != nil {
420420
return nil, err
421421
}
@@ -484,7 +484,7 @@ func (r blockIndexReader) Close() error {
484484
}
485485

486486
type blockTombstoneReader struct {
487-
record.TombstoneReader
487+
tombstones.TombstoneReader
488488
b *Block
489489
}
490490

@@ -520,7 +520,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
520520
ir := pb.indexr
521521

522522
// Choose only valid postings which have chunks in the time-range.
523-
stones := record.NewMemTombstones()
523+
stones := tombstones.NewMemTombstones()
524524

525525
var lset labels.Labels
526526
var chks []chunks.Meta
@@ -536,7 +536,7 @@ Outer:
536536
if chk.OverlapsClosedInterval(mint, maxt) {
537537
// Delete only until the current values and not beyond.
538538
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
539-
stones.AddInterval(p.At(), record.Interval{tmin, tmax})
539+
stones.AddInterval(p.At(), tombstones.Interval{tmin, tmax})
540540
continue Outer
541541
}
542542
}
@@ -546,7 +546,7 @@ Outer:
546546
return p.Err()
547547
}
548548

549-
err = pb.tombstones.Iter(func(id uint64, ivs record.Intervals) error {
549+
err = pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
550550
for _, iv := range ivs {
551551
stones.AddInterval(id, iv)
552552
}
@@ -558,7 +558,7 @@ Outer:
558558
pb.tombstones = stones
559559
pb.meta.Stats.NumTombstones = pb.tombstones.Total()
560560

561-
n, err := record.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones)
561+
n, err := tombstones.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones)
562562
if err != nil {
563563
return err
564564
}
@@ -576,7 +576,7 @@ Outer:
576576
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
577577
numStones := 0
578578

579-
if err := pb.tombstones.Iter(func(id uint64, ivs record.Intervals) error {
579+
if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
580580
numStones += len(ivs)
581581
return nil
582582
}); err != nil {
@@ -611,7 +611,7 @@ func (pb *Block) Snapshot(dir string) error {
611611
for _, fname := range []string{
612612
metaFilename,
613613
indexFilename,
614-
record.TombstoneFilename,
614+
tombstones.TombstoneFilename,
615615
} {
616616
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
617617
return errors.Wrapf(err, "create snapshot %s", fname)

compact.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
"github.com/prometheus/tsdb/fileutil"
3636
"github.com/prometheus/tsdb/index"
3737
"github.com/prometheus/tsdb/labels"
38-
"github.com/prometheus/tsdb/record"
38+
"github.com/prometheus/tsdb/tombstones"
3939
)
4040

4141
// ExponentialBlockRanges returns the time ranges based on the stepSize.
@@ -608,7 +608,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
608608
}
609609

610610
// Create an empty tombstones file.
611-
if _, err := record.WriteTombstoneFile(c.logger, tmp, record.NewMemTombstones()); err != nil {
611+
if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, record.NewMemTombstones()); err != nil {
612612
return errors.Wrap(err, "write new tombstones file")
613613
}
614614

@@ -851,15 +851,15 @@ type compactionSeriesSet struct {
851851
p index.Postings
852852
index IndexReader
853853
chunks ChunkReader
854-
tombstones record.TombstoneReader
854+
tombstones tombstones.TombstoneReader
855855

856856
l labels.Labels
857857
c []chunks.Meta
858-
intervals record.Intervals
858+
intervals tombstones.Intervals
859859
err error
860860
}
861861

862-
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t record.TombstoneReader, p index.Postings) *compactionSeriesSet {
862+
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.TombstoneReader, p index.Postings) *compactionSeriesSet {
863863
return &compactionSeriesSet{
864864
index: i,
865865
chunks: c,
@@ -889,7 +889,7 @@ func (c *compactionSeriesSet) Next() bool {
889889
if len(c.intervals) > 0 {
890890
chks := make([]chunks.Meta, 0, len(c.c))
891891
for _, chk := range c.c {
892-
if !(record.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) {
892+
if !(tombstones.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) {
893893
chks = append(chks, chk)
894894
}
895895
}
@@ -917,7 +917,7 @@ func (c *compactionSeriesSet) Err() error {
917917
return c.p.Err()
918918
}
919919

920-
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, record.Intervals) {
920+
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
921921
return c.l, c.c, c.intervals
922922
}
923923

@@ -927,7 +927,7 @@ type compactionMerger struct {
927927
aok, bok bool
928928
l labels.Labels
929929
c []chunks.Meta
930-
intervals record.Intervals
930+
intervals tombstones.Intervals
931931
}
932932

933933
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
@@ -1004,6 +1004,6 @@ func (c *compactionMerger) Err() error {
10041004
return c.b.Err()
10051005
}
10061006

1007-
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, record.Intervals) {
1007+
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
10081008
return c.l, c.c, c.intervals
10091009
}

compact_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import (
3030
"github.com/prometheus/tsdb/chunks"
3131
"github.com/prometheus/tsdb/fileutil"
3232
"github.com/prometheus/tsdb/labels"
33-
"github.com/prometheus/tsdb/record"
3433
"github.com/prometheus/tsdb/testutil"
34+
"github.com/prometheus/tsdb/tombstones"
3535
)
3636

3737
func TestSplitByRange(t *testing.T) {
@@ -458,7 +458,7 @@ type erringBReader struct{}
458458

459459
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
460460
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
461-
func (erringBReader) Tombstones() (record.TombstoneReader, error) {
461+
func (erringBReader) Tombstones() (tombstones.TombstoneReader, error) {
462462
return nil, errors.New("tombstones")
463463
}
464464
func (erringBReader) MinTime() int64 { return 0 }

db_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/prometheus/tsdb/labels"
3636
"github.com/prometheus/tsdb/record"
3737
"github.com/prometheus/tsdb/testutil"
38+
"github.com/prometheus/tsdb/tombstones"
3839
"github.com/prometheus/tsdb/tsdbutil"
3940
"github.com/prometheus/tsdb/wal"
4041
)
@@ -244,27 +245,27 @@ func TestDeleteSimple(t *testing.T) {
244245
numSamples := int64(10)
245246

246247
cases := []struct {
247-
intervals record.Intervals
248+
intervals tombstones.Intervals
248249
remaint []int64
249250
}{
250251
{
251-
intervals: record.Intervals{{0, 3}},
252+
intervals: tombstones.Intervals{{0, 3}},
252253
remaint: []int64{4, 5, 6, 7, 8, 9},
253254
},
254255
{
255-
intervals: record.Intervals{{1, 3}},
256+
intervals: tombstones.Intervals{{1, 3}},
256257
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
257258
},
258259
{
259-
intervals: record.Intervals{{1, 3}, {4, 7}},
260+
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
260261
remaint: []int64{0, 8, 9},
261262
},
262263
{
263-
intervals: record.Intervals{{1, 3}, {4, 700}},
264+
intervals: tombstones.Intervals{{1, 3}, {4, 700}},
264265
remaint: []int64{0},
265266
},
266267
{ // This case is to ensure that labels and symbols are deleted.
267-
intervals: record.Intervals{{0, 9}},
268+
intervals: tombstones.Intervals{{0, 9}},
268269
remaint: []int64{},
269270
},
270271
}
@@ -506,11 +507,11 @@ func TestDB_SnapshotWithDelete(t *testing.T) {
506507

507508
testutil.Ok(t, app.Commit())
508509
cases := []struct {
509-
intervals record.Intervals
510+
intervals tombstones.Intervals
510511
remaint []int64
511512
}{
512513
{
513-
intervals: record.Intervals{{1, 3}, {4, 7}},
514+
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
514515
remaint: []int64{0, 8, 9},
515516
},
516517
}
@@ -833,11 +834,11 @@ func TestTombstoneClean(t *testing.T) {
833834

834835
testutil.Ok(t, app.Commit())
835836
cases := []struct {
836-
intervals record.Intervals
837+
intervals tombstones.Intervals
837838
remaint []int64
838839
}{
839840
{
840-
intervals: record.Intervals{{1, 3}, {4, 7}},
841+
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
841842
remaint: []int64{0, 8, 9},
842843
},
843844
}
@@ -909,7 +910,7 @@ func TestTombstoneClean(t *testing.T) {
909910
}
910911

911912
for _, b := range db.Blocks() {
912-
testutil.Equals(t, record.NewMemTombstones(), b.tombstones)
913+
testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
913914
}
914915
}
915916
}
@@ -935,8 +936,8 @@ func TestTombstoneCleanFail(t *testing.T) {
935936
block, err := OpenBlock(nil, blockDir, nil)
936937
testutil.Ok(t, err)
937938
// Add some some fake tombstones to trigger the compaction.
938-
tomb := record.NewMemTombstones()
939-
tomb.AddInterval(0, record.Interval{0, 1})
939+
tomb := tombstones.NewMemTombstones()
940+
tomb.AddInterval(0, tombstones.Interval{0, 1})
940941
block.tombstones = tomb
941942

942943
db.blocks = append(db.blocks, block)

head.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/prometheus/tsdb/index"
3434
"github.com/prometheus/tsdb/labels"
3535
"github.com/prometheus/tsdb/record"
36+
"github.com/prometheus/tsdb/tombstones"
3637
"github.com/prometheus/tsdb/wal"
3738
)
3839

@@ -43,7 +44,7 @@ var (
4344

4445
// emptyTombstoneReader is a no-op Tombstone Reader.
4546
// This is used by head to satisfy the Tombstones() function call.
46-
emptyTombstoneReader = record.NewMemTombstones()
47+
emptyTombstoneReader = tombstones.NewMemTombstones()
4748
)
4849

4950
// Head handles reads and writes of time series data within a time window.
@@ -346,11 +347,20 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
346347
}
347348

348349
var (
350+
<<<<<<< HEAD
349351
dec RecordDecoder
350352
series []RefSeries
351353
samples []RefSample
352354
tstones []Stone
353355
allStones = newMemTombstones()
356+
=======
357+
dec record.RecordDecoder
358+
series []record.RefSeries
359+
samples []record.RefSample
360+
tstones []tombstones.Stone
361+
allStones = tombstones.NewMemTombstones()
362+
err error
363+
>>>>>>> Move tombstones to it's own package.
354364
)
355365
defer func() {
356366
if err := allStones.Close(); err != nil {
@@ -377,7 +387,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
377387
if !created {
378388
// There's already a different ref for this series.
379389
multiRefLock.Lock()
380-
multiRef[s.Ref] = series.ref
390+
multiRef[s.Ref] = series.Ref
381391
multiRefLock.Unlock()
382392
}
383393

@@ -464,11 +474,15 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
464474
}
465475
wg.Wait()
466476

477+
<<<<<<< HEAD
467478
if r.Err() != nil {
468479
return errors.Wrap(r.Err(), "read records")
469480
}
470481

471482
if err := allStones.Iter(func(ref uint64, dranges Intervals) error {
483+
=======
484+
if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error {
485+
>>>>>>> Move tombstones to it's own package.
472486
return h.chunkRewrite(ref, dranges)
473487
}); err != nil {
474488
return errors.Wrap(r.Err(), "deleting samples from tombstones")
@@ -676,7 +690,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) {
676690
return h.head.chunksRange(h.mint, h.maxt), nil
677691
}
678692

679-
func (h *rangeHead) Tombstones() (record.TombstoneReader, error) {
693+
func (h *rangeHead) Tombstones() (tombstones.TombstoneReader, error) {
680694
return emptyTombstoneReader, nil
681695
}
682696

@@ -932,7 +946,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
932946
return errors.Wrap(err, "select series")
933947
}
934948

935-
var stones []record.Stone
949+
var stones []tombstones.Stone
936950
dirty := false
937951
for p.Next() {
938952
series := h.series.getByID(p.At())
@@ -944,9 +958,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
944958
// Delete only until the current values and not beyond.
945959
t0, t1 = clampInterval(mint, maxt, t0, t1)
946960
if h.wal != nil {
947-
stones = append(stones, record.Stone{p.At(), record.Intervals{{t0, t1}}})
961+
stones = append(stones, tombstones.Stone{p.At(), tombstones.Intervals{{t0, t1}}})
948962
}
949-
if err := h.chunkRewrite(p.At(), record.Intervals{{t0, t1}}); err != nil {
963+
if err := h.chunkRewrite(p.At(), tombstones.Intervals{{t0, t1}}); err != nil {
950964
return errors.Wrap(err, "delete samples")
951965
}
952966
dirty = true
@@ -973,7 +987,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
973987
// chunkRewrite re-writes the chunks which overlaps with deleted ranges
974988
// and removes the samples in the deleted ranges.
975989
// Chunks is deleted if no samples are left at the end.
976-
func (h *Head) chunkRewrite(ref uint64, dranges record.Intervals) (err error) {
990+
func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) {
977991
if len(dranges) == 0 {
978992
return nil
979993
}
@@ -1063,7 +1077,7 @@ func (h *Head) gc() {
10631077
}
10641078

10651079
// Tombstones returns a new reader over the head's tombstones
1066-
func (h *Head) Tombstones() (record.TombstoneReader, error) {
1080+
func (h *Head) Tombstones() (tombstones.TombstoneReader, error) {
10671081
return emptyTombstoneReader, nil
10681082
}
10691083

0 commit comments

Comments
 (0)