diff --git a/block.go b/block.go index d0fe2b2f..b19f7cb6 100644 --- a/block.go +++ b/block.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/tombstones" ) // IndexWriter serializes the index for a block of series data. @@ -136,7 +137,7 @@ type BlockReader interface { Chunks() (ChunkReader, error) // Tombstones returns a TombstoneReader over the block's deleted data. - Tombstones() (TombstoneReader, error) + Tombstones() (tombstones.TombstoneReader, error) // Meta provides meta information about the block reader. Meta() BlockMeta @@ -278,7 +279,7 @@ type Block struct { chunkr ChunkReader indexr IndexReader - tombstones TombstoneReader + tombstones tombstones.TombstoneReader logger log.Logger @@ -320,7 +321,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er } closers = append(closers, ir) - tr, sizeTomb, err := readTombstones(dir) + tr, sizeTomb, err := tombstones.ReadTombstones(dir) if err != nil { return nil, err } @@ -411,7 +412,7 @@ func (pb *Block) Chunks() (ChunkReader, error) { } // Tombstones returns a new TombstoneReader against the block data. -func (pb *Block) Tombstones() (TombstoneReader, error) { +func (pb *Block) Tombstones() (tombstones.TombstoneReader, error) { if err := pb.startRead(); err != nil { return nil, err } @@ -482,7 +483,7 @@ func (r blockIndexReader) Close() error { } type blockTombstoneReader struct { - TombstoneReader + tombstones.TombstoneReader b *Block } @@ -518,7 +519,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := newMemTombstones() + stones := tombstones.NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -534,7 +535,7 @@ Outer: if chk.OverlapsClosedInterval(mint, maxt) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones.addInterval(p.At(), Interval{tmin, tmax}) + stones.AddInterval(p.At(), tombstones.Interval{tmin, tmax}) continue Outer } } @@ -544,9 +545,9 @@ Outer: return p.Err() } - err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + err = pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error { for _, iv := range ivs { - stones.addInterval(id, iv) + stones.AddInterval(id, iv) } return nil }) @@ -556,7 +557,7 @@ Outer: pb.tombstones = stones pb.meta.Stats.NumTombstones = pb.tombstones.Total() - n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones) + n, err := tombstones.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones) if err != nil { return err } @@ -574,7 +575,7 @@ Outer: func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { numStones := 0 - if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error { numStones += len(ivs) return nil }); err != nil { @@ -609,7 +610,7 @@ func (pb *Block) Snapshot(dir string) error { for _, fname := range []string{ metaFilename, indexFilename, - tombstoneFilename, + tombstones.TombstoneFilename, } { if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil { return errors.Wrapf(err, "create snapshot %s", fname) diff --git a/compact.go b/compact.go index 9443c99e..8fa8edfd 100644 --- a/compact.go +++ b/compact.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/tombstones" ) // ExponentialBlockRanges returns the time ranges based on the stepSize. @@ -607,7 +608,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil { + if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -768,7 +769,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, // // TODO think how to avoid the typecasting to verify when it is head block. if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime { - dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) + dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64}) } else // Sanity check for disk blocks. @@ -876,15 +877,15 @@ type compactionSeriesSet struct { p index.Postings index IndexReader chunks ChunkReader - tombstones TombstoneReader + tombstones tombstones.TombstoneReader l labels.Labels c []chunks.Meta - intervals Intervals + intervals tombstones.Intervals err error } -func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet { +func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.TombstoneReader, p index.Postings) *compactionSeriesSet { return &compactionSeriesSet{ index: i, chunks: c, @@ -914,7 +915,7 @@ func (c *compactionSeriesSet) Next() bool { if len(c.intervals) > 0 { chks := make([]chunks.Meta, 0, len(c.c)) for _, chk := range c.c { - if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { + if !(tombstones.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) { chks = append(chks, chk) } } @@ -942,7 +943,7 @@ func (c *compactionSeriesSet) Err() error { return c.p.Err() } -func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) { +func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return c.l, c.c, c.intervals } @@ -952,7 +953,7 @@ type compactionMerger struct { aok, bok bool l labels.Labels c []chunks.Meta - intervals Intervals + intervals tombstones.Intervals } func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { @@ -1008,7 +1009,7 @@ func (c *compactionMerger) Next() bool { _, cb, rb := c.b.At() for _, r := range rb { - ra = ra.add(r) + ra = ra.Add(r) } c.l = append(c.l[:0], l...) @@ -1029,6 +1030,6 @@ func (c *compactionMerger) Err() error { return c.b.Err() } -func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) { +func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return c.l, c.c, c.intervals } diff --git a/compact_test.go b/compact_test.go index 18990ed5..2e85115b 100644 --- a/compact_test.go +++ b/compact_test.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tombstones" ) func TestSplitByRange(t *testing.T) { @@ -455,10 +456,12 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { type erringBReader struct{} -func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } -func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } -func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } -func (erringBReader) Meta() BlockMeta { return BlockMeta{} } +func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } +func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } +func (erringBReader) Tombstones() (tombstones.TombstoneReader, error) { + return nil, errors.New("tombstones") +} +func (erringBReader) Meta() BlockMeta { return BlockMeta{} } type nopChunkWriter struct{} diff --git a/db_test.go b/db_test.go index 25fb8a7e..dd977bc0 100644 --- a/db_test.go +++ b/db_test.go @@ -33,7 +33,9 @@ import ( "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tombstones" "github.com/prometheus/tsdb/tsdbutil" "github.com/prometheus/tsdb/wal" ) @@ -243,27 +245,27 @@ func TestDeleteSimple(t *testing.T) { numSamples := int64(10) cases := []struct { - intervals Intervals + intervals tombstones.Intervals remaint []int64 }{ { - intervals: Intervals{{0, 3}}, + intervals: tombstones.Intervals{{0, 3}}, remaint: []int64{4, 5, 6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}}, + intervals: tombstones.Intervals{{1, 3}}, remaint: []int64{0, 4, 5, 6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 7}}, + intervals: tombstones.Intervals{{1, 3}, {4, 7}}, remaint: []int64{0, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 700}}, + intervals: tombstones.Intervals{{1, 3}, {4, 700}}, remaint: []int64{0}, }, { // This case is to ensure that labels and symbols are deleted. - intervals: Intervals{{0, 9}}, + intervals: tombstones.Intervals{{0, 9}}, remaint: []int64{}, }, } @@ -561,11 +563,11 @@ func TestDB_SnapshotWithDelete(t *testing.T) { testutil.Ok(t, app.Commit()) cases := []struct { - intervals Intervals + intervals tombstones.Intervals remaint []int64 }{ { - intervals: Intervals{{1, 3}, {4, 7}}, + intervals: tombstones.Intervals{{1, 3}, {4, 7}}, remaint: []int64{0, 8, 9}, }, } @@ -888,11 +890,11 @@ func TestTombstoneClean(t *testing.T) { testutil.Ok(t, app.Commit()) cases := []struct { - intervals Intervals + intervals tombstones.Intervals remaint []int64 }{ { - intervals: Intervals{{1, 3}, {4, 7}}, + intervals: tombstones.Intervals{{1, 3}, {4, 7}}, remaint: []int64{0, 8, 9}, }, } @@ -964,7 +966,7 @@ func TestTombstoneClean(t *testing.T) { } for _, b := range db.Blocks() { - testutil.Equals(t, newMemTombstones(), b.tombstones) + testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones) } } } @@ -990,8 +992,8 @@ func TestTombstoneCleanFail(t *testing.T) { block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. - tomb := newMemTombstones() - tomb.addInterval(0, Interval{0, 1}) + tomb := tombstones.NewMemTombstones() + tomb.AddInterval(0, tombstones.Interval{0, 1}) block.tombstones = tomb db.blocks = append(db.blocks, block) @@ -1470,13 +1472,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) testutil.Ok(t, err) - var enc RecordEncoder + var enc record.Encoder err = w.Log( - enc.Series([]RefSeries{ + enc.Series([]record.RefSeries{ {Ref: 123, Labels: labels.FromStrings("a", "1")}, {Ref: 124, Labels: labels.FromStrings("a", "2")}, }, nil), - enc.Samples([]RefSample{ + enc.Samples([]record.RefSample{ {Ref: 123, T: 5000, V: 1}, {Ref: 124, T: 15000, V: 1}, }, nil), @@ -1520,13 +1522,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) testutil.Ok(t, err) - var enc RecordEncoder + var enc record.Encoder err = w.Log( - enc.Series([]RefSeries{ + enc.Series([]record.RefSeries{ {Ref: 123, Labels: labels.FromStrings("a", "1")}, {Ref: 124, Labels: labels.FromStrings("a", "2")}, }, nil), - enc.Samples([]RefSample{ + enc.Samples([]record.RefSample{ {Ref: 123, T: 5000, V: 1}, {Ref: 124, T: 15000, V: 1}, }, nil), diff --git a/go.sum b/go.sum index 365fa5ec..e854d810 100644 --- a/go.sum +++ b/go.sum @@ -27,11 +27,11 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= diff --git a/head.go b/head.go index 0adb8847..00296ed2 100644 --- a/head.go +++ b/head.go @@ -33,6 +33,8 @@ import ( "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" + "github.com/prometheus/tsdb/tombstones" "github.com/prometheus/tsdb/wal" ) @@ -44,17 +46,17 @@ var ( // timestamp smaller than the most recent sample. ErrOutOfOrderSample = errors.New("out of order sample") - // ErrAmendSample is returned if an appended sample has the same timestamp - // as the most recent sample but a different value. - ErrAmendSample = errors.New("amending sample") - // ErrOutOfBounds is returned if an appended sample is out of the // writable time range. ErrOutOfBounds = errors.New("out of bounds") + // ErrAmendSample is returned if an appended sample has the same timestamp + // as the most recent sample but a different value. + ErrAmendSample = errors.New("amending sample") + // emptyTombstoneReader is a no-op Tombstone Reader. // This is used by head to satisfy the Tombstones() function call. - emptyTombstoneReader = newMemTombstones() + emptyTombstoneReader = tombstones.NewMemTombstones() ) // Head handles reads and writes of time series data within a time window. @@ -64,6 +66,7 @@ type Head struct { wal *wal.WAL logger log.Logger appendPool sync.Pool + seriesPool sync.Pool bytesPool sync.Pool numSeries uint64 @@ -256,7 +259,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int // Samples before the mint timestamp are discarded. func (h *Head) processWALSamples( minValidTime int64, - input <-chan []RefSample, output chan<- []RefSample, + input <-chan []record.RefSample, output chan<- []record.RefSample, ) (unknownRefs uint64) { defer close(output) @@ -331,8 +334,8 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { wg sync.WaitGroup multiRefLock sync.Mutex n = runtime.GOMAXPROCS(0) - inputs = make([]chan []RefSample, n) - outputs = make([]chan []RefSample, n) + inputs = make([]chan []record.RefSample, n) + outputs = make([]chan []record.RefSample, n) ) wg.Add(n) @@ -349,10 +352,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { }() for i := 0; i < n; i++ { - outputs[i] = make(chan []RefSample, 300) - inputs[i] = make(chan []RefSample, 300) + outputs[i] = make(chan []record.RefSample, 300) + inputs[i] = make(chan []record.RefSample, 300) - go func(input <-chan []RefSample, output chan<- []RefSample) { + go func(input <-chan []record.RefSample, output chan<- []record.RefSample) { unknown := h.processWALSamples(h.minValidTime, input, output) atomic.AddUint64(&unknownRefs, unknown) wg.Done() @@ -360,11 +363,11 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } var ( - dec RecordDecoder - series []RefSeries - samples []RefSample - tstones []Stone - allStones = newMemTombstones() + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + tstones []tombstones.Stone + allStones = tombstones.NewMemTombstones() ) defer func() { if err := allStones.Close(); err != nil { @@ -376,7 +379,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err = dec.Series(rec, series) if err != nil { return &wal.CorruptionErr{ @@ -399,7 +402,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { h.lastSeriesID = s.Ref } } - case RecordSamples: + case record.Samples: samples, err = dec.Samples(rec, samples) s := samples if err != nil { @@ -418,9 +421,9 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { if len(samples) < m { m = len(samples) } - shards := make([][]RefSample, n) + shards := make([][]record.RefSample, n) for i := 0; i < n; i++ { - var buf []RefSample + var buf []record.RefSample select { case buf = <-outputs[i]: default: @@ -440,7 +443,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { samples = samples[m:] } samples = s // Keep whole slice for reuse. - case RecordTombstones: + case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { return &wal.CorruptionErr{ @@ -450,15 +453,15 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { } } for _, s := range tstones { - for _, itv := range s.intervals { + for _, itv := range s.Intervals { if itv.Maxt < h.minValidTime { continue } - if m := h.series.getByID(s.ref); m == nil { + if m := h.series.getByID(s.Ref); m == nil { unknownRefs++ continue } - allStones.addInterval(s.ref, itv) + allStones.AddInterval(s.Ref, itv) } } default: @@ -482,7 +485,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) { return errors.Wrap(r.Err(), "read records") } - if err := allStones.Iter(func(ref uint64, dranges Intervals) error { + if err := allStones.Iter(func(ref uint64, dranges tombstones.Intervals) error { return h.chunkRewrite(ref, dranges) }); err != nil { return errors.Wrap(r.Err(), "deleting samples from tombstones") @@ -508,8 +511,10 @@ func (h *Head) Init(minValidTime int64) error { level.Info(h.logger).Log("msg", "replaying WAL, this may take awhile") // Backfill the checkpoint first if it exists. - dir, startFrom, err := LastCheckpoint(h.wal.Dir()) - if err != nil && err != ErrNotFound { + dir, startFrom, err := wal.LastCheckpoint(h.wal.Dir()) + // We need to compare err to record.ErrNotFound as that's what + // wal.LastCheckpoint would return, not tsdb.ErrNotFound. + if err != nil && err != record.ErrNotFound { return errors.Wrap(err, "find last checkpoint") } multiRef := map[uint64]uint64{} @@ -629,7 +634,7 @@ func (h *Head) Truncate(mint int64) (err error) { return ok } h.metrics.checkpointCreationTotal.Inc() - if _, err = Checkpoint(h.wal, first, last, keep, mint); err != nil { + if _, err = wal.Checkpoint(h.wal, first, last, keep, mint); err != nil { h.metrics.checkpointCreationFail.Inc() return errors.Wrap(err, "create checkpoint") } @@ -651,7 +656,7 @@ func (h *Head) Truncate(mint int64) (err error) { h.deletedMtx.Unlock() h.metrics.checkpointDeleteTotal.Inc() - if err := DeleteCheckpoints(h.wal.Dir(), last); err != nil { + if err := wal.DeleteCheckpoints(h.wal.Dir(), last); err != nil { // Leftover old checkpoints do not cause problems down the line beyond // occupying disk space. // They will just be ignored since a higher checkpoint exists. @@ -693,7 +698,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) { return h.head.chunksRange(h.mint, h.maxt), nil } -func (h *rangeHead) Tombstones() (TombstoneReader, error) { +func (h *rangeHead) Tombstones() (tombstones.TombstoneReader, error) { return emptyTombstoneReader, nil } @@ -779,6 +784,7 @@ func (h *Head) appender() *headAppender { mint: math.MaxInt64, maxt: math.MinInt64, samples: h.getAppendBuffer(), + sampleSeries: h.getSeriesBuffer(), } } @@ -789,19 +795,32 @@ func max(a, b int64) int64 { return b } -func (h *Head) getAppendBuffer() []RefSample { +func (h *Head) getAppendBuffer() []record.RefSample { b := h.appendPool.Get() if b == nil { - return make([]RefSample, 0, 512) + return make([]record.RefSample, 0, 512) } - return b.([]RefSample) + return b.([]record.RefSample) } -func (h *Head) putAppendBuffer(b []RefSample) { +func (h *Head) putAppendBuffer(b []record.RefSample) { //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. h.appendPool.Put(b[:0]) } +func (h *Head) getSeriesBuffer() []*memSeries { + b := h.seriesPool.Get() + if b == nil { + return make([]*memSeries, 0, 512) + } + return b.([]*memSeries) +} + +func (h *Head) putSeriesBuffer(b []*memSeries) { + //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. + h.seriesPool.Put(b[:0]) +} + func (h *Head) getBytesBuffer() []byte { b := h.bytesPool.Get() if b == nil { @@ -820,8 +839,9 @@ type headAppender struct { minValidTime int64 // No samples below this timestamp are allowed. mint, maxt int64 - series []RefSeries - samples []RefSample + series []record.RefSeries + samples []record.RefSample + sampleSeries []*memSeries } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -834,7 +854,7 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro s, created := a.head.getOrCreate(lset.Hash(), lset) if created { - a.series = append(a.series, RefSeries{ + a.series = append(a.series, record.RefSeries{ Ref: s.ref, Labels: lset, }) @@ -866,12 +886,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { a.maxt = t } - a.samples = append(a.samples, RefSample{ - Ref: ref, - T: t, - V: v, - series: s, + a.samples = append(a.samples, record.RefSample{ + Ref: ref, + T: t, + V: v, }) + a.sampleSeries = append(a.sampleSeries, s) return nil } @@ -884,7 +904,7 @@ func (a *headAppender) log() error { defer func() { a.head.putBytesBuffer(buf) }() var rec []byte - var enc RecordEncoder + var enc record.Encoder if len(a.series) > 0 { rec = enc.Series(a.series, buf) @@ -908,18 +928,20 @@ func (a *headAppender) log() error { func (a *headAppender) Commit() error { defer a.head.metrics.activeAppenders.Dec() defer a.head.putAppendBuffer(a.samples) + defer a.head.putSeriesBuffer(a.sampleSeries) if err := a.log(); err != nil { return errors.Wrap(err, "write to WAL") } + var series *memSeries total := len(a.samples) - - for _, s := range a.samples { - s.series.Lock() - ok, chunkCreated := s.series.append(s.T, s.V) - s.series.pendingCommit = false - s.series.Unlock() + for i, s := range a.samples { + series = a.sampleSeries[i] + series.Lock() + ok, chunkCreated := series.append(s.T, s.V) + series.pendingCommit = false + series.Unlock() if !ok { total-- @@ -938,12 +960,15 @@ func (a *headAppender) Commit() error { func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() - for _, s := range a.samples { - s.series.Lock() - s.series.pendingCommit = false - s.series.Unlock() + var series *memSeries + for i := range a.samples { + series = a.sampleSeries[i] + series.Lock() + series.pendingCommit = false + series.Unlock() } a.head.putAppendBuffer(a.samples) + a.head.putSeriesBuffer(a.sampleSeries) // Series are created in the head memory regardless of rollback. Thus we have // to log them to the WAL in any case. @@ -964,7 +989,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return errors.Wrap(err, "select series") } - var stones []Stone + var stones []tombstones.Stone dirty := false for p.Next() { series := h.series.getByID(p.At()) @@ -976,9 +1001,9 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { // Delete only until the current values and not beyond. t0, t1 = clampInterval(mint, maxt, t0, t1) if h.wal != nil { - stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + stones = append(stones, tombstones.Stone{p.At(), tombstones.Intervals{{t0, t1}}}) } - if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil { + if err := h.chunkRewrite(p.At(), tombstones.Intervals{{t0, t1}}); err != nil { return errors.Wrap(err, "delete samples") } dirty = true @@ -986,7 +1011,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { if p.Err() != nil { return p.Err() } - var enc RecordEncoder + var enc record.Encoder if h.wal != nil { // Although we don't store the stones in the head // we need to write them to the WAL to mark these as deleted @@ -1005,7 +1030,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { // chunkRewrite re-writes the chunks which overlaps with deleted ranges // and removes the samples in the deleted ranges. // Chunks is deleted if no samples are left at the end. -func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) { +func (h *Head) chunkRewrite(ref uint64, dranges tombstones.Intervals) (err error) { if len(dranges) == 0 { return nil } @@ -1097,7 +1122,7 @@ func (h *Head) gc() { } // Tombstones returns a new reader over the head's tombstones -func (h *Head) Tombstones() (TombstoneReader, error) { +func (h *Head) Tombstones() (tombstones.TombstoneReader, error) { return emptyTombstoneReader, nil } @@ -1598,8 +1623,8 @@ type memSeries struct { sync.Mutex ref uint64 - lset labels.Labels chunks []*memChunk + lset labels.Labels headChunk *memChunk chunkRange int64 firstChunkID int diff --git a/head_test.go b/head_test.go index 040ae828..457076ab 100644 --- a/head_test.go +++ b/head_test.go @@ -30,7 +30,9 @@ import ( "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tombstones" "github.com/prometheus/tsdb/tsdbutil" "github.com/prometheus/tsdb/wal" ) @@ -51,14 +53,14 @@ func BenchmarkCreateSeries(b *testing.B) { } func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { - var enc RecordEncoder + var enc record.Encoder for _, r := range recs { switch v := r.(type) { - case []RefSeries: + case []record.RefSeries: testutil.Ok(t, w.Log(enc.Series(v, nil))) - case []RefSample: + case []record.RefSample: testutil.Ok(t, w.Log(enc.Samples(v, nil))) - case []Stone: + case []tombstones.Stone: testutil.Ok(t, w.Log(enc.Tombstones(v, nil))) } } @@ -69,22 +71,22 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { testutil.Ok(t, err) defer sr.Close() - var dec RecordDecoder + var dec record.Decoder r := wal.NewReader(sr) for r.Next() { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err := dec.Series(rec, nil) testutil.Ok(t, err) recs = append(recs, series) - case RecordSamples: + case record.Samples: samples, err := dec.Samples(rec, nil) testutil.Ok(t, err) recs = append(recs, samples) - case RecordTombstones: + case record.Tombstones: tstones, err := dec.Tombstones(rec, nil) testutil.Ok(t, err) recs = append(recs, tstones) @@ -100,28 +102,28 @@ func TestHead_ReadWAL(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { entries := []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, {Ref: 11, Labels: labels.FromStrings("a", "2")}, {Ref: 100, Labels: labels.FromStrings("a", "3")}, }, - []RefSample{ + []record.RefSample{ {Ref: 0, T: 99, V: 1}, {Ref: 10, T: 100, V: 2}, {Ref: 100, T: 100, V: 3}, }, - []RefSeries{ + []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "4")}, // This series has two refs pointing to it. {Ref: 101, Labels: labels.FromStrings("a", "3")}, }, - []RefSample{ + []record.RefSample{ {Ref: 10, T: 101, V: 5}, {Ref: 50, T: 101, V: 6}, {Ref: 101, T: 101, V: 7}, }, - []Stone{ - {ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}}, + []tombstones.Stone{ + {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, }, } dir, err := ioutil.TempDir("", "test_read_wal") @@ -326,14 +328,14 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { for _, compress := range []bool{false, true} { t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { entries := []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, }, - []RefSample{}, - []RefSeries{ + []record.RefSample{}, + []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "2")}, }, - []RefSample{ + []record.RefSample{ {Ref: 50, T: 80, V: 1}, {Ref: 50, T: 90, V: 1}, }, @@ -371,27 +373,27 @@ func TestHeadDeleteSimple(t *testing.T) { lblDefault := labels.Label{"a", "b"} cases := []struct { - dranges Intervals + dranges tombstones.Intervals smplsExp []sample }{ { - dranges: Intervals{{0, 3}}, + dranges: tombstones.Intervals{{0, 3}}, smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}}, + dranges: tombstones.Intervals{{1, 3}}, smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 7}}, + dranges: tombstones.Intervals{{1, 3}, {4, 7}}, smplsExp: buildSmpls([]int64{0, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 700}}, + dranges: tombstones.Intervals{{1, 3}, {4, 700}}, smplsExp: buildSmpls([]int64{0}), }, { // This case is to ensure that labels and symbols are deleted. - dranges: Intervals{{0, 9}}, + dranges: tombstones.Intervals{{0, 9}}, smplsExp: buildSmpls([]int64{}), }, } @@ -591,7 +593,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { testutil.Ok(t, hb.Close()) // Confirm there's been a checkpoint. - cdir, _, err := LastCheckpoint(dir) + cdir, _, err := wal.LastCheckpoint(dir) testutil.Ok(t, err) // Read in checkpoint and WAL. recs := readTestWAL(t, cdir) @@ -600,11 +602,11 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { var series, samples, stones int for _, rec := range recs { switch rec.(type) { - case []RefSeries: + case []record.RefSeries: series++ - case []RefSample: + case []record.RefSample: samples++ - case []Stone: + case []tombstones.Stone: stones++ default: t.Fatalf("unknown record type") @@ -692,18 +694,18 @@ func TestDelete_e2e(t *testing.T) { // Delete a time-range from each-selector. dels := []struct { ms []labels.Matcher - drange Intervals + drange tombstones.Intervals }{ { ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, - drange: Intervals{{300, 500}, {600, 670}}, + drange: tombstones.Intervals{{300, 500}, {600, 670}}, }, { ms: []labels.Matcher{ labels.NewEqualMatcher("a", "b"), labels.NewEqualMatcher("job", "prom-k8s"), }, - drange: Intervals{{300, 500}, {100, 670}}, + drange: tombstones.Intervals{{300, 500}, {100, 670}}, }, { ms: []labels.Matcher{ @@ -711,7 +713,7 @@ func TestDelete_e2e(t *testing.T) { labels.NewEqualMatcher("instance", "localhost:9090"), labels.NewEqualMatcher("job", "prometheus"), }, - drange: Intervals{{300, 400}, {100, 6700}}, + drange: tombstones.Intervals{{300, 400}, {100, 6700}}, }, // TODO: Add Regexp Matchers. } @@ -794,12 +796,12 @@ func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample return full } -func deletedSamples(full []tsdbutil.Sample, dranges Intervals) []tsdbutil.Sample { +func deletedSamples(full []tsdbutil.Sample, dranges tombstones.Intervals) []tsdbutil.Sample { ds := make([]tsdbutil.Sample, 0, len(full)) Outer: for _, s := range full { for _, r := range dranges { - if r.inBounds(s.T()) { + if r.InBounds(s.T()) { continue Outer } } @@ -1055,9 +1057,9 @@ func TestHead_LogRollback(t *testing.T) { testutil.Equals(t, 1, len(recs)) - series, ok := recs[0].([]RefSeries) + series, ok := recs[0].([]record.RefSeries) testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + testutil.Equals(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) }) } } @@ -1065,7 +1067,7 @@ func TestHead_LogRollback(t *testing.T) { // TestWalRepair_DecodingError ensures that a repair is run for an error // when decoding a record. func TestWalRepair_DecodingError(t *testing.T) { - var enc RecordEncoder + var enc record.Encoder for name, test := range map[string]struct { corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. rec []byte @@ -1077,10 +1079,10 @@ func TestWalRepair_DecodingError(t *testing.T) { // Do not modify the base record because it is Logged multiple times. res := make([]byte, len(rec)) copy(res, rec) - res[0] = byte(RecordInvalid) + res[0] = byte(record.Invalid) return res }, - enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), + enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, 5, }, @@ -1088,7 +1090,7 @@ func TestWalRepair_DecodingError(t *testing.T) { func(rec []byte) []byte { return rec[:3] }, - enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), + enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, 5, }, @@ -1096,7 +1098,7 @@ func TestWalRepair_DecodingError(t *testing.T) { func(rec []byte) []byte { return rec[:3] }, - enc.Samples([]RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), + enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), 9, 5, }, @@ -1104,7 +1106,7 @@ func TestWalRepair_DecodingError(t *testing.T) { func(rec []byte) []byte { return rec[:3] }, - enc.Tombstones([]Stone{{ref: 1, intervals: Intervals{}}}, []byte{}), + enc.Tombstones([]tombstones.Stone{{Ref: 1, Intervals: tombstones.Intervals{}}}, []byte{}), 9, 5, }, @@ -1135,10 +1137,9 @@ func TestWalRepair_DecodingError(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) - err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. _, corrErr := err.(*wal.CorruptionErr) - testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") + testutil.Assert(t, corrErr, fmt.Sprintf("reading the wal didn't return corruption error: %s", err)) testutil.Ok(t, w.Close()) } diff --git a/mocks_test.go b/mocks_test.go index 35f5ffec..c5bc0f6b 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/tombstones" ) type mockIndexWriter struct { @@ -72,7 +73,9 @@ type mockBReader struct { maxt int64 } -func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } -func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } -func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil } -func (r *mockBReader) Meta() BlockMeta { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} } +func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } +func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } +func (r *mockBReader) Tombstones() (tombstones.TombstoneReader, error) { + return tombstones.NewMemTombstones(), nil +} +func (r *mockBReader) Meta() BlockMeta { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} } diff --git a/querier.go b/querier.go index fbd9493f..a2ef40d5 100644 --- a/querier.go +++ b/querier.go @@ -25,6 +25,7 @@ import ( tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/tombstones" ) // Querier provides querying access over time series data of a fixed @@ -204,7 +205,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { type blockQuerier struct { index IndexReader chunks ChunkReader - tombstones TombstoneReader + tombstones tombstones.TombstoneReader closed bool @@ -670,7 +671,7 @@ func (s *mergedVerticalSeriesSet) Next() bool { // actual series itself. type ChunkSeriesSet interface { Next() bool - At() (labels.Labels, []chunks.Meta, Intervals) + At() (labels.Labels, []chunks.Meta, tombstones.Intervals) Err() error } @@ -679,19 +680,19 @@ type ChunkSeriesSet interface { type baseChunkSeries struct { p index.Postings index IndexReader - tombstones TombstoneReader + tombstones tombstones.TombstoneReader lset labels.Labels chks []chunks.Meta - intervals Intervals + intervals tombstones.Intervals err error } // LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet // over them. It drops chunks based on tombstones in the given reader. -func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { +func LookupChunkSeries(ir IndexReader, tr tombstones.TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = newMemTombstones() + tr = tombstones.NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { @@ -704,7 +705,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) }, nil } -func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) { +func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return s.lset, s.chks, s.intervals } @@ -740,7 +741,7 @@ func (s *baseChunkSeries) Next() bool { // Only those chunks that are not entirely deleted. chks := make([]chunks.Meta, 0, len(s.chks)) for _, chk := range s.chks { - if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { + if !(tombstones.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(s.intervals)) { chks = append(chks, chk) } } @@ -767,10 +768,10 @@ type populatedChunkSeries struct { err error chks []chunks.Meta lset labels.Labels - intervals Intervals + intervals tombstones.Intervals } -func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) { +func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return s.lset, s.chks, s.intervals } @@ -865,7 +866,7 @@ type chunkSeries struct { mint, maxt int64 - intervals Intervals + intervals tombstones.Intervals } func (s *chunkSeries) Labels() labels.Labels { @@ -1066,10 +1067,10 @@ type chunkSeriesIterator struct { maxt, mint int64 - intervals Intervals + intervals tombstones.Intervals } -func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []chunks.Meta, dranges tombstones.Intervals, mint, maxt int64) *chunkSeriesIterator { csi := &chunkSeriesIterator{ chunks: cs, i: 0, @@ -1168,7 +1169,7 @@ func (it *chunkSeriesIterator) Err() error { type deletedIterator struct { it chunkenc.Iterator - intervals Intervals + intervals tombstones.Intervals } func (it *deletedIterator) At() (int64, float64) { @@ -1181,7 +1182,7 @@ Outer: ts, _ := it.it.At() for _, tr := range it.intervals { - if tr.inBounds(ts) { + if tr.InBounds(ts) { continue Outer } diff --git a/querier_test.go b/querier_test.go index 2be48fcd..cbbfcdab 100644 --- a/querier_test.go +++ b/querier_test.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tombstones" "github.com/prometheus/tsdb/tsdbutil" ) @@ -368,7 +369,7 @@ Outer: querier := &blockQuerier{ index: ir, chunks: cr, - tombstones: newMemTombstones(), + tombstones: tombstones.NewMemTombstones(), mint: c.mint, maxt: c.maxt, @@ -412,10 +413,15 @@ func TestBlockQuerierDelete(t *testing.T) { exp SeriesSet } + tstones := tombstones.NewMemTombstones() + tstones.AddInterval(1, tombstones.Interval{1, 3}) + tstones.AddInterval(2, tombstones.Interval{1, 3}, tombstones.Interval{6, 10}) + tstones.AddInterval(3, tombstones.Interval{6, 10}) + cases := struct { data []seriesSamples - tombstones TombstoneReader + tombstones tombstones.TombstoneReader queries []query }{ data: []seriesSamples{ @@ -460,11 +466,7 @@ func TestBlockQuerierDelete(t *testing.T) { }, }, }, - tombstones: &memTombstones{intvlGroups: map[uint64]Intervals{ - 1: Intervals{{1, 3}}, - 2: Intervals{{1, 3}, {6, 10}}, - 3: Intervals{{6, 10}}, - }}, + tombstones: tstones, queries: []query{ { mint: 2, @@ -637,7 +639,7 @@ func TestBaseChunkSeries(t *testing.T) { bcs := &baseChunkSeries{ p: index.NewListPostings(tc.postings), index: mi, - tombstones: newMemTombstones(), + tombstones: tombstones.NewMemTombstones(), } i := 0 @@ -1159,7 +1161,7 @@ func (m *mockChunkSeriesSet) Next() bool { return m.i < len(m.l) } -func (m *mockChunkSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) { +func (m *mockChunkSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) { return m.l[m.i], m.cm[m.i], nil } @@ -1254,18 +1256,18 @@ func TestDeletedIterator(t *testing.T) { } cases := []struct { - r Intervals + r tombstones.Intervals }{ - {r: Intervals{{1, 20}}}, - {r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, - {r: Intervals{{1, 10}, {12, 20}, {20, 30}}}, - {r: Intervals{{1, 10}, {12, 23}, {25, 30}}}, - {r: Intervals{{1, 23}, {12, 20}, {25, 30}}}, - {r: Intervals{{1, 23}, {12, 20}, {25, 3000}}}, - {r: Intervals{{0, 2000}}}, - {r: Intervals{{500, 2000}}}, - {r: Intervals{{0, 200}}}, - {r: Intervals{{1000, 20000}}}, + {r: tombstones.Intervals{{1, 20}}}, + {r: tombstones.Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, + {r: tombstones.Intervals{{1, 10}, {12, 20}, {20, 30}}}, + {r: tombstones.Intervals{{1, 10}, {12, 23}, {25, 30}}}, + {r: tombstones.Intervals{{1, 23}, {12, 20}, {25, 30}}}, + {r: tombstones.Intervals{{1, 23}, {12, 20}, {25, 3000}}}, + {r: tombstones.Intervals{{0, 2000}}}, + {r: tombstones.Intervals{{500, 2000}}}, + {r: tombstones.Intervals{{0, 200}}}, + {r: tombstones.Intervals{{1000, 20000}}}, } for _, c := range cases { @@ -1275,7 +1277,7 @@ func TestDeletedIterator(t *testing.T) { for it.Next() { i++ for _, tr := range ranges { - if tr.inBounds(i) { + if tr.InBounds(i) { i = tr.Maxt + 1 ranges = ranges[1:] } @@ -1290,7 +1292,7 @@ func TestDeletedIterator(t *testing.T) { // There has been an extra call to Next(). i++ for _, tr := range ranges { - if tr.inBounds(i) { + if tr.InBounds(i) { i = tr.Maxt + 1 ranges = ranges[1:] } diff --git a/record.go b/record/record.go similarity index 65% rename from record.go rename to record/record.go index 8d9c5751..2bc2bc4f 100644 --- a/record.go +++ b/record/record.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package record import ( "math" @@ -21,45 +21,64 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/tombstones" ) -// RecordType represents the data type of a record. -type RecordType uint8 +// Type represents the data type of a record. +type Type uint8 const ( - // RecordInvalid is returned for unrecognised WAL record types. - RecordInvalid RecordType = 255 - // RecordSeries is used to match WAL records of type Series. - RecordSeries RecordType = 1 - // RecordSamples is used to match WAL records of type Samples. - RecordSamples RecordType = 2 - // RecordTombstones is used to match WAL records of type Tombstones. - RecordTombstones RecordType = 3 + // Invalid is returned for unrecognised WAL record types. + Invalid Type = 255 + // Series is used to match WAL records of type Series. + Series Type = 1 + // Samples is used to match WAL records of type Samples. + Samples Type = 2 + // Tombstones is used to match WAL records of type Tombstones. + Tombstones Type = 3 ) -// RecordDecoder decodes series, sample, and tombstone records. +var ( + // ErrNotFound is returned if a looked up resource was not found. Duplicate ErrNotFound from head.go. + ErrNotFound = errors.New("not found") +) + +// RefSeries is the series labels with the series ID. +type RefSeries struct { + Ref uint64 + Labels labels.Labels +} + +// RefSample is a timestamp/value pair associated with a reference to a series. +type RefSample struct { + Ref uint64 + T int64 + V float64 +} + +// Decoder decodes series, sample, and tombstone records. // The zero value is ready to use. -type RecordDecoder struct { +type Decoder struct { } // Type returns the type of the record. -// Return RecordInvalid if no valid record type is found. -func (d *RecordDecoder) Type(rec []byte) RecordType { +// Return Invalid if no valid record type is found. +func (d *Decoder) Type(rec []byte) Type { if len(rec) < 1 { - return RecordInvalid + return Invalid } - switch t := RecordType(rec[0]); t { - case RecordSeries, RecordSamples, RecordTombstones: + switch t := Type(rec[0]); t { + case Series, Samples, Tombstones: return t } - return RecordInvalid + return Invalid } // Series appends series in rec to the given slice. -func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { +func (d *Decoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) { dec := encoding.Decbuf{B: rec} - if RecordType(dec.Byte()) != RecordSeries { + if Type(dec.Byte()) != Series { return nil, errors.New("invalid record type") } for len(dec.B) > 0 && dec.Err() == nil { @@ -88,10 +107,10 @@ func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, err } // Samples appends samples in rec to the given slice. -func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { +func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { dec := encoding.Decbuf{B: rec} - if RecordType(dec.Byte()) != RecordSamples { + if Type(dec.Byte()) != Samples { return nil, errors.New("invalid record type") } if dec.Len() == 0 { @@ -123,16 +142,16 @@ func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, e } // Tombstones appends tombstones in rec to the given slice. -func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) { +func (d *Decoder) Tombstones(rec []byte, tstones []tombstones.Stone) ([]tombstones.Stone, error) { dec := encoding.Decbuf{B: rec} - if RecordType(dec.Byte()) != RecordTombstones { + if Type(dec.Byte()) != Tombstones { return nil, errors.New("invalid record type") } for dec.Len() > 0 && dec.Err() == nil { - tstones = append(tstones, Stone{ - ref: dec.Be64(), - intervals: Intervals{ + tstones = append(tstones, tombstones.Stone{ + Ref: dec.Be64(), + Intervals: tombstones.Intervals{ {Mint: dec.Varint64(), Maxt: dec.Varint64()}, }, }) @@ -146,15 +165,15 @@ func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) return tstones, nil } -// RecordEncoder encodes series, sample, and tombstones records. +// Encoder encodes series, sample, and tombstones records. // The zero value is ready to use. -type RecordEncoder struct { +type Encoder struct { } // Series appends the encoded series to b and returns the resulting slice. -func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { +func (e *Encoder) Series(series []RefSeries, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(RecordSeries)) + buf.PutByte(byte(Series)) for _, s := range series { buf.PutBE64(s.Ref) @@ -169,9 +188,9 @@ func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte { } // Samples appends the encoded samples to b and returns the resulting slice. -func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { +func (e *Encoder) Samples(samples []RefSample, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(RecordSamples)) + buf.PutByte(byte(Samples)) if len(samples) == 0 { return buf.Get() @@ -193,13 +212,13 @@ func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte { } // Tombstones appends the encoded tombstones to b and returns the resulting slice. -func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte { +func (e *Encoder) Tombstones(tstones []tombstones.Stone, b []byte) []byte { buf := encoding.Encbuf{B: b} - buf.PutByte(byte(RecordTombstones)) + buf.PutByte(byte(Tombstones)) for _, s := range tstones { - for _, iv := range s.intervals { - buf.PutBE64(s.ref) + for _, iv := range s.Intervals { + buf.PutBE64(s.Ref) buf.PutVarint64(iv.Mint) buf.PutVarint64(iv.Maxt) } diff --git a/record_test.go b/record/record_test.go similarity index 81% rename from record_test.go rename to record/record_test.go index 8316ccf3..304fa2b2 100644 --- a/record_test.go +++ b/record/record_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package record import ( "testing" @@ -21,11 +21,12 @@ import ( "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tombstones" ) func TestRecord_EncodeDecode(t *testing.T) { - var enc RecordEncoder - var dec RecordDecoder + var enc Encoder + var dec Decoder series := []RefSeries{ { @@ -54,31 +55,31 @@ func TestRecord_EncodeDecode(t *testing.T) { // Intervals get split up into single entries. So we don't get back exactly // what we put in. - tstones := []Stone{ - {ref: 123, intervals: Intervals{ + tstones := []tombstones.Stone{ + {Ref: 123, Intervals: tombstones.Intervals{ {Mint: -1000, Maxt: 1231231}, {Mint: 5000, Maxt: 0}, }}, - {ref: 13, intervals: Intervals{ + {Ref: 13, Intervals: tombstones.Intervals{ {Mint: -1000, Maxt: -11}, {Mint: 5000, Maxt: 1000}, }}, } decTstones, err := dec.Tombstones(enc.Tombstones(tstones, nil), nil) testutil.Ok(t, err) - testutil.Equals(t, []Stone{ - {ref: 123, intervals: Intervals{{Mint: -1000, Maxt: 1231231}}}, - {ref: 123, intervals: Intervals{{Mint: 5000, Maxt: 0}}}, - {ref: 13, intervals: Intervals{{Mint: -1000, Maxt: -11}}}, - {ref: 13, intervals: Intervals{{Mint: 5000, Maxt: 1000}}}, + testutil.Equals(t, []tombstones.Stone{ + {Ref: 123, Intervals: tombstones.Intervals{{Mint: -1000, Maxt: 1231231}}}, + {Ref: 123, Intervals: tombstones.Intervals{{Mint: 5000, Maxt: 0}}}, + {Ref: 13, Intervals: tombstones.Intervals{{Mint: -1000, Maxt: -11}}}, + {Ref: 13, Intervals: tombstones.Intervals{{Mint: 5000, Maxt: 1000}}}, }, decTstones) } // TestRecord_Corruputed ensures that corrupted records return the correct error. // Bugfix check for pull/521 and pull/523. func TestRecord_Corruputed(t *testing.T) { - var enc RecordEncoder - var dec RecordDecoder + var enc Encoder + var dec Decoder t.Run("Test corrupted series record", func(t *testing.T) { series := []RefSeries{ @@ -104,8 +105,8 @@ func TestRecord_Corruputed(t *testing.T) { }) t.Run("Test corrupted tombstone record", func(t *testing.T) { - tstones := []Stone{ - {ref: 123, intervals: Intervals{ + tstones := []tombstones.Stone{ + {Ref: 123, Intervals: tombstones.Intervals{ {Mint: -1000, Maxt: 1231231}, {Mint: 5000, Maxt: 0}, }}, diff --git a/tombstones.go b/tombstones/tombstones.go similarity index 78% rename from tombstones.go rename to tombstones/tombstones.go index d7b76230..e82bccb0 100644 --- a/tombstones.go +++ b/tombstones/tombstones.go @@ -11,11 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package tombstones import ( "encoding/binary" "fmt" + "hash" + "hash/crc32" "io" "io/ioutil" "os" @@ -30,7 +32,7 @@ import ( "github.com/prometheus/tsdb/fileutil" ) -const tombstoneFilename = "tombstones" +const TombstoneFilename = "tombstones" const ( // MagicTombstone is 4 bytes at the head of a tombstone file. @@ -39,6 +41,21 @@ const ( tombstoneFormatV1 = 1 ) +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. +var castagnoliTable *crc32.Table + +func init() { + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) +} + +// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the +// polynomial may be easily changed in one location at a later time, if necessary. +func newCRC32() hash.Hash32 { + return crc32.New(castagnoliTable) +} + // TombstoneReader gives access to tombstone intervals by series reference. type TombstoneReader interface { // Get returns deletion intervals for the series with the given reference. @@ -54,8 +71,8 @@ type TombstoneReader interface { Close() error } -func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) { - path := filepath.Join(dir, tombstoneFilename) +func WriteTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) { + path := filepath.Join(dir, TombstoneFilename) tmp := path + ".tmp" hash := newCRC32() var size int @@ -129,14 +146,14 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int6 // Stone holds the information on the posting and time-range // that is deleted. type Stone struct { - ref uint64 - intervals Intervals + Ref uint64 + Intervals Intervals } -func readTombstones(dir string) (TombstoneReader, int64, error) { - b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) +func ReadTombstones(dir string) (TombstoneReader, int64, error) { + b, err := ioutil.ReadFile(filepath.Join(dir, TombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), 0, nil + return NewMemTombstones(), 0, nil } else if err != nil { return nil, 0, err } @@ -166,7 +183,7 @@ func readTombstones(dir string) (TombstoneReader, int64, error) { return nil, 0, errors.New("checksum did not match") } - stonesMap := newMemTombstones() + stonesMap := NewMemTombstones() for d.Len() > 0 { k := d.Uvarint64() @@ -176,7 +193,7 @@ func readTombstones(dir string) (TombstoneReader, int64, error) { return nil, 0, d.Err() } - stonesMap.addInterval(k, Interval{mint, maxt}) + stonesMap.AddInterval(k, Interval{mint, maxt}) } return stonesMap, int64(len(b)), nil @@ -187,9 +204,9 @@ type memTombstones struct { mtx sync.RWMutex } -// newMemTombstones creates new in memory TombstoneReader +// NewMemTombstones creates new in memory TombstoneReader // that allows adding new intervals. -func newMemTombstones() *memTombstones { +func NewMemTombstones() *memTombstones { return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } @@ -221,12 +238,12 @@ func (t *memTombstones) Total() uint64 { return total } -// addInterval to an existing memTombstones -func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { +// AddInterval to an existing MemTombstones +func (t *memTombstones) AddInterval(ref uint64, itvs ...Interval) { t.mtx.Lock() defer t.mtx.Unlock() for _, itv := range itvs { - t.intvlGroups[ref] = t.intvlGroups[ref].add(itv) + t.intvlGroups[ref] = t.intvlGroups[ref].Add(itv) } } @@ -239,13 +256,13 @@ type Interval struct { Mint, Maxt int64 } -func (tr Interval) inBounds(t int64) bool { +func (tr Interval) InBounds(t int64) bool { return t >= tr.Mint && t <= tr.Maxt } -func (tr Interval) isSubrange(dranges Intervals) bool { +func (tr Interval) IsSubrange(dranges Intervals) bool { for _, r := range dranges { - if r.inBounds(tr.Mint) && r.inBounds(tr.Maxt) { + if r.InBounds(tr.Mint) && r.InBounds(tr.Maxt) { return true } } @@ -256,12 +273,12 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// add the new time-range to the existing ones. +// Add the new time-range to the existing ones. // The existing ones must be sorted. -func (itvs Intervals) add(n Interval) Intervals { +func (itvs Intervals) Add(n Interval) Intervals { for i, r := range itvs { // TODO(gouthamve): Make this codepath easier to digest. - if r.inBounds(n.Mint-1) || r.inBounds(n.Mint) { + if r.InBounds(n.Mint-1) || r.InBounds(n.Mint) { if n.Maxt > r.Maxt { itvs[i].Maxt = n.Maxt } @@ -282,7 +299,7 @@ func (itvs Intervals) add(n Interval) Intervals { return itvs } - if r.inBounds(n.Maxt+1) || r.inBounds(n.Maxt) { + if r.InBounds(n.Maxt+1) || r.InBounds(n.Maxt) { if n.Mint < r.Maxt { itvs[i].Mint = n.Mint } diff --git a/tombstones_test.go b/tombstones/tombstones_test.go similarity index 88% rename from tombstones_test.go rename to tombstones/tombstones_test.go index 33ebb3bc..d30eff23 100644 --- a/tombstones_test.go +++ b/tombstones/tombstones_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package tombstones import ( "io/ioutil" @@ -33,7 +33,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { ref := uint64(0) - stones := newMemTombstones() + stones := NewMemTombstones() // Generate the tombstones. for i := 0; i < 100; i++ { ref += uint64(rand.Int31n(10)) + 1 @@ -41,16 +41,16 @@ func TestWriteAndReadbackTombStones(t *testing.T) { dranges := make(Intervals, 0, numRanges) mint := rand.Int63n(time.Now().UnixNano()) for j := 0; j < numRanges; j++ { - dranges = dranges.add(Interval{mint, mint + rand.Int63n(1000)}) + dranges = dranges.Add(Interval{mint, mint + rand.Int63n(1000)}) mint += rand.Int63n(1000) + 1 } - stones.addInterval(ref, dranges...) + stones.AddInterval(ref, dranges...) } - _, err := writeTombstoneFile(log.NewNopLogger(), tmpdir, stones) + _, err := WriteTombstoneFile(log.NewNopLogger(), tmpdir, stones) testutil.Ok(t, err) - restr, _, err := readTombstones(tmpdir) + restr, _, err := ReadTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers. @@ -122,20 +122,20 @@ func TestAddingNewIntervals(t *testing.T) { for _, c := range cases { - testutil.Equals(t, c.exp, c.exist.add(c.new)) + testutil.Equals(t, c.exp, c.exist.Add(c.new)) } } // TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. func TestMemTombstonesConcurrency(t *testing.T) { - tomb := newMemTombstones() + tomb := NewMemTombstones() totalRuns := 100 var wg sync.WaitGroup wg.Add(2) go func() { for x := 0; x < totalRuns; x++ { - tomb.addInterval(uint64(x), Interval{int64(x), int64(x)}) + tomb.AddInterval(uint64(x), Interval{int64(x), int64(x)}) } wg.Done() }() diff --git a/wal.go b/wal.go index 49f55fe4..abcfc88d 100644 --- a/wal.go +++ b/wal.go @@ -34,6 +34,8 @@ import ( "github.com/prometheus/tsdb/encoding" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" + "github.com/prometheus/tsdb/tombstones" "github.com/prometheus/tsdb/wal" ) @@ -89,9 +91,9 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { // DEPRECATED: use wal pkg combined with the record codex instead. type WAL interface { Reader() WALReader - LogSeries([]RefSeries) error - LogSamples([]RefSample) error - LogDeletes([]Stone) error + LogSeries([]record.RefSeries) error + LogSamples([]record.RefSample) error + LogDeletes([]tombstones.Stone) error Truncate(mint int64, keep func(uint64) bool) error Close() error } @@ -99,27 +101,12 @@ type WAL interface { // WALReader reads entries from a WAL. type WALReader interface { Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), + seriesf func([]record.RefSeries), + samplesf func([]record.RefSample), + deletesf func([]tombstones.Stone), ) error } -// RefSeries is the series labels with the series ID. -type RefSeries struct { - Ref uint64 - Labels labels.Labels -} - -// RefSample is a timestamp/value pair associated with a reference to a series. -type RefSample struct { - Ref uint64 - T int64 - V float64 - - series *memSeries -} - // segmentFile wraps a file object of a segment and tracks the highest timestamp // it contains. During WAL truncating, all segments with no higher timestamp than // the truncation threshold can be compacted. @@ -240,9 +227,9 @@ type repairingWALReader struct { } func (r *repairingWALReader) Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), + seriesf func([]record.RefSeries), + samplesf func([]record.RefSample), + deletesf func([]tombstones.Stone), ) error { err := r.r.Read(seriesf, samplesf, deletesf) if err == nil { @@ -348,8 +335,8 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { var ( csf = newSegmentFile(f) crc32 = newCRC32() - decSeries = []RefSeries{} - activeSeries = []RefSeries{} + decSeries = []record.RefSeries{} + activeSeries = []record.RefSeries{} ) for r.next() { @@ -427,7 +414,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { // LogSeries writes a batch of new series labels to the log. // The series have to be ordered. -func (w *SegmentWAL) LogSeries(series []RefSeries) error { +func (w *SegmentWAL) LogSeries(series []record.RefSeries) error { buf := w.getBuffer() flag := w.encodeSeries(buf, series) @@ -454,7 +441,7 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { } // LogSamples writes a batch of new samples to the log. -func (w *SegmentWAL) LogSamples(samples []RefSample) error { +func (w *SegmentWAL) LogSamples(samples []record.RefSample) error { buf := w.getBuffer() flag := w.encodeSamples(buf, samples) @@ -480,7 +467,7 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { } // LogDeletes write a batch of new deletes to the log. -func (w *SegmentWAL) LogDeletes(stones []Stone) error { +func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error { buf := w.getBuffer() flag := w.encodeDeletes(buf, stones) @@ -498,7 +485,7 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { tf := w.head() for _, s := range stones { - for _, iv := range s.intervals { + for _, iv := range s.Intervals { if tf.maxTime < iv.Maxt { tf.maxTime = iv.Maxt } @@ -791,7 +778,7 @@ const ( walDeletesSimple = 1 ) -func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []RefSeries) uint8 { +func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []record.RefSeries) uint8 { for _, s := range series { buf.PutBE64(s.Ref) buf.PutUvarint(len(s.Labels)) @@ -804,7 +791,7 @@ func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []RefSeries) uint return walSeriesSimple } -func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []RefSample) uint8 { +func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []record.RefSample) uint8 { if len(samples) == 0 { return walSamplesSimple } @@ -825,10 +812,10 @@ func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []RefSample) ui return walSamplesSimple } -func (w *SegmentWAL) encodeDeletes(buf *encoding.Encbuf, stones []Stone) uint8 { +func (w *SegmentWAL) encodeDeletes(buf *encoding.Encbuf, stones []tombstones.Stone) uint8 { for _, s := range stones { - for _, iv := range s.intervals { - buf.PutBE64(s.ref) + for _, iv := range s.Intervals { + buf.PutBE64(s.Ref) buf.PutVarint64(iv.Mint) buf.PutVarint64(iv.Maxt) } @@ -871,9 +858,9 @@ func (r *walReader) Err() error { } func (r *walReader) Read( - seriesf func([]RefSeries), - samplesf func([]RefSample), - deletesf func([]Stone), + seriesf func([]record.RefSeries), + samplesf func([]record.RefSample), + deletesf func([]tombstones.Stone), ) error { // Concurrency for replaying the WAL is very limited. We at least split out decoding and // processing into separate threads. @@ -892,19 +879,19 @@ func (r *walReader) Read( for x := range datac { switch v := x.(type) { - case []RefSeries: + case []record.RefSeries: if seriesf != nil { seriesf(v) } //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. seriesPool.Put(v[:0]) - case []RefSample: + case []record.RefSample: if samplesf != nil { samplesf(v) } //lint:ignore SA6002 safe to ignore and actually fixing it has some performance penalty. samplePool.Put(v[:0]) - case []Stone: + case []tombstones.Stone: if deletesf != nil { deletesf(v) } @@ -925,11 +912,11 @@ func (r *walReader) Read( // Those should generally be catched by entry decoding before. switch et { case WALEntrySeries: - var series []RefSeries + var series []record.RefSeries if v := seriesPool.Get(); v == nil { - series = make([]RefSeries, 0, 512) + series = make([]record.RefSeries, 0, 512) } else { - series = v.([]RefSeries) + series = v.([]record.RefSeries) } err = r.decodeSeries(flag, b, &series) @@ -946,11 +933,11 @@ func (r *walReader) Read( } } case WALEntrySamples: - var samples []RefSample + var samples []record.RefSample if v := samplePool.Get(); v == nil { - samples = make([]RefSample, 0, 512) + samples = make([]record.RefSample, 0, 512) } else { - samples = v.([]RefSample) + samples = v.([]record.RefSample) } err = r.decodeSamples(flag, b, &samples) @@ -968,11 +955,11 @@ func (r *walReader) Read( } } case WALEntryDeletes: - var deletes []Stone + var deletes []tombstones.Stone if v := deletePool.Get(); v == nil { - deletes = make([]Stone, 0, 512) + deletes = make([]tombstones.Stone, 0, 512) } else { - deletes = v.([]Stone) + deletes = v.([]tombstones.Stone) } err = r.decodeDeletes(flag, b, &deletes) @@ -985,7 +972,7 @@ func (r *walReader) Read( // Update the times for the WAL segment file. cf := r.current() for _, s := range deletes { - for _, iv := range s.intervals { + for _, iv := range s.Intervals { if cf.maxTime < iv.Maxt { cf.maxTime = iv.Maxt } @@ -1122,7 +1109,7 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { +func (r *walReader) decodeSeries(flag byte, b []byte, res *[]record.RefSeries) error { dec := encoding.Decbuf{B: b} for len(dec.B) > 0 && dec.Err() == nil { @@ -1136,7 +1123,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { } sort.Sort(lset) - *res = append(*res, RefSeries{ + *res = append(*res, record.RefSeries{ Ref: ref, Labels: lset, }) @@ -1150,7 +1137,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error { return nil } -func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { +func (r *walReader) decodeSamples(flag byte, b []byte, res *[]record.RefSample) error { if len(b) == 0 { return nil } @@ -1166,7 +1153,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { dtime := dec.Varint64() val := dec.Be64() - *res = append(*res, RefSample{ + *res = append(*res, record.RefSample{ Ref: uint64(int64(baseRef) + dref), T: baseTime + dtime, V: math.Float64frombits(val), @@ -1182,13 +1169,13 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error { return nil } -func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { +func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]tombstones.Stone) error { dec := &encoding.Decbuf{B: b} for dec.Len() > 0 && dec.Err() == nil { - *res = append(*res, Stone{ - ref: dec.Be64(), - intervals: Intervals{ + *res = append(*res, tombstones.Stone{ + Ref: dec.Be64(), + Intervals: tombstones.Intervals{ {Mint: dec.Varint64(), Maxt: dec.Varint64()}, }, }) @@ -1268,23 +1255,23 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { rdr := w.Reader() var ( - enc RecordEncoder + enc record.Encoder b []byte ) decErr := rdr.Read( - func(s []RefSeries) { + func(s []record.RefSeries) { if err != nil { return } err = repl.Log(enc.Series(s, b[:0])) }, - func(s []RefSample) { + func(s []record.RefSample) { if err != nil { return } err = repl.Log(enc.Samples(s, b[:0])) }, - func(s []Stone) { + func(s []tombstones.Stone) { if err != nil { return } diff --git a/checkpoint.go b/wal/checkpoint.go similarity index 88% rename from checkpoint.go rename to wal/checkpoint.go index eccfa62b..130c8357 100644 --- a/checkpoint.go +++ b/wal/checkpoint.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package wal import ( "fmt" @@ -27,7 +27,8 @@ import ( "github.com/pkg/errors" tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/fileutil" - "github.com/prometheus/tsdb/wal" + "github.com/prometheus/tsdb/record" + "github.com/prometheus/tsdb/tombstones" ) // CheckpointStats returns stats about a created checkpoint. @@ -63,7 +64,7 @@ func LastCheckpoint(dir string) (string, int, error) { } return filepath.Join(dir, fi.Name()), idx, nil } - return "", 0, ErrNotFound + return "", 0, record.ErrNotFound } // DeleteCheckpoints deletes all checkpoints in a directory below a given index. @@ -99,15 +100,15 @@ const checkpointPrefix = "checkpoint." // segmented format as the original WAL itself. // This makes it easy to read it through the WAL package and concatenate // it with the original WAL. -func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { +func Checkpoint(w *WAL, from, to int, keep func(id uint64) bool, mint int64) (*CheckpointStats, error) { stats := &CheckpointStats{} var sgmReader io.ReadCloser { - var sgmRange []wal.SegmentRange + var sgmRange []SegmentRange dir, idx, err := LastCheckpoint(w.Dir()) - if err != nil && err != ErrNotFound { + if err != nil && err != record.ErrNotFound { return nil, errors.Wrap(err, "find last checkpoint") } last := idx + 1 @@ -118,11 +119,11 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) // Ignore WAL files below the checkpoint. They shouldn't exist to begin with. from = last - sgmRange = append(sgmRange, wal.SegmentRange{Dir: dir, Last: math.MaxInt32}) + sgmRange = append(sgmRange, SegmentRange{Dir: dir, Last: math.MaxInt32}) } - sgmRange = append(sgmRange, wal.SegmentRange{Dir: w.Dir(), First: from, Last: to}) - sgmReader, err = wal.NewSegmentsRangeReader(sgmRange...) + sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to}) + sgmReader, err = NewSegmentsRangeReader(sgmRange...) if err != nil { return nil, errors.Wrap(err, "create segment reader") } @@ -135,7 +136,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) if err := os.MkdirAll(cpdirtmp, 0777); err != nil { return nil, errors.Wrap(err, "create checkpoint dir") } - cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled()) + cp, err := New(nil, nil, cpdirtmp, w.CompressionEnabled()) if err != nil { return nil, errors.Wrap(err, "open checkpoint") } @@ -146,14 +147,14 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) os.RemoveAll(cpdirtmp) }() - r := wal.NewReader(sgmReader) + r := NewReader(sgmReader) var ( - series []RefSeries - samples []RefSample - tstones []Stone - dec RecordDecoder - enc RecordEncoder + series []record.RefSeries + samples []record.RefSample + tstones []tombstones.Stone + dec record.Decoder + enc record.Encoder buf []byte recs [][]byte ) @@ -167,7 +168,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err = dec.Series(rec, series) if err != nil { return nil, errors.Wrap(err, "decode series") @@ -185,7 +186,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) stats.TotalSeries += len(series) stats.DroppedSeries += len(series) - len(repl) - case RecordSamples: + case record.Samples: samples, err = dec.Samples(rec, samples) if err != nil { return nil, errors.Wrap(err, "decode samples") @@ -203,7 +204,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) stats.TotalSamples += len(samples) stats.DroppedSamples += len(samples) - len(repl) - case RecordTombstones: + case record.Tombstones: tstones, err = dec.Tombstones(rec, tstones) if err != nil { return nil, errors.Wrap(err, "decode deletes") @@ -211,7 +212,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64) // Drop irrelevant tombstones in place. repl := tstones[:0] for _, s := range tstones { - for _, iv := range s.intervals { + for _, iv := range s.Intervals { if iv.Maxt >= mint { repl = append(repl, s) break diff --git a/checkpoint_test.go b/wal/checkpoint_test.go similarity index 89% rename from checkpoint_test.go rename to wal/checkpoint_test.go index 0779894b..1d431ae7 100644 --- a/checkpoint_test.go +++ b/wal/checkpoint_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package wal import ( "fmt" @@ -25,8 +25,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" "github.com/prometheus/tsdb/testutil" - "github.com/prometheus/tsdb/wal" ) func TestLastCheckpoint(t *testing.T) { @@ -37,7 +37,7 @@ func TestLastCheckpoint(t *testing.T) { }() _, _, err = LastCheckpoint(dir) - testutil.Equals(t, ErrNotFound, err) + testutil.Equals(t, record.ErrNotFound, err) testutil.Ok(t, os.MkdirAll(filepath.Join(dir, "checkpoint.0000"), 0777)) s, k, err := LastCheckpoint(dir) @@ -94,18 +94,18 @@ func TestCheckpoint(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - var enc RecordEncoder + var enc record.Encoder // Create a dummy segment to bump the initial number. - seg, err := wal.CreateSegment(dir, 100) + seg, err := CreateSegment(dir, 100) testutil.Ok(t, err) testutil.Ok(t, seg.Close()) // Manually create checkpoint for 99 and earlier. - w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) + w, err := New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) testutil.Ok(t, err) // Add some data we expect to be around later. - err = w.Log(enc.Series([]RefSeries{ + err = w.Log(enc.Series([]record.RefSeries{ {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, }, nil)) @@ -113,7 +113,7 @@ func TestCheckpoint(t *testing.T) { testutil.Ok(t, w.Close()) // Start a WAL and write records to it as usual. - w, err = wal.NewSize(nil, nil, dir, 64*1024, compress) + w, err = NewSize(nil, nil, dir, 64*1024, compress) testutil.Ok(t, err) var last int64 @@ -125,7 +125,7 @@ func TestCheckpoint(t *testing.T) { } // Write some series initially. if i == 0 { - b := enc.Series([]RefSeries{ + b := enc.Series([]record.RefSeries{ {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, @@ -136,7 +136,7 @@ func TestCheckpoint(t *testing.T) { // Write samples until the WAL has enough segments. // Make them have drifting timestamps within a record to see that they // get filtered properly. - b := enc.Samples([]RefSample{ + b := enc.Samples([]record.RefSample{ {Ref: 0, T: last, V: float64(i)}, {Ref: 1, T: last + 10000, V: float64(i)}, {Ref: 2, T: last + 20000, V: float64(i)}, @@ -161,22 +161,22 @@ func TestCheckpoint(t *testing.T) { testutil.Equals(t, 1, len(files)) testutil.Equals(t, "checkpoint.000106", files[0]) - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) + sr, err := NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) testutil.Ok(t, err) defer sr.Close() - var dec RecordDecoder - var series []RefSeries - r := wal.NewReader(sr) + var dec record.Decoder + var series []record.RefSeries + r := NewReader(sr) for r.Next() { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: series, err = dec.Series(rec, series) testutil.Ok(t, err) - case RecordSamples: + case record.Samples: samples, err := dec.Samples(rec, nil) testutil.Ok(t, err) for _, s := range samples { @@ -185,7 +185,7 @@ func TestCheckpoint(t *testing.T) { } } testutil.Ok(t, r.Err()) - testutil.Equals(t, []RefSeries{ + testutil.Equals(t, []record.RefSeries{ {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, @@ -201,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.NewSize(nil, nil, dir, 64*1024, false) + w, err := NewSize(nil, nil, dir, 64*1024, false) testutil.Ok(t, err) testutil.Ok(t, w.Log([]byte{99})) w.Close() diff --git a/wal/reader_test.go b/wal/reader_test.go index 96d15225..0bb0cb13 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -41,7 +41,7 @@ type reader interface { Offset() int64 } -type record struct { +type rec struct { t recType b []byte } @@ -59,13 +59,13 @@ var readerConstructors = map[string]func(io.Reader) reader{ var data = make([]byte, 100000) var testReaderCases = []struct { - t []record + t []rec exp [][]byte fail bool }{ // Sequence of valid records. { - t: []record{ + t: []rec{ {recFull, data[0:200]}, {recFirst, data[200:300]}, {recLast, data[300:400]}, @@ -89,7 +89,7 @@ var testReaderCases = []struct { }, // Exactly at the limit of one page minus the header size { - t: []record{ + t: []rec{ {recFull, data[0 : pageSize-recordHeaderSize]}, }, exp: [][]byte{ @@ -99,7 +99,7 @@ var testReaderCases = []struct { // More than a full page, this exceeds our buffer and can never happen // when written by the WAL. { - t: []record{ + t: []rec{ {recFull, data[0 : pageSize+1]}, }, fail: true, @@ -108,7 +108,7 @@ var testReaderCases = []struct { // NB currently the non-live reader succeeds on this. I think this is a bug. // but we've seen it in production. { - t: []record{ + t: []rec{ {recFull, data[:pageSize/2]}, {recFull, data[:pageSize/2]}, }, @@ -119,22 +119,22 @@ var testReaderCases = []struct { }, // Invalid orders of record types. { - t: []record{{recMiddle, data[:200]}}, + t: []rec{{recMiddle, data[:200]}}, fail: true, }, { - t: []record{{recLast, data[:200]}}, + t: []rec{{recLast, data[:200]}}, fail: true, }, { - t: []record{ + t: []rec{ {recFirst, data[:200]}, {recFull, data[200:400]}, }, fail: true, }, { - t: []record{ + t: []rec{ {recFirst, data[:100]}, {recMiddle, data[100:200]}, {recFull, data[200:400]}, @@ -143,7 +143,7 @@ var testReaderCases = []struct { }, // Non-zero data after page termination. { - t: []record{ + t: []rec{ {recFull, data[:100]}, {recPageTerm, append(make([]byte, pageSize-recordHeaderSize-102), 1)}, }, diff --git a/wal/watcher.go b/wal/watcher.go new file mode 100644 index 00000000..fb6e3c64 --- /dev/null +++ b/wal/watcher.go @@ -0,0 +1,580 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "fmt" + "io" + "math" + "os" + "path" + "sort" + "strconv" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/record" +) + +const ( + readPeriod = 10 * time.Millisecond + checkpointPeriod = 5 * time.Second + segmentCheckPeriod = 100 * time.Millisecond + consumer = "consumer" +) + +// fromTime returns a new millisecond timestamp from a time. +// This function is copied from prometheus/prometheus/pkg/timestamp to avoid adding vendor to TSDB repo. +func fromTime(t time.Time) int64 { + return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond) +} + +type WriteTo interface { + Append([]record.RefSample) bool + StoreSeries([]record.RefSeries, int) + SeriesReset(int) +} + +type WatcherMetrics struct { + recordsRead *prometheus.CounterVec + recordDecodeFails *prometheus.CounterVec + samplesSentPreTailing *prometheus.CounterVec + currentSegment *prometheus.GaugeVec +} + +// Watcher watches the TSDB WAL for a given WriteTo. +type Watcher struct { + name string + writer WriteTo + logger log.Logger + walDir string + lastCheckpoint string + metrics *WatcherMetrics + readerMetrics *liveReaderMetrics + + startTime int64 + + recordsReadMetric *prometheus.CounterVec + recordDecodeFailsMetric prometheus.Counter + samplesSentPreTailing prometheus.Counter + currentSegmentMetric prometheus.Gauge + + quit chan struct{} + done chan struct{} + + // For testing, stop when we hit this segment. + maxSegment int +} + +func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { + m := &WatcherMetrics{ + recordsRead: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "records_read_total", + Help: "Number of records read by the WAL watcher from the WAL.", + }, + []string{consumer, "type"}, + ), + recordDecodeFails: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "record_decode_failures_total", + Help: "Number of records read by the WAL watcher that resulted in an error when decoding.", + }, + []string{consumer}, + ), + samplesSentPreTailing: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "samples_sent_pre_tailing_total", + Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.", + }, + []string{consumer}, + ), + currentSegment: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "current_segment", + Help: "Current segment the WAL watcher is reading records from.", + }, + []string{consumer}, + ), + } + + if reg != nil { + reg.MustRegister(m.recordsRead) + reg.MustRegister(m.recordDecodeFails) + reg.MustRegister(m.samplesSentPreTailing) + reg.MustRegister(m.currentSegment) + } + + return m +} + +// NewWatcher creates a new WAL watcher for a given WriteTo. +func NewWatcher(reg prometheus.Registerer, metrics *WatcherMetrics, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher { + if logger == nil { + logger = log.NewNopLogger() + } + + w := Watcher{ + logger: logger, + writer: writer, + metrics: metrics, + readerMetrics: NewLiveReaderMetrics(reg), + walDir: path.Join(walDir, "wal"), + name: name, + quit: make(chan struct{}), + done: make(chan struct{}), + + maxSegment: -1, + } + return &w +} + +func (w *Watcher) setMetrics() { + // Setup the WAL Watchers metrics. We do this here rather than in the + // constructor because of the ordering of creating Queue Managers's, + // stopping them, and then starting new ones in storage/remote/storage.go ApplyConfig. + w.recordsReadMetric = w.metrics.recordsRead.MustCurryWith(prometheus.Labels{consumer: w.name}) + w.recordDecodeFailsMetric = w.metrics.recordDecodeFails.WithLabelValues(w.name) + w.samplesSentPreTailing = w.metrics.samplesSentPreTailing.WithLabelValues(w.name) + w.currentSegmentMetric = w.metrics.currentSegment.WithLabelValues(w.name) +} + +// Start the Watcher. +func (w *Watcher) Start() { + w.setMetrics() + level.Info(w.logger).Log("msg", "starting WAL watcher", "consumer", w.name) + + go w.loop() +} + +// Stop the Watcher. +func (w *Watcher) Stop() { + close(w.quit) + <-w.done + + // Records read metric has series and samples. + w.metrics.recordsRead.DeleteLabelValues(w.name, "series") + w.metrics.recordsRead.DeleteLabelValues(w.name, "samples") + w.metrics.recordDecodeFails.DeleteLabelValues(w.name) + w.metrics.samplesSentPreTailing.DeleteLabelValues(w.name) + w.metrics.currentSegment.DeleteLabelValues(w.name) + + level.Info(w.logger).Log("msg", "WAL watcher stopped", "queue", w.name) +} + +func (w *Watcher) loop() { + defer close(w.done) + + // We may encourter failures processing the WAL; we should wait and retry. + for !isClosed(w.quit) { + w.startTime = fromTime(time.Now()) + if err := w.run(); err != nil { + level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) + } + + select { + case <-w.quit: + return + case <-time.After(5 * time.Second): + } + } +} + +func (w *Watcher) run() error { + _, lastSegment, err := w.firstAndLast() + if err != nil { + return errors.Wrap(err, "wal.Segments") + } + + // Backfill from the checkpoint first if it exists. + lastCheckpoint, checkpointIndex, err := LastCheckpoint(w.walDir) + if err != nil && err != record.ErrNotFound { + return errors.Wrap(err, "LastCheckpoint") + } + + if err == nil { + if err = w.readCheckpoint(lastCheckpoint); err != nil { + return errors.Wrap(err, "readCheckpoint") + } + } + w.lastCheckpoint = lastCheckpoint + + currentSegment, err := w.findSegmentForIndex(checkpointIndex) + if err != nil { + return err + } + + level.Debug(w.logger).Log("msg", "tailing WAL", "lastCheckpoint", lastCheckpoint, "checkpointIndex", checkpointIndex, "currentSegment", currentSegment, "lastSegment", lastSegment) + for !isClosed(w.quit) { + w.currentSegmentMetric.Set(float64(currentSegment)) + level.Debug(w.logger).Log("msg", "processing segment", "currentSegment", currentSegment) + + // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. + // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. + if err := w.watch(currentSegment, currentSegment >= lastSegment); err != nil { + return err + } + + // For testing: stop when you hit a specific segment. + if currentSegment == w.maxSegment { + return nil + } + + currentSegment++ + } + + return nil +} + +// findSegmentForIndex finds the first segment greater than or equal to index. +func (w *Watcher) findSegmentForIndex(index int) (int, error) { + refs, err := w.segments(w.walDir) + if err != nil { + return -1, err + } + + for _, r := range refs { + if r >= index { + return r, nil + } + } + + return -1, errors.New("failed to find segment for index") +} + +func (w *Watcher) firstAndLast() (int, int, error) { + refs, err := w.segments(w.walDir) + if err != nil { + return -1, -1, err + } + + if len(refs) == 0 { + return -1, -1, nil + } + return refs[0], refs[len(refs)-1], nil +} + +// Copied from tsdb/wal/wal.go so we do not have to open a WAL. +// Plan is to move WAL watcher to TSDB and dedupe these implementations. +func (w *Watcher) segments(dir string) ([]int, error) { + files, err := fileutil.ReadDir(dir) + if err != nil { + return nil, err + } + + var refs []int + var last int + for _, fn := range files { + k, err := strconv.Atoi(fn) + if err != nil { + continue + } + if len(refs) > 0 && k > last+1 { + return nil, errors.New("segments are not sequential") + } + refs = append(refs, k) + last = k + } + sort.Ints(refs) + + return refs, nil +} + +// Use tail true to indicate that the reader is currently on a segment that is +// actively being written to. If false, assume it's a full segment and we're +// replaying it on start to cache the series records. +func (w *Watcher) watch(segmentNum int, tail bool) error { + segment, err := OpenReadSegment(SegmentName(w.walDir, segmentNum)) + if err != nil { + return err + } + defer segment.Close() + + reader := NewLiveReader(w.logger, w.readerMetrics, segment) + + readTicker := time.NewTicker(readPeriod) + defer readTicker.Stop() + + checkpointTicker := time.NewTicker(checkpointPeriod) + defer checkpointTicker.Stop() + + segmentTicker := time.NewTicker(segmentCheckPeriod) + defer segmentTicker.Stop() + + // If we're replaying the segment we need to know the size of the file to know + // when to return from watch and move on to the next segment. + size := int64(math.MaxInt64) + if !tail { + segmentTicker.Stop() + checkpointTicker.Stop() + var err error + size, err = getSegmentSize(w.walDir, segmentNum) + if err != nil { + return errors.Wrap(err, "getSegmentSize") + } + } + + for { + select { + case <-w.quit: + return nil + + case <-checkpointTicker.C: + // Periodically check if there is a new checkpoint so we can garbage + // collect labels. As this is considered an optimisation, we ignore + // errors during checkpoint processing. + if err := w.garbageCollectSeries(segmentNum); err != nil { + level.Warn(w.logger).Log("msg", "error process checkpoint", "err", err) + } + + case <-segmentTicker.C: + _, last, err := w.firstAndLast() + if err != nil { + return errors.Wrap(err, "segments") + } + + // Check if new segments exists. + if last <= segmentNum { + continue + } + + err = w.readSegment(reader, segmentNum, tail) + + // Ignore errors reading to end of segment whilst replaying the WAL. + if !tail { + if err != nil && err != io.EOF { + level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "err", err) + } else if reader.Offset() != size { + level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) + } + return nil + } + + // Otherwise, when we are tailing, non-EOFs are fatal. + if err != io.EOF { + return err + } + + return nil + + case <-readTicker.C: + err = w.readSegment(reader, segmentNum, tail) + + // Ignore all errors reading to end of segment whilst replaying the WAL. + if !tail { + if err != nil && err != io.EOF { + level.Warn(w.logger).Log("msg", "ignoring error reading to end of segment, may have dropped data", "segment", segmentNum, "err", err) + } else if reader.Offset() != size { + level.Warn(w.logger).Log("msg", "expected to have read whole segment, may have dropped data", "segment", segmentNum, "read", reader.Offset(), "size", size) + } + return nil + } + + // Otherwise, when we are tailing, non-EOFs are fatal. + if err != io.EOF { + return err + } + } + } +} + +func (w *Watcher) garbageCollectSeries(segmentNum int) error { + dir, _, err := LastCheckpoint(w.walDir) + if err != nil && err != record.ErrNotFound { + return errors.Wrap(err, "LastCheckpoint") + } + + if dir == "" || dir == w.lastCheckpoint { + return nil + } + w.lastCheckpoint = dir + + index, err := checkpointNum(dir) + if err != nil { + return errors.Wrap(err, "error parsing checkpoint filename") + } + + if index >= segmentNum { + level.Debug(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", segmentNum), "checkpoint", dir) + return nil + } + + level.Debug(w.logger).Log("msg", "new checkpoint detected", "new", dir, "currentSegment", segmentNum) + + if err = w.readCheckpoint(dir); err != nil { + return errors.Wrap(err, "readCheckpoint") + } + + // Clear series with a checkpoint or segment index # lower than the checkpoint we just read. + w.writer.SeriesReset(index) + return nil +} + +func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { + var ( + dec record.Decoder + series []record.RefSeries + samples []record.RefSample + send []record.RefSample + ) + + for r.Next() && !isClosed(w.quit) { + rec := r.Record() + w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc() + + switch dec.Type(rec) { + case record.Series: + series, err := dec.Series(rec, series[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.writer.StoreSeries(series, segmentNum) + + case record.Samples: + // If we're not tailing a segment we can ignore any samples records we see. + // This speeds up replay of the WAL by > 10x. + if !tail { + break + } + samples, err := dec.Samples(rec, samples[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + for _, s := range samples { + if s.T > w.startTime { + send = append(send, s) + } + } + if len(send) > 0 { + // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). + w.writer.Append(send) + send = send[:0] + } + + case record.Tombstones: + // noop + case record.Invalid: + return errors.New("invalid record") + + default: + w.recordDecodeFailsMetric.Inc() + return errors.New("unknown TSDB record type") + } + } + return r.Err() +} + +func recordType(rt record.Type) string { + switch rt { + case record.Invalid: + return "invalid" + case record.Series: + return "series" + case record.Samples: + return "samples" + case record.Tombstones: + return "tombstones" + default: + return "unknown" + } +} + +// Read all the series records from a Checkpoint directory. +func (w *Watcher) readCheckpoint(checkpointDir string) error { + level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", checkpointDir) + index, err := checkpointNum(checkpointDir) + if err != nil { + return errors.Wrap(err, "checkpointNum") + } + + // Ensure we read the whole contents of every segment in the checkpoint dir. + segs, err := w.segments(checkpointDir) + if err != nil { + return errors.Wrap(err, "Unable to get segments checkpoint dir") + } + for _, seg := range segs { + size, err := getSegmentSize(checkpointDir, seg) + if err != nil { + return errors.Wrap(err, "getSegmentSize") + } + + sr, err := OpenReadSegment(SegmentName(checkpointDir, seg)) + if err != nil { + return errors.Wrap(err, "unable to open segment") + } + defer sr.Close() + + r := NewLiveReader(w.logger, w.readerMetrics, sr) + if err := w.readSegment(r, index, false); err != io.EOF && err != nil { + return errors.Wrap(err, "readSegment") + } + + if r.Offset() != size { + return fmt.Errorf("readCheckpoint wasn't able to read all data from the checkpoint %s/%08d, size: %d, totalRead: %d", checkpointDir, seg, size, r.Offset()) + } + } + + level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) + return nil +} + +func checkpointNum(dir string) (int, error) { + // Checkpoint dir names are in the format checkpoint.000001 + // dir may contain a hidden directory, so only check the base directory + chunks := strings.Split(path.Base(dir), ".") + if len(chunks) != 2 { + return 0, errors.Errorf("invalid checkpoint dir string: %s", dir) + } + + result, err := strconv.Atoi(chunks[1]) + if err != nil { + return 0, errors.Errorf("invalid checkpoint dir string: %s", dir) + } + + return result, nil +} + +// Get size of segment. +func getSegmentSize(dir string, index int) (int64, error) { + i := int64(-1) + fi, err := os.Stat(SegmentName(dir, index)) + if err == nil { + i = fi.Size() + } + return i, err +} + +func isClosed(c chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +} diff --git a/wal/watcher_test.go b/wal/watcher_test.go new file mode 100644 index 00000000..e559464f --- /dev/null +++ b/wal/watcher_test.go @@ -0,0 +1,536 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package wal + +import ( + "fmt" + "io/ioutil" + "math/rand" + "os" + "path" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" + "github.com/prometheus/tsdb/testutil" +) + +var defaultRetryInterval = 100 * time.Millisecond +var defaultRetries = 100 +var wMetrics = NewWatcherMetrics(prometheus.DefaultRegisterer) + +// retry executes f() n times at each interval until it returns true. +func retry(t *testing.T, interval time.Duration, n int, f func() bool) { + t.Helper() + ticker := time.NewTicker(interval) + for i := 0; i <= n; i++ { + if f() { + return + } + <-ticker.C + } + ticker.Stop() + t.Logf("function returned false") +} + +type writeToMock struct { + samplesAppended int + seriesLock sync.Mutex + seriesSegmentIndexes map[uint64]int +} + +func (wtm *writeToMock) Append(s []record.RefSample) bool { + wtm.samplesAppended += len(s) + return true +} + +func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { + wtm.seriesLock.Lock() + defer wtm.seriesLock.Unlock() + for _, s := range series { + wtm.seriesSegmentIndexes[s.Ref] = index + } +} + +func (wtm *writeToMock) SeriesReset(index int) { + // Check for series that are in segments older than the checkpoint + // that were not also present in the checkpoint. + wtm.seriesLock.Lock() + defer wtm.seriesLock.Unlock() + for k, v := range wtm.seriesSegmentIndexes { + if v < index { + delete(wtm.seriesSegmentIndexes, k) + } + } +} + +func (wtm *writeToMock) checkNumLabels() int { + wtm.seriesLock.Lock() + defer wtm.seriesLock.Unlock() + return len(wtm.seriesSegmentIndexes) +} + +func newWriteToMock() *writeToMock { + return &writeToMock{ + seriesSegmentIndexes: make(map[uint64]int), + } +} + +func TestTailSamples(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + now := time.Now() + + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) + testutil.Ok(t, err) + + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(inner), + T: int64(now.UnixNano()) + 1, + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + // Start read after checkpoint, no more data written. + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWatcher(prometheus.DefaultRegisterer, wMetrics, nil, "", wt, dir) + watcher.startTime = now.UnixNano() + + // Set the Watcher's metrics so they're not nil pointers. + watcher.setMetrics() + for i := first; i <= last; i++ { + segment, err := OpenReadSegment(SegmentName(watcher.walDir, i)) + testutil.Ok(t, err) + defer segment.Close() + + reader := NewLiveReader(nil, watcher.readerMetrics, segment) + // Use tail true so we can ensure we got the right number of samples. + watcher.readSegment(reader, i, true) + } + + expectedSeries := seriesCount + expectedSamples := seriesCount * samplesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expectedSeries + }) + testutil.Equals(t, expectedSeries, wt.checkNumLabels()) + testutil.Equals(t, expectedSamples, wt.samplesAppended) + }) + } +} + +func TestReadToEndNoCheckpoint(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readToEnd_noCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) + testutil.Ok(t, err) + + var recs [][]byte + + enc := record.Encoder{} + + for i := 0; i < seriesCount; i++ { + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + recs = append(recs, series) + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(j), + T: int64(i), + V: float64(i), + }, + }, nil) + + recs = append(recs, sample) + + // Randomly batch up records. + if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + } + testutil.Ok(t, w.Log(recs...)) + + _, _, err = w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWatcher(prometheus.DefaultRegisterer, wMetrics, nil, "", wt, dir) + go watcher.Start() + + expected := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expected + }) + watcher.Stop() + testutil.Equals(t, expected, wt.checkNumLabels()) + }) + } +} + +func TestReadToEndWithCheckpoint(t *testing.T) { + segmentSize := 32 * 1024 + // We need something similar to this # of series and samples + // in order to get enough segments for us to checkpoint. + const seriesCount = 10 + const samplesCount = 250 + + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readToEnd_withCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, segmentSize, compress) + testutil.Ok(t, err) + + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + Checkpoint(w, 0, 1, func(x uint64) bool { return true }, 0) + w.Truncate(1) + + // Write more records after checkpointing. + for i := 0; i < seriesCount; i++ { + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(j), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + _, _, err = w.Segments() + testutil.Ok(t, err) + wt := newWriteToMock() + watcher := NewWatcher(prometheus.DefaultRegisterer, wMetrics, nil, "", wt, dir) + go watcher.Start() + + expected := seriesCount * 2 + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expected + }) + watcher.Stop() + testutil.Equals(t, expected, wt.checkNumLabels()) + }) + } +} + +func TestReadCheckpoint(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + os.Create(SegmentName(wdir, 30)) + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, 128*pageSize, compress) + testutil.Ok(t, err) + + // Write to the initial segment then checkpoint. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) + w.Truncate(32) + + // Start read after checkpoint, no more data written. + _, _, err = w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWatcher(prometheus.DefaultRegisterer, wMetrics, nil, "", wt, dir) + // watcher. + go watcher.Start() + + expectedSeries := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expectedSeries + }) + watcher.Stop() + testutil.Equals(t, expectedSeries, wt.checkNumLabels()) + }) + } +} + +func TestReadCheckpointMultipleSegments(t *testing.T) { + pageSize := 32 * 1024 + + const segments = 1 + const seriesCount = 20 + const samplesCount = 300 + + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, pageSize, compress) + testutil.Ok(t, err) + + // Write a bunch of data. + for i := 0; i < segments; i++ { + for j := 0; j < seriesCount; j++ { + ref := j + (i * 100) + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", j)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for k := 0; k < samplesCount; k++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + } + + Checkpoint(w, 0, 4, func(id uint64) bool { + return true + }, 0) + + wt := newWriteToMock() + watcher := NewWatcher(prometheus.DefaultRegisterer, wMetrics, nil, "", wt, dir) + watcher.maxSegment = -1 + + // Set the Watcher's metrics so they're not nil pointers. + watcher.setMetrics() + + lastCheckpoint, _, err := LastCheckpoint(watcher.walDir) + testutil.Ok(t, err) + + err = watcher.readCheckpoint(lastCheckpoint) + testutil.Ok(t, err) + }) + } +} + +func TestCheckpointSeriesReset(t *testing.T) { + segmentSize := 32 * 1024 + // We need something similar to this # of series and samples + // in order to get enough segments for us to checkpoint. + const seriesCount = 20 + const samplesCount = 350 + testCases := []struct { + compress bool + segments int + }{ + {compress: false, segments: 14}, + {compress: true, segments: 13}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("compress=%t", tc.compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "seriesReset") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + enc := record.Encoder{} + w, err := NewSize(nil, nil, wdir, segmentSize, tc.compress) + testutil.Ok(t, err) + + // Write to the initial segment, then checkpoint later. + for i := 0; i < seriesCount; i++ { + ref := i + 100 + series := enc.Series([]record.RefSeries{ + record.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]record.RefSample{ + record.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + _, _, err = w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + watcher := NewWatcher(prometheus.DefaultRegisterer, wMetrics, nil, "", wt, dir) + watcher.maxSegment = -1 + go watcher.Start() + + expected := seriesCount + retry(t, defaultRetryInterval, defaultRetries, func() bool { + return wt.checkNumLabels() >= expected + }) + testutil.Equals(t, seriesCount, wt.checkNumLabels()) + + _, err = Checkpoint(w, 2, 4, func(x uint64) bool { return true }, 0) + testutil.Ok(t, err) + + err = w.Truncate(5) + testutil.Ok(t, err) + + _, cpi, err := LastCheckpoint(path.Join(dir, "wal")) + testutil.Ok(t, err) + err = watcher.garbageCollectSeries(cpi + 1) + testutil.Ok(t, err) + + watcher.Stop() + // If you modify the checkpoint and truncate segment #'s run the test to see how + // many series records you end up with and change the last Equals check accordingly + // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) + testutil.Equals(t, tc.segments, wt.checkNumLabels()) + }) + } +} diff --git a/wal_test.go b/wal_test.go index 0fed5b41..1a18e2d2 100644 --- a/wal_test.go +++ b/wal_test.go @@ -29,7 +29,9 @@ import ( "github.com/go-kit/kit/log" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/record" "github.com/prometheus/tsdb/testutil" + "github.com/prometheus/tsdb/tombstones" "github.com/prometheus/tsdb/wal" ) @@ -95,10 +97,10 @@ func TestSegmentWAL_Truncate(t *testing.T) { w.segmentSize = 10000 for i := 0; i < numMetrics; i += batch { - var rs []RefSeries + var rs []record.RefSeries for j, s := range series[i : i+batch] { - rs = append(rs, RefSeries{Labels: s, Ref: uint64(i+j) + 1}) + rs = append(rs, record.RefSeries{Labels: s, Ref: uint64(i+j) + 1}) } err := w.LogSeries(rs) testutil.Ok(t, err) @@ -125,11 +127,11 @@ func TestSegmentWAL_Truncate(t *testing.T) { err = w.Truncate(1000, keepf) testutil.Ok(t, err) - var expected []RefSeries + var expected []record.RefSeries for i := 1; i <= numMetrics; i++ { if i%2 == 1 || uint64(i) >= boundarySeries { - expected = append(expected, RefSeries{Ref: uint64(i), Labels: series[i-1]}) + expected = append(expected, record.RefSeries{Ref: uint64(i), Labels: series[i-1]}) } } @@ -143,10 +145,10 @@ func TestSegmentWAL_Truncate(t *testing.T) { w, err = OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) - var readSeries []RefSeries + var readSeries []record.RefSeries r := w.Reader() - testutil.Ok(t, r.Read(func(s []RefSeries) { + testutil.Ok(t, r.Read(func(s []record.RefSeries) { readSeries = append(readSeries, s...) }, nil, nil)) @@ -172,9 +174,9 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { }() var ( - recordedSeries [][]RefSeries - recordedSamples [][]RefSample - recordedDeletes [][]Stone + recordedSeries [][]record.RefSeries + recordedSamples [][]record.RefSample + recordedDeletes [][]tombstones.Stone ) var totalSamples int @@ -190,29 +192,29 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { r := w.Reader() var ( - resultSeries [][]RefSeries - resultSamples [][]RefSample - resultDeletes [][]Stone + resultSeries [][]record.RefSeries + resultSamples [][]record.RefSample + resultDeletes [][]tombstones.Stone ) - serf := func(series []RefSeries) { + serf := func(series []record.RefSeries) { if len(series) > 0 { - clsets := make([]RefSeries, len(series)) + clsets := make([]record.RefSeries, len(series)) copy(clsets, series) resultSeries = append(resultSeries, clsets) } } - smplf := func(smpls []RefSample) { + smplf := func(smpls []record.RefSample) { if len(smpls) > 0 { - csmpls := make([]RefSample, len(smpls)) + csmpls := make([]record.RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } } - delf := func(stones []Stone) { + delf := func(stones []tombstones.Stone) { if len(stones) > 0 { - cst := make([]Stone, len(stones)) + cst := make([]tombstones.Stone, len(stones)) copy(cst, stones) resultDeletes = append(resultDeletes, cst) } @@ -228,11 +230,11 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { - var samples []RefSample - var stones []Stone + var samples []record.RefSample + var stones []tombstones.Stone for j := 0; j < i*10; j++ { - samples = append(samples, RefSample{ + samples = append(samples, record.RefSample{ Ref: uint64(j % 10000), T: int64(j * 2), V: rand.Float64(), @@ -241,13 +243,13 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { for j := 0; j < i*20; j++ { ts := rand.Int63() - stones = append(stones, Stone{rand.Uint64(), Intervals{{ts, ts + rand.Int63n(10000)}}}) + stones = append(stones, tombstones.Stone{rand.Uint64(), tombstones.Intervals{{ts, ts + rand.Int63n(10000)}}}) } lbls := series[i : i+stepSize] - series := make([]RefSeries, 0, len(series)) + series := make([]record.RefSeries, 0, len(series)) for j, l := range lbls { - series = append(series, RefSeries{ + series = append(series, record.RefSeries{ Ref: uint64(i + j), Labels: l, }) @@ -382,8 +384,8 @@ func TestWALRestoreCorrupted(t *testing.T) { w, err := OpenSegmentWAL(dir, nil, 0, nil) testutil.Ok(t, err) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 1, V: 2}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 2, V: 3}})) testutil.Ok(t, w.cut()) @@ -392,8 +394,8 @@ func TestWALRestoreCorrupted(t *testing.T) { // Hopefully cut will complete by 2 seconds. time.Sleep(2 * time.Second) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) - testutil.Ok(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 3, V: 4}})) + testutil.Ok(t, w.LogSamples([]record.RefSample{{T: 5, V: 6}})) testutil.Ok(t, w.Close()) @@ -414,24 +416,24 @@ func TestWALRestoreCorrupted(t *testing.T) { r := w2.Reader() - serf := func(l []RefSeries) { + serf := func(l []record.RefSeries) { testutil.Equals(t, 0, len(l)) } // Weird hack to check order of reads. i := 0 - samplf := func(s []RefSample) { + samplf := func(s []record.RefSample) { if i == 0 { - testutil.Equals(t, []RefSample{{T: 1, V: 2}}, s) + testutil.Equals(t, []record.RefSample{{T: 1, V: 2}}, s) i++ } else { - testutil.Equals(t, []RefSample{{T: 99, V: 100}}, s) + testutil.Equals(t, []record.RefSample{{T: 99, V: 100}}, s) } } testutil.Ok(t, r.Read(serf, samplf, nil)) - testutil.Ok(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) + testutil.Ok(t, w2.LogSamples([]record.RefSample{{T: 99, V: 100}})) testutil.Ok(t, w2.Close()) // We should see the first valid entry and the new one, everything after @@ -482,23 +484,23 @@ func TestMigrateWAL_Fuzz(t *testing.T) { testutil.Ok(t, err) // Write some data. - testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + testutil.Ok(t, oldWAL.LogSeries([]record.RefSeries{ {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, })) - testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + testutil.Ok(t, oldWAL.LogSamples([]record.RefSample{ {Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}, })) - testutil.Ok(t, oldWAL.LogSeries([]RefSeries{ + testutil.Ok(t, oldWAL.LogSeries([]record.RefSeries{ {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, })) - testutil.Ok(t, oldWAL.LogSamples([]RefSample{ + testutil.Ok(t, oldWAL.LogSamples([]record.RefSample{ {Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}, })) - testutil.Ok(t, oldWAL.LogDeletes([]Stone{ - {ref: 1, intervals: []Interval{{100, 200}}}, + testutil.Ok(t, oldWAL.LogDeletes([]tombstones.Stone{ + {Ref: 1, Intervals: []tombstones.Interval{{100, 200}}}, })) testutil.Ok(t, oldWAL.Close()) @@ -510,8 +512,8 @@ func TestMigrateWAL_Fuzz(t *testing.T) { testutil.Ok(t, err) // We can properly write some new data after migration. - var enc RecordEncoder - testutil.Ok(t, w.Log(enc.Samples([]RefSample{ + var enc record.Encoder + testutil.Ok(t, w.Log(enc.Samples([]record.RefSample{ {Ref: 500, T: 1, V: 1}, }, nil))) @@ -523,21 +525,21 @@ func TestMigrateWAL_Fuzz(t *testing.T) { r := wal.NewReader(sr) var res []interface{} - var dec RecordDecoder + var dec record.Decoder for r.Next() { rec := r.Record() switch dec.Type(rec) { - case RecordSeries: + case record.Series: s, err := dec.Series(rec, nil) testutil.Ok(t, err) res = append(res, s) - case RecordSamples: + case record.Samples: s, err := dec.Samples(rec, nil) testutil.Ok(t, err) res = append(res, s) - case RecordTombstones: + case record.Tombstones: s, err := dec.Tombstones(rec, nil) testutil.Ok(t, err) res = append(res, s) @@ -548,17 +550,17 @@ func TestMigrateWAL_Fuzz(t *testing.T) { testutil.Ok(t, r.Err()) testutil.Equals(t, []interface{}{ - []RefSeries{ + []record.RefSeries{ {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, }, - []RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, - []RefSeries{ + []record.RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, + []record.RefSeries{ {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, }, - []RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, - []Stone{{ref: 1, intervals: []Interval{{100, 200}}}}, - []RefSample{{Ref: 500, T: 1, V: 1}}, + []record.RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, + []tombstones.Stone{{Ref: 1, Intervals: []tombstones.Interval{{100, 200}}}}, + []record.RefSample{{Ref: 500, T: 1, V: 1}}, }, res) // Migrating an already migrated WAL shouldn't do anything.