diff --git a/block.go b/block.go
index 7dc3af9d..9375fb13 100644
--- a/block.go
+++ b/block.go
@@ -130,12 +130,6 @@ type BlockReader interface {
 	Tombstones() (TombstoneReader, error)
 }
 
-// Appendable defines an entity to which data can be appended.
-type Appendable interface {
-	// Appender returns a new Appender against an underlying store.
-	Appender() Appender
-}
-
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
 	// Unique identifier for the block and its contents. Changes on compaction.
diff --git a/db.go b/db.go
index 489f3a12..d5ddcf9d 100644
--- a/db.go
+++ b/db.go
@@ -355,6 +355,7 @@ func (a dbAppender) Commit() error {
 		default:
 		}
 	}
+
 	return err
 }
 
@@ -683,7 +684,9 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
 
 	sq := &querier{
 		blocks: make([]Querier, 0, len(blocks)),
+		db:     db,
 	}
+
 	for _, b := range blocks {
 		q, err := NewBlockQuerier(b, mint, maxt)
 		if err == nil {
diff --git a/db_test.go b/db_test.go
index 3fbbef74..5f3b3316 100644
--- a/db_test.go
+++ b/db_test.go
@@ -19,11 +19,13 @@ import (
 	"math/rand"
 	"os"
 	"sort"
+	"strconv"
 	"testing"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
+	"github.com/stretchr/testify/require"
 )
 
 func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
@@ -55,6 +57,10 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
 		}
 		testutil.Ok(t, it.Err())
 
+		if len(samples) == 0 {
+			continue
+		}
+
 		name := series.Labels().String()
 		result[name] = samples
 	}
@@ -892,3 +898,128 @@ func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
 
 	return result, ss.Err()
 }
+
+func TestDBCannotSeePartialCommits(t *testing.T) {
+	tmpdir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(tmpdir)
+
+	db, err := Open(tmpdir, nil, nil, nil)
+	require.NoError(t, err)
+	defer db.Close()
+
+	stop := make(chan struct{})
+	firstInsert := make(chan struct{})
+
+	// Insert data in batches.
+	go func() {
+		iter := 0
+		for {
+			app := db.Appender()
+
+			for j := 0; j < 100; j++ {
+				_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
+				require.NoError(t, err)
+			}
+			err = app.Commit()
+			require.NoError(t, err)
+
+			if iter == 0 {
+				close(firstInsert)
+			}
+			iter++
+
+			select {
+			case <-stop:
+				return
+			default:
+			}
+		}
+	}()
+
+	<-firstInsert
+
+	// The queries race with the appends, so run a few of them to tickle it.
+	// Without isolation, most of them would observe a partial commit.
+	inconsistencies := 0
+	for i := 0; i < 10; i++ {
+		func() {
+			querier, err := db.Querier(0, 1000000)
+			testutil.Ok(t, err)
+			defer querier.Close()
+
+			ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+			testutil.Ok(t, err)
+
+			seriesSet := readSeriesSet(t, ss)
+
+			values := map[float64]struct{}{}
+			for _, series := range seriesSet {
+				values[series[len(series)-1].v] = struct{}{}
+			}
+			if len(values) != 1 {
+				inconsistencies++
+			}
+		}()
+	}
+	stop <- struct{}{}
+
+	require.Equal(t, 0, inconsistencies, "Some queries saw inconsistent results.")
+}
+
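// A minimal sketch (not from the patch; the helper name and values are ours)
// of the visibility contract that this test and the next one pin down,
// assuming only the Open/Appender/Querier API used above, error handling elided:
func commitVisibilitySketch(db *DB) {
	app := db.Appender()
	_, _ = app.Add(labels.FromStrings("foo", "bar"), 10, 1.0)

	q, _ := db.Querier(0, 100) // Opened before Commit: must see none of the batch.
	_ = app.Commit()           // Atomic: queriers opened after this see the whole batch.
	q.Close()
}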
+func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
+	tmpdir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(tmpdir)
+
+	db, err := Open(tmpdir, nil, nil, nil)
+	require.NoError(t, err)
+	defer db.Close()
+
+	querier, err := db.Querier(0, 1000000)
+	testutil.Ok(t, err)
+	defer querier.Close()
+
+	app := db.Appender()
+	_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
+	require.NoError(t, err)
+	// This commit is after the querier was created, so it must not be returned.
+	err = app.Commit()
+	require.NoError(t, err)
+
+	ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	testutil.Ok(t, err)
+
+	seriesSet := readSeriesSet(t, ss)
+	require.Equal(t, map[string][]sample{}, seriesSet)
+
+	querier, err = db.Querier(0, 1000000)
+	testutil.Ok(t, err)
+	defer querier.Close()
+
+	ss, err = querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	testutil.Ok(t, err)
+	seriesSet = readSeriesSet(t, ss)
+	require.Equal(t, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}, seriesSet)
+}
+
+func readSeriesSet(t *testing.T, ss SeriesSet) map[string][]sample {
+	seriesSet := make(map[string][]sample)
+	for ss.Next() {
+		series := ss.At()
+
+		samples := []sample{}
+		it := series.Iterator()
+		for it.Next() {
+			ts, v := it.At()
+			samples = append(samples, sample{t: ts, v: v})
+		}
+		if len(samples) == 0 {
+			continue
+		}
+
+		name := series.Labels().String()
+		seriesSet[name] = samples
+	}
+	testutil.Ok(t, ss.Err())
+
+	return seriesSet
+}
diff --git a/head.go b/head.go
index c76c139d..a855414d 100644
--- a/head.go
+++ b/head.go
@@ -70,6 +70,8 @@ type Head struct {
 	postings *index.MemPostings // postings lists for terms
 
 	tombstones memTombstones
+
+	iso *isolation
 }
 
 type headMetrics struct {
@@ -86,6 +88,9 @@ type headMetrics struct {
 	maxTime             prometheus.GaugeFunc
 	samplesAppended     prometheus.Counter
 	walTruncateDuration prometheus.Summary
+
+	lowWatermark  prometheus.GaugeFunc
+	highWatermark prometheus.GaugeFunc
 }
 
 func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@@ -147,6 +152,20 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 		Name: "prometheus_tsdb_head_samples_appended_total",
 		Help: "Total number of appended samples.",
 	})
+	m.lowWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "tsdb_isolation_low_watermark",
+		Help: "The lowest write ID that is still referenced.",
+	}, func() float64 {
+		return float64(h.iso.lowWatermark())
+	})
+	m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "tsdb_isolation_high_watermark",
+		Help: "The highest write ID that has been given out.",
+	}, func() float64 {
+		h.iso.writeMtx.Lock()
+		defer h.iso.writeMtx.Unlock()
+		return float64(h.iso.lastWriteID)
+	})
 
 	if r != nil {
 		r.MustRegister(
@@ -163,6 +182,8 @@
 			m.gcDuration,
 			m.walTruncateDuration,
 			m.samplesAppended,
+			m.lowWatermark,
+			m.highWatermark,
 		)
 	}
 	return m
@@ -179,6 +200,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 	if chunkRange < 1 {
 		return nil, errors.Errorf("invalid chunk range %d", chunkRange)
 	}
+
 	h := &Head{
 		wal:    wal,
 		logger: l,
@@ -190,6 +212,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 		symbols:    map[string]struct{}{},
 		postings:   index.NewUnorderedMemPostings(),
 		tombstones: memTombstones{},
+
+		iso: newIsolation(),
 	}
 	h.metrics = newHeadMetrics(h, r)
 
@@ -216,7 +240,7 @@ func (h *Head) processWALSamples(
 			unknownRefs++
 			continue
 		}
-		_, chunkCreated := ms.append(s.T, s.V)
+		_, chunkCreated := ms.append(s.T, s.V, 0)
 		if chunkCreated {
 			h.metrics.chunksCreated.Inc()
 			h.metrics.chunks.Inc()
@@ -390,7 +414,7 @@ func (h *rangeHead) Index() (IndexReader, error) {
 }
 
 func (h *rangeHead) Chunks() (ChunkReader, error) {
-	return h.head.chunksRange(h.mint, h.maxt), nil
+	return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil
 }
 
 func (h *rangeHead) Tombstones() (TombstoneReader, error) {
@@ -402,6 +426,9 @@ func (h *rangeHead) Tombstones() (TombstoneReader, error) {
 
 type initAppender struct {
 	app  Appender
 	head *Head
+
+	writeID              uint64
+	cleanupWriteIDsBelow uint64
 }
 
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@@ -409,7 +436,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
 		return a.app.Add(lset, t, v)
 	}
 	a.head.initTime(t)
-	a.app = a.head.appender()
+	a.app = a.head.appender(a.writeID, a.cleanupWriteIDsBelow)
 
 	return a.app.Add(lset, t, v)
 }
@@ -439,20 +466,25 @@ func (a *initAppender) Rollback() error {
 func (h *Head) Appender() Appender {
 	h.metrics.activeAppenders.Inc()
 
+	writeID := h.iso.newWriteID()
+	cleanupWriteIDsBelow := h.iso.lowWatermark()
+
 	// The head cache might not have a starting point yet. The init appender
 	// picks up the first appended timestamp as the base.
 	if h.MinTime() == math.MinInt64 {
-		return &initAppender{head: h}
+		return &initAppender{head: h, writeID: writeID, cleanupWriteIDsBelow: cleanupWriteIDsBelow}
 	}
-	return h.appender()
+	return h.appender(writeID, cleanupWriteIDsBelow)
 }
 
-func (h *Head) appender() *headAppender {
+func (h *Head) appender(writeID, cleanupWriteIDsBelow uint64) *headAppender {
 	return &headAppender{
-		head:          h,
-		mint:          h.MaxTime() - h.chunkRange/2,
-		samples:       h.getAppendBuffer(),
-		highTimestamp: math.MinInt64,
+		head:                 h,
+		mint:                 h.MaxTime() - h.chunkRange/2,
+		samples:              h.getAppendBuffer(),
+		highTimestamp:        math.MinInt64,
+		writeID:              writeID,
+		cleanupWriteIDsBelow: cleanupWriteIDsBelow,
 	}
 }
 
@@ -475,6 +507,9 @@ type headAppender struct {
 	series        []RefSeries
 	samples       []RefSample
 	highTimestamp int64
+
+	writeID              uint64
+	cleanupWriteIDsBelow uint64
 }
 
 func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@@ -535,7 +570,8 @@ func (a *headAppender) Commit() error {
 
 	for _, s := range a.samples {
 		s.series.Lock()
-		ok, chunkCreated := s.series.append(s.T, s.V)
+		ok, chunkCreated := s.series.append(s.T, s.V, a.writeID)
+		s.series.cleanupWriteIDsBelow(a.cleanupWriteIDsBelow)
 		s.series.Unlock()
 
 		if !ok {
@@ -559,6 +595,8 @@
 		}
 	}
 
+	a.head.iso.closeWrite(a.writeID)
+
 	return nil
 }
 
@@ -666,14 +704,14 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
 
 // Chunks returns a ChunkReader against the block.
 func (h *Head) Chunks() (ChunkReader, error) {
-	return h.chunksRange(math.MinInt64, math.MaxInt64), nil
+	return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil
 }
 
-func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
+func (h *Head) chunksRange(mint, maxt int64, isoState *IsolationState) *headChunkReader {
 	if hmin := h.MinTime(); hmin > mint {
 		mint = hmin
 	}
-	return &headChunkReader{head: h, mint: mint, maxt: maxt}
+	return &headChunkReader{head: h, mint: mint, maxt: maxt, isoState: isoState}
 }
 
 // MinTime returns the lowest time bound on visible data in the head.
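// A sketch of the write-ID lifecycle wired up in the hunks above (the helper
// name appendOneSample is ours; everything else is introduced by this patch):
func appendOneSample(h *Head, lset labels.Labels, t int64, v float64) error {
	// Appender() internally draws writeID := h.iso.newWriteID() and snapshots
	// cleanupWriteIDsBelow := h.iso.lowWatermark().
	app := h.Appender()
	if _, err := app.Add(lset, t, v); err != nil {
		return err
	}
	// Commit stamps each sample with the writeID via memSeries.append(t, v, writeID),
	// then calls h.iso.closeWrite(writeID) so that isolation states created from
	// now on treat the write as complete.
	return app.Commit()
}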
@@ -694,9 +732,12 @@ func (h *Head) Close() error {
 type headChunkReader struct {
 	head       *Head
 	mint, maxt int64
+
+	isoState *IsolationState
 }
 
 func (h *headChunkReader) Close() error {
+	h.isoState.Close()
 	return nil
 }
 
@@ -746,6 +787,8 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
 		Chunk: c.chunk,
 		s:     s,
 		cid:   int(cid),
+
+		isoState: h.isoState,
 	}, nil
 }
 
@@ -753,11 +796,13 @@ type safeChunk struct {
 	chunkenc.Chunk
 	s   *memSeries
 	cid int
+
+	isoState *IsolationState
 }
 
 func (c *safeChunk) Iterator() chunkenc.Iterator {
 	c.s.Lock()
-	it := c.s.iterator(c.cid)
+	it := c.s.iterator(c.cid, c.isoState)
 	c.s.Unlock()
 	return it
 }
@@ -1006,6 +1051,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 		for _, series := range all {
 			series.Lock()
 			rmChunks += series.truncateChunksBefore(mint)
+			series.cleanupExtraWriteIds()
 
 			if len(series.chunks) > 0 {
 				series.Unlock()
@@ -1103,6 +1149,8 @@ type memSeries struct {
 	sampleBuf [4]sample
 
 	app chunkenc.Appender // Current appender for the chunk.
+
+	txs *txRing
 }
 
 func (s *memSeries) minTime() int64 {
@@ -1139,6 +1187,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
 		ref:        id,
 		chunkRange: chunkRange,
 		nextAt:     math.MinInt64,
+		txs:        newTxRing(4),
 	}
 	return s
 }
@@ -1193,7 +1242,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 }
 
 // append adds the sample (t, v) to the series.
-func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
+func (s *memSeries) append(t int64, v float64, writeID uint64) (success, chunkCreated bool) {
 	const samplesPerChunk = 120
 
 	c := s.head()
@@ -1229,9 +1278,29 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) {
 	s.sampleBuf[2] = s.sampleBuf[3]
 	s.sampleBuf[3] = sample{t: t, v: v}
 
+	s.txs.add(writeID)
+
 	return true, chunkCreated
 }
 
+// cleanupWriteIDsBelow cleans up write IDs strictly below bound.
+// The caller must hold the series lock.
+func (s *memSeries) cleanupWriteIDsBelow(bound uint64) {
+	s.txs.cleanupWriteIDsBelow(bound)
+}
+
+// cleanupExtraWriteIds drops write IDs that no longer have a corresponding
+// sample after chunk truncation, keeping at most one ID per remaining sample.
+// The caller must hold the series lock.
+func (s *memSeries) cleanupExtraWriteIds() {
+	totalSamples := 0
+	for _, c := range s.chunks {
+		totalSamples += c.chunk.NumSamples()
+	}
+
+	s.txs.cutoffN(totalSamples)
+}
+
 // computeChunkEndTime estimates the end timestamp based on the beginning of a chunk,
 // its current timestamp and the upper bound up to which we insert data.
 // It assumes that the time range is 1/4 full.
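// The invariant behind cleanupExtraWriteIds above, as a hedged sketch (the
// helper name checkTxInvariant is ours): the ring holds at most one write ID
// per live sample, oldest first, so after chunk truncation it is cut back to
// the series' remaining sample count.
func checkTxInvariant(s *memSeries) bool {
	totalSamples := 0
	for _, c := range s.chunks {
		totalSamples += c.chunk.NumSamples()
	}
	// Older IDs may already have been cleaned up, so the ring can be shorter
	// than the sample count, but it must never be longer.
	return s.txs.txIDCount <= totalSamples
}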
@@ -1243,8 +1309,9 @@ func computeChunkEndTime(start, cur, max int64) int64 {
 	return start + (max-start)/a
 }
 
-func (s *memSeries) iterator(id int) chunkenc.Iterator {
+func (s *memSeries) iterator(id int, isoState *IsolationState) chunkenc.Iterator {
 	c := s.chunk(id)
+
 	// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
 	// which got then garbage collected before it got accessed.
 	// We must ensure to not garbage collect as long as any readers still hold a reference.
@@ -1252,16 +1319,64 @@ func (s *memSeries) iterator(id int) chunkenc.Iterator {
 		return chunkenc.NewNopIterator()
 	}
 
+	ix := id - s.firstChunkID
+
+	numSamples := c.chunk.NumSamples()
+	stopAfter := numSamples
+
+	if isoState != nil {
+		totalSamples := 0    // Total samples in this series.
+		previousSamples := 0 // Samples before this chunk.
+
+		for j, d := range s.chunks {
+			totalSamples += d.chunk.NumSamples()
+			if j < ix {
+				previousSamples += d.chunk.NumSamples()
+			}
+		}
+
+		// Subtract the write IDs that belong to samples after this chunk from the
+		// ring's total count; only the remaining IDs are relevant for this chunk.
+		writeIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + c.chunk.NumSamples()))
+
+		// Iterate over the write IDs and find the first one that the isolation
+		// state says must not be returned.
+		it := s.txs.iterator()
+		for index := 0; index < writeIDsToConsider; index++ {
+			writeID := it.At()
+			if _, ok := isoState.incompleteWrites[writeID]; ok || writeID > isoState.maxWriteID {
+				// Found one: limit the number of samples this iterator returns.
+				stopAfter = c.chunk.NumSamples() - (writeIDsToConsider - index)
+
+				if stopAfter < 0 {
+					stopAfter = 0 // Stopped in a previous chunk.
+				}
+				break
+			}
+
+			it.Next()
+		}
+	}
+
 	if id-s.firstChunkID < len(s.chunks)-1 {
-		return c.chunk.Iterator()
+		it := &memSafeIterator{
+			Iterator:  c.chunk.Iterator(),
+			i:         -1,
+			total:     numSamples,
+			stopAfter: stopAfter,
+		}
+		return it
 	}
+
 	// Serve the last 4 samples for the last chunk from the sample buffer
 	// as their compressed bytes may be mutated by added samples.
 	it := &memSafeIterator{
-		Iterator: c.chunk.Iterator(),
-		i:        -1,
-		total:    c.chunk.NumSamples(),
-		buf:      s.sampleBuf,
+		Iterator:        c.chunk.Iterator(),
+		i:               -1,
+		total:           numSamples,
+		stopAfter:       stopAfter,
+		bufferedSamples: 4,
+		buf:             s.sampleBuf,
 	}
 	return it
 }
@@ -1281,27 +1396,29 @@ type memChunk struct {
 
 type memSafeIterator struct {
 	chunkenc.Iterator
 
-	i     int
-	total int
-	buf   [4]sample
+	i               int
+	total           int
+	stopAfter       int
+	bufferedSamples int
+	buf             [4]sample
 }
 
 func (it *memSafeIterator) Next() bool {
-	if it.i+1 >= it.total {
+	if it.i+1 >= it.stopAfter {
 		return false
 	}
 	it.i++
-	if it.total-it.i > 4 {
+	if it.total-it.i > it.bufferedSamples {
 		return it.Iterator.Next()
 	}
 	return true
 }
 
 func (it *memSafeIterator) At() (int64, float64) {
-	if it.total-it.i > 4 {
+	if it.total-it.i > it.bufferedSamples {
 		return it.Iterator.At()
 	}
-	s := it.buf[4-(it.total-it.i)]
+	s := it.buf[it.bufferedSamples-(it.total-it.i)]
 	return s.t, s.v
 }
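// A worked example of the stopAfter arithmetic in the iterator above, with
// invented numbers (the function is illustrative, not part of the patch):
func stopAfterExample() int {
	// A series with chunks of 120, 120 and 40 samples; the ring still tracks
	// the newest 200 write IDs, i.e. the 80 oldest have been cleaned up.
	txIDCount := 200
	totalSamples := 280    // 120 + 120 + 40.
	previousSamples := 120 // Samples in chunks before the middle chunk (ix = 1).
	chunkSamples := 120    // Samples in the chunk being read.

	idsAfterChunk := totalSamples - (previousSamples + chunkSamples) // 40
	writeIDsToConsider := txIDCount - idsAfterChunk                  // 160

	// Suppose the ID at ring index 150 is incomplete for this reader:
	index := 150
	return chunkSamples - (writeIDsToConsider - index) // stopAfter = 110.
}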
diff --git a/head_test.go b/head_test.go
index 36a97542..1b306462 100644
--- a/head_test.go
+++ b/head_test.go
@@ -14,6 +14,7 @@
 package tsdb
 
 import (
+	"math"
 	"math/rand"
 	"testing"
 
@@ -22,6 +23,7 @@ import (
 	"github.com/prometheus/tsdb/index"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
+	"github.com/stretchr/testify/require"
 )
 
 func BenchmarkCreateSeries(b *testing.B) {
@@ -113,10 +115,10 @@ func TestHead_ReadWAL(t *testing.T) {
 		return x
 	}
 
-	testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
+	testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil)))
 	testutil.Equals(t, 0, len(s11.chunks))
-	testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
-	testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
+	testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil)))
+	testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0, nil)))
 }
 
 func TestHead_Truncate(t *testing.T) {
@@ -132,18 +134,18 @@
 	s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))
 
 	s1.chunks = []*memChunk{
-		{minTime: 0, maxTime: 999},
-		{minTime: 1000, maxTime: 1999},
-		{minTime: 2000, maxTime: 2999},
+		{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
 	}
 	s2.chunks = []*memChunk{
-		{minTime: 1000, maxTime: 1999},
-		{minTime: 2000, maxTime: 2999},
-		{minTime: 3000, maxTime: 3999},
+		{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
 	}
 	s3.chunks = []*memChunk{
-		{minTime: 0, maxTime: 999},
-		{minTime: 1000, maxTime: 1999},
+		{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
 	}
 	s4.chunks = []*memChunk{}
@@ -153,12 +155,12 @@
 	testutil.Ok(t, h.Truncate(2000))
 
 	testutil.Equals(t, []*memChunk{
-		{minTime: 2000, maxTime: 2999},
+		{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
 	}, h.series.getByID(s1.ref).chunks)
 
 	testutil.Equals(t, []*memChunk{
-		{minTime: 2000, maxTime: 2999},
-		{minTime: 3000, maxTime: 3999},
+		{minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()},
 	}, h.series.getByID(s2.ref).chunks)
 
 	testutil.Assert(t, h.series.getByID(s3.ref) == nil, "")
@@ -199,7 +201,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000)
 
 	for i := 0; i < 4000; i += 5 {
-		ok, _ := s.append(int64(i), float64(i))
+		ok, _ := s.append(int64(i), float64(i), 0)
 		testutil.Assert(t, ok == true, "sample append failed")
 	}
 
@@ -221,13 +223,14 @@
 
 	// Validate that the series' sample buffer is applied correctly to the last chunk
 	// after truncation.
-	it1 := s.iterator(s.chunkID(len(s.chunks) - 1))
+	it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil)
 	_, ok := it1.(*memSafeIterator)
 	testutil.Assert(t, ok == true, "")
 
-	it2 := s.iterator(s.chunkID(len(s.chunks) - 2))
-	_, ok = it2.(*memSafeIterator)
-	testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
+	it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil)
+	it2MSI, ok := it2.(*memSafeIterator)
+	testutil.Assert(t, ok == true, "")
+	testutil.Assert(t, it2MSI.bufferedSamples == 0, "non-last chunk incorrectly wrapped with sample buffer")
 }
 
 func TestHeadDeleteSimple(t *testing.T) {
@@ -238,7 +241,6 @@
 	defer head.Close()
 
 	app := head.Appender()
-
 	smpls := make([]float64, numSamples)
 	for i := int64(0); i < numSamples; i++ {
 		smpls[i] = rand.Float64()
@@ -629,19 +631,19 @@ func TestMemSeries_append(t *testing.T) {
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
 	// New chunk must correctly be cut at 1000.
-	ok, chunkCreated := s.append(998, 1)
+	ok, chunkCreated := s.append(998, 1, 0)
 	testutil.Assert(t, ok, "append failed")
 	testutil.Assert(t, chunkCreated, "first sample created chunk")
 
-	ok, chunkCreated = s.append(999, 2)
+	ok, chunkCreated = s.append(999, 2, 0)
 	testutil.Assert(t, ok, "append failed")
 	testutil.Assert(t, !chunkCreated, "second sample should use same chunk")
 
-	ok, chunkCreated = s.append(1000, 3)
+	ok, chunkCreated = s.append(1000, 3, 0)
 	testutil.Assert(t, ok, "append failed")
 	testutil.Assert(t, chunkCreated, "expected new chunk on boundary")
 
-	ok, chunkCreated = s.append(1001, 4)
+	ok, chunkCreated = s.append(1001, 4, 0)
 	testutil.Assert(t, ok, "append failed")
 	testutil.Assert(t, !chunkCreated, "second sample should use same chunk")
 
@@ -651,7 +653,7 @@
 
 	// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
 	// at approximately 120 samples per chunk.
 	for i := 1; i < 1000; i++ {
-		ok, _ := s.append(1001+int64(i), float64(i))
+		ok, _ := s.append(1001+int64(i), float64(i), 0)
 		testutil.Assert(t, ok, "append failed")
 	}
 
@@ -673,8 +675,8 @@ func TestGCChunkAccess(t *testing.T) {
 
 	s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 	s.chunks = []*memChunk{
-		{minTime: 0, maxTime: 999},
-		{minTime: 1000, maxTime: 1999},
+		{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
 	}
 
 	idx := h.indexRange(0, 1500)
@@ -689,7 +691,7 @@ func TestGCChunkAccess(t *testing.T) {
 	}}, lset)
 	testutil.Equals(t, 2, len(chunks))
 
-	cr := h.chunksRange(0, 1500)
+	cr := h.chunksRange(0, 1500, nil)
 	_, err = cr.Chunk(chunks[0].Ref)
 	testutil.Ok(t, err)
 	_, err = cr.Chunk(chunks[1].Ref)
@@ -713,8 +715,8 @@ func TestGCSeriesAccess(t *testing.T) {
 
 	s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 	s.chunks = []*memChunk{
-		{minTime: 0, maxTime: 999},
-		{minTime: 1000, maxTime: 1999},
+		{minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()},
+		{minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()},
 	}
 
 	idx := h.indexRange(0, 2000)
@@ -729,7 +731,7 @@ func TestGCSeriesAccess(t *testing.T) {
 	}}, lset)
 	testutil.Equals(t, 2, len(chunks))
 
-	cr := h.chunksRange(0, 2000)
+	cr := h.chunksRange(0, 2000, nil)
 	_, err = cr.Chunk(chunks[0].Ref)
 	testutil.Ok(t, err)
 	_, err = cr.Chunk(chunks[1].Ref)
@@ -744,3 +746,126 @@ func TestGCSeriesAccess(t *testing.T) {
 	_, err = cr.Chunk(chunks[1].Ref)
 	testutil.Equals(t, ErrNotFound, err)
 }
+
+func TestMemSeriesIsolation(t *testing.T) {
+	// Query the head at different isolation states while appends are made,
+	// and check that each query only sees fully committed appends.
+	hb, err := NewHead(nil, nil, NopWAL(), 1000)
+	testutil.Ok(t, err)
+	defer hb.Close()
+
+	lastValue := func(maxWriteID uint64) int {
+		idx, err := hb.Index()
+		testutil.Ok(t, err)
+
+		iso := hb.iso.State()
+		iso.maxWriteID = maxWriteID
+
+		querier := &blockQuerier{
+			mint:       0,
+			maxt:       10000,
+			index:      idx,
+			chunks:     hb.chunksRange(math.MinInt64, math.MaxInt64, iso),
+			tombstones: emptyTombstoneReader,
+		}
+
+		defer querier.Close()
+
+		ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+		testutil.Ok(t, err)
+
+		seriesSet := readSeriesSet(t, ss)
+		for _, series := range seriesSet {
+			return int(series[len(series)-1].v)
+		}
+		return -1
+	}
+
+	i := 0
+	for ; i <= 1000; i++ {
+		var app Appender
+		// To initialise bounds.
+		if hb.MinTime() == math.MinInt64 {
+			app = &initAppender{head: hb, writeID: uint64(i), cleanupWriteIDsBelow: 0}
+		} else {
+			app = hb.appender(uint64(i), 0)
+		}
+
+		_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
+		require.NoError(t, err, "Failed to add sample")
+		require.NoError(t, app.Commit(), "Unexpected error committing appender")
+	}
+
+	// Test simple cases in different chunks when no write ID cleanup has been performed.
+	require.Equal(t, 10, lastValue(10))
+	require.Equal(t, 130, lastValue(130))
+	require.Equal(t, 160, lastValue(160))
+	require.Equal(t, 240, lastValue(240))
+	require.Equal(t, 500, lastValue(500))
+	require.Equal(t, 750, lastValue(750))
+	require.Equal(t, 995, lastValue(995))
+	require.Equal(t, 999, lastValue(999))
+
+	// Clean up write IDs below 500.
+	app := hb.appender(uint64(i), 500)
+	_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
+	require.NoError(t, err, "Failed to add sample")
+	require.NoError(t, app.Commit(), "Unexpected error committing appender")
+	i++
+
+	// Queries with a maxWriteID below 500 should no longer happen after the
+	// cleanup; if they do, they only take the remaining write IDs into account.
+	require.Equal(t, 499, lastValue(10))
+	require.Equal(t, 499, lastValue(130))
+	require.Equal(t, 499, lastValue(160))
+	require.Equal(t, 499, lastValue(240))
+	require.Equal(t, 500, lastValue(500))
+	require.Equal(t, 995, lastValue(995))
+	require.Equal(t, 999, lastValue(999))
+
+	// Clean up write IDs below 1000, which means the sample buffer is
+	// the only thing still holding write IDs.
+	app = hb.appender(uint64(i), 1000)
+	_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
+	require.NoError(t, err, "Failed to add sample")
+	require.NoError(t, app.Commit(), "Unexpected error committing appender")
+	i++
+	require.Equal(t, 999, lastValue(998))
+	require.Equal(t, 999, lastValue(999))
+	require.Equal(t, 1000, lastValue(1000))
+	require.Equal(t, 1001, lastValue(1001))
+	require.Equal(t, 1002, lastValue(1002))
+	require.Equal(t, 1002, lastValue(1003))
+}
+
+func TestHead_Truncate_WriteIDs(t *testing.T) {
+	h, err := NewHead(nil, nil, nil, 1000)
+	testutil.Ok(t, err)
+	defer h.Close()
+
+	h.initTime(0)
+
+	s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
+
+	chk := chunkenc.NewXORChunk()
+	app, err := chk.Appender()
+	testutil.Ok(t, err)
+
+	app.Append(1, 0)
+	app.Append(2, 0)
+	app.Append(3, 0)
+
+	s1.chunks = []*memChunk{
+		{minTime: 0, maxTime: 999, chunk: chk},
+		{minTime: 1000, maxTime: 1999, chunk: chk},
+	}
+
+	s1.txs.txIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1}
+	s1.txs.txIDFirst = 7
+	s1.txs.txIDCount = 5
+
+	testutil.Ok(t, h.Truncate(1000))
+
+	testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.txs.txIDs)
+	testutil.Equals(t, 0, s1.txs.txIDFirst)
+	testutil.Equals(t, 3, s1.txs.txIDCount)
+}
diff --git a/isolation.go b/isolation.go
new file mode 100644
index 00000000..7c12fdcd
--- /dev/null
+++ b/isolation.go
@@ -0,0 +1,210 @@
+package tsdb
+
+import "sync"
+
+// IsolationState holds the isolation information.
+type IsolationState struct {
+	// We will ignore all writes above the max, or that are incomplete.
+	maxWriteID       uint64
+	incompleteWrites map[uint64]struct{}
+	lowWaterMark     uint64 // Lowest of incompleteWrites/maxWriteID.
+	isolation        *isolation
+
+	// Doubly linked list of active reads.
+	next *IsolationState
+	prev *IsolationState
+}
+
+// Close closes the state.
+func (i *IsolationState) Close() {
+	i.isolation.readMtx.Lock()
+	i.next.prev = i.prev
+	i.prev.next = i.next
+	i.isolation.readMtx.Unlock()
+}
+
+// isolation is the global isolation state.
+type isolation struct {
+	// Mutex for accessing lastWriteID and writesOpen.
+	writeMtx sync.Mutex
+	// Each write is given an internal ID.
+	lastWriteID uint64
+	// Which writes are currently in progress.
+	writesOpen map[uint64]struct{}
+	// Mutex for accessing readsOpen.
+	// If taking both writeMtx and readMtx, take writeMtx first.
+	readMtx sync.Mutex
+	// All currently in-use isolationStates. This is a doubly-linked list.
+	readsOpen *IsolationState
+}
+
+func newIsolation() *isolation {
+	isoState := &IsolationState{}
+	isoState.next = isoState
+	isoState.prev = isoState
+
+	return &isolation{
+		writesOpen: map[uint64]struct{}{},
+		readsOpen:  isoState,
+	}
+}
+
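// A sketch of how the pieces of this file fit together (the helper name
// isolationLifecycle is ours; all calls exist in this file): writers bracket
// their work with newWriteID/closeWrite, readers with State/Close, and
// lowWatermark reports how far back per-sample write IDs may be dropped.
func isolationLifecycle(iso *isolation) uint64 {
	writeID := iso.newWriteID() // Writer: this ID is stamped on every sample of the batch.
	state := iso.State()        // Reader: snapshot; sees writeID as incomplete.

	// Samples whose ID is in state.incompleteWrites, or above state.maxWriteID,
	// are invisible to this reader.

	iso.closeWrite(writeID) // Writer done: later states treat the write as complete.
	state.Close()           // Reader done: unlinks the state from the list.

	return iso.lowWatermark() // With no readers open, this is lastWriteID again.
}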
+// lowWatermark returns the writeID below which we no longer need to track
+// which samples came from which writeID.
+func (i *isolation) lowWatermark() uint64 {
+	i.writeMtx.Lock() // Take writeMtx first.
+	defer i.writeMtx.Unlock()
+	i.readMtx.Lock()
+	defer i.readMtx.Unlock()
+	if i.readsOpen.prev == i.readsOpen {
+		return i.lastWriteID
+	}
+	return i.readsOpen.prev.lowWaterMark
+}
+
+// State returns an object used to control isolation
+// between a query and writes. Must be closed when complete.
+func (i *isolation) State() *IsolationState {
+	i.writeMtx.Lock() // Take write mutex before read mutex.
+	defer i.writeMtx.Unlock()
+	isoState := &IsolationState{
+		maxWriteID:       i.lastWriteID,
+		lowWaterMark:     i.lastWriteID,
+		incompleteWrites: make(map[uint64]struct{}, len(i.writesOpen)),
+		isolation:        i,
+	}
+	for k := range i.writesOpen {
+		isoState.incompleteWrites[k] = struct{}{}
+		if k < isoState.lowWaterMark {
+			isoState.lowWaterMark = k
+		}
+	}
+
+	i.readMtx.Lock()
+	defer i.readMtx.Unlock()
+	isoState.prev = i.readsOpen
+	isoState.next = i.readsOpen.next
+	i.readsOpen.next.prev = isoState
+	i.readsOpen.next = isoState
+	return isoState
+}
+
+// newWriteID increments the write counter and returns a new write ID.
+func (i *isolation) newWriteID() uint64 {
+	i.writeMtx.Lock()
+	i.lastWriteID++
+	writeID := i.lastWriteID
+	i.writesOpen[writeID] = struct{}{}
+	i.writeMtx.Unlock()
+
+	return writeID
+}
+
+// closeWrite marks the write as complete so that new isolation states
+// no longer consider it in-flight.
+func (i *isolation) closeWrite(writeID uint64) {
+	i.writeMtx.Lock()
+	delete(i.writesOpen, writeID)
+	i.writeMtx.Unlock()
+}
+
+// txRing is a ring buffer of write IDs, one per sample appended to a series.
+type txRing struct {
+	txIDs     []uint64
+	txIDFirst int // Position of the first ID in the ring.
+	txIDCount int // How many IDs are in the ring.
+}
+
+func newTxRing(cap int) *txRing {
+	return &txRing{
+		txIDs: make([]uint64, cap),
+	}
+}
+
+func (txr *txRing) add(writeID uint64) {
+	if txr.txIDCount == len(txr.txIDs) {
+		// Ring buffer is full, expand by doubling.
+		newRing := make([]uint64, txr.txIDCount*2)
+		idx := copy(newRing[:], txr.txIDs[txr.txIDFirst%len(txr.txIDs):])
+		copy(newRing[idx:], txr.txIDs[:txr.txIDFirst%len(txr.txIDs)])
+		txr.txIDs = newRing
+		txr.txIDFirst = 0
+	}
+
+	txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = writeID
+	txr.txIDCount++
+}
+
+// cleanupWriteIDsBelow removes all write IDs strictly below bound from the ring.
+func (txr *txRing) cleanupWriteIDsBelow(bound uint64) {
+	pos := txr.txIDFirst
+
+	for txr.txIDCount > 0 {
+		if txr.txIDs[pos] < bound {
+			txr.txIDFirst++
+			txr.txIDCount--
+		} else {
+			break
+		}
+
+		pos++
+		if pos == len(txr.txIDs) {
+			pos = 0
+		}
+	}
+
+	txr.txIDFirst = txr.txIDFirst % len(txr.txIDs)
+}
+
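// A worked example of the ring semantics defined above, mirroring the layout
// asserted in TestHead_Truncate_WriteIDs (the values are illustrative):
func txRingDemo() {
	txr := newTxRing(4)
	for id := uint64(1); id <= 5; id++ {
		txr.add(id) // The fifth add doubles the buffer to 8 slots.
	}
	// Logical content is now 1, 2, 3, 4, 5 regardless of physical layout.

	txr.cleanupWriteIDsBelow(3) // Drops 1 and 2; the ring holds 3, 4, 5.
	txr.cutoffN(2)              // Keeps the newest two IDs (4, 5) and halves the buffer.
}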
+// cutoffN keeps only the latest n transaction IDs in the ring.
+// It will also shrink the ring if possible.
+func (txr *txRing) cutoffN(n int) {
+	if txr.txIDCount <= n {
+		return
+	}
+
+	txr.txIDFirst += (txr.txIDCount - n)
+	txr.txIDCount = n
+
+	newBufSize := len(txr.txIDs)
+	for n < newBufSize/2 {
+		newBufSize = newBufSize / 2
+	}
+
+	if newBufSize == len(txr.txIDs) {
+		return
+	}
+
+	newRing := make([]uint64, newBufSize)
+	idx := copy(newRing[:], txr.txIDs[txr.txIDFirst%len(txr.txIDs):])
+	copy(newRing[idx:], txr.txIDs[:txr.txIDFirst%len(txr.txIDs)])
+
+	txr.txIDs = newRing
+	txr.txIDFirst = 0
+}
+
+func (txr *txRing) iterator() *txRingIterator {
+	return &txRingIterator{
+		pos: txr.txIDFirst % len(txr.txIDs),
+		ids: txr.txIDs,
+	}
+}
+
+// txRingIterator lets you iterate over the write IDs in the ring. It never
+// terminates on its own; the caller must stop after txIDCount steps.
+type txRingIterator struct {
+	ids []uint64
+
+	pos int
+}
+
+func (it *txRingIterator) At() uint64 {
+	return it.ids[it.pos]
+}
+
+func (it *txRingIterator) Next() {
+	it.pos++
+	if it.pos == len(it.ids) {
+		it.pos = 0
+	}
+}
diff --git a/querier.go b/querier.go
index d4dad930..9acb8486 100644
--- a/querier.go
+++ b/querier.go
@@ -53,6 +53,7 @@ type Series interface {
 
 // querier aggregates querying results from time blocks within
 // a single partition.
 type querier struct {
+	db     *DB
 	blocks []Querier
 }
 
@@ -114,6 +115,7 @@ func (q *querier) Close() error {
 	for _, bq := range q.blocks {
 		merr.Add(bq.Close())
 	}
+
 	return merr.Err()
 }
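// A closing sketch (the helper name readThenRelease is ours) of why readers
// must be closed under this scheme: an open IsolationState pins
// isolation.lowWatermark(), so series cannot drop old write IDs and the
// tsdb_isolation_low_watermark gauge stops advancing.
func readThenRelease(h *Head) error {
	cr := h.chunksRange(0, 1000, h.iso.State()) // Opening a reader pins the low watermark.
	// ... read chunks; only writes committed before State() was taken are visible ...
	return cr.Close() // Releases the IsolationState so the watermark can advance.
}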