From db6bab513b893d1565498e9110e402e04db7384a Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Fri, 16 Jun 2017 16:56:25 +0100 Subject: [PATCH 01/15] Add information we need for isolation on the write side. --- block.go | 5 ++++- db.go | 25 ++++++++++++++++++++++++- head.go | 23 ++++++++++++++++------- head_test.go | 15 +++++++-------- 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/block.go b/block.go index 7dc3af9d..b076fdb6 100644 --- a/block.go +++ b/block.go @@ -133,7 +133,10 @@ type BlockReader interface { // Appendable defines an entity to which data can be appended. type Appendable interface { // Appender returns a new Appender against an underlying store. - Appender() Appender + Appender(writeId uint64) Appender + + // Busy returns whether there are any currently active appenders. + Busy() bool } // BlockMeta provides meta information about a block. diff --git a/db.go b/db.go index 489f3a12..fb7180af 100644 --- a/db.go +++ b/db.go @@ -115,6 +115,14 @@ type DB struct { // cmtx is used to control compactions and deletions. cmtx sync.Mutex compactionsEnabled bool + + // Mutex for accessing writeWatermark and writesOpen. + // block layout. + writeMtx sync.Mutex + // Each write is given an internal id. + writeLastId uint64 + // Which writes are currently in progress. + writesOpen map[uint64]struct{} } type dbMetrics struct { @@ -197,6 +205,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db dir: dir, logger: l, opts: opts, + writesOpen: map[uint64]struct{}{}, compactc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), @@ -334,7 +343,13 @@ func (db *DB) retentionCutoff() (b bool, err error) { // Appender opens a new appender against the database. func (db *DB) Appender() Appender { - return dbAppender{db: db, Appender: db.head.Appender()} + db.writeMtx.Lock() + id := db.writeLastId + db.writeLastId++ + db.writesOpen[id] = struct{}{} + db.writeMtx.Unlock() + + return dbAppender{db: db, Appender: db.head.Appender(id), writeId: id} } // dbAppender wraps the DB's head appender and triggers compactions on commit @@ -342,6 +357,8 @@ func (db *DB) Appender() Appender { type dbAppender struct { Appender db *DB + + writeId uint64 } func (a dbAppender) Commit() error { @@ -355,6 +372,11 @@ func (a dbAppender) Commit() error { default: } } + + a.db.writeMtx.Lock() + delete(a.db.writesOpen, a.writeId) + a.db.writeMtx.Unlock() + return err } @@ -684,6 +706,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { sq := &querier{ blocks: make([]Querier, 0, len(blocks)), } + for _, b := range blocks { q, err := NewBlockQuerier(b, mint, maxt) if err == nil { diff --git a/head.go b/head.go index c76c139d..e467bdd0 100644 --- a/head.go +++ b/head.go @@ -216,7 +216,7 @@ func (h *Head) processWALSamples( unknownRefs++ continue } - _, chunkCreated := ms.append(s.T, s.V) + _, chunkCreated := ms.append(s.T, s.V, 0) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -409,7 +409,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender() + a.app = a.head.appender(0) return a.app.Add(lset, t, v) } @@ -436,7 +436,7 @@ func (a *initAppender) Rollback() error { } // Appender returns a new Appender on the database. -func (h *Head) Appender() Appender { +func (h *Head) Appender(writeId uint64) Appender { h.metrics.activeAppenders.Inc() // The head cache might not have a starting point yet. 
The init appender @@ -444,15 +444,16 @@ func (h *Head) Appender() Appender { if h.MinTime() == math.MinInt64 { return &initAppender{head: h} } - return h.appender() + return h.appender(writeId) } -func (h *Head) appender() *headAppender { +func (h *Head) appender(writeId uint64) *headAppender { return &headAppender{ head: h, mint: h.MaxTime() - h.chunkRange/2, samples: h.getAppendBuffer(), highTimestamp: math.MinInt64, + writeId: writeId, } } @@ -475,6 +476,8 @@ type headAppender struct { series []RefSeries samples []RefSample highTimestamp int64 + + writeId uint64 } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -535,7 +538,7 @@ func (a *headAppender) Commit() error { for _, s := range a.samples { s.series.Lock() - ok, chunkCreated := s.series.append(s.T, s.V) + ok, chunkCreated := s.series.append(s.T, s.V, a.writeId) s.series.Unlock() if !ok { @@ -1103,6 +1106,9 @@ type memSeries struct { sampleBuf [4]sample app chunkenc.Appender // Current appender for the chunk. + + // Write ids of the tail of the list of samples. + writeIds []uint64 } func (s *memSeries) minTime() int64 { @@ -1139,6 +1145,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { ref: id, chunkRange: chunkRange, nextAt: math.MinInt64, + writeIds: []uint64{}, } return s } @@ -1193,7 +1200,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { } // append adds the sample (t, v) to the series. -func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { +func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCreated bool) { const samplesPerChunk = 120 c := s.head() @@ -1229,6 +1236,8 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} + s.writeIds = append(s.writeIds, writeId) + return true, chunkCreated } diff --git a/head_test.go b/head_test.go index 36a97542..6f3bbb95 100644 --- a/head_test.go +++ b/head_test.go @@ -199,7 +199,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i)) + ok, _ := s.append(int64(i), float64(i), 0) testutil.Assert(t, ok == true, "sample append failed") } @@ -237,8 +237,7 @@ func TestHeadDeleteSimple(t *testing.T) { testutil.Ok(t, err) defer head.Close() - app := head.Appender() - + app := head.Appender(0) smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() @@ -629,19 +628,19 @@ func TestMemSeries_append(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. 
- ok, chunkCreated := s.append(998, 1) + ok, chunkCreated := s.append(998, 1, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2) + ok, chunkCreated = s.append(999, 2, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3) + ok, chunkCreated = s.append(1000, 3, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, ok, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4) + ok, chunkCreated = s.append(1001, 4, 0) testutil.Assert(t, ok, "append failed") testutil.Assert(t, !chunkCreated, "second sample should use same chunk") @@ -651,7 +650,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i)) + ok, _ := s.append(1001+int64(i), float64(i), 0) testutil.Assert(t, ok, "append failed") } From 55307e46a6b2666e4720e8535d0be611be2aee3f Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 22 Jun 2017 13:23:42 +0100 Subject: [PATCH 02/15] Pass down isolation information to query iterators. --- block.go | 10 ++++++++-- compact.go | 2 +- compact_test.go | 6 +++--- db.go | 10 ++++++++++ head.go | 22 ++++++++++++++-------- head_test.go | 14 +++++++------- querier.go | 2 +- 7 files changed, 44 insertions(+), 22 deletions(-) diff --git a/block.go b/block.go index b076fdb6..28e3d94d 100644 --- a/block.go +++ b/block.go @@ -124,7 +124,7 @@ type BlockReader interface { Index() (IndexReader, error) // Chunks returns a ChunkReader over the block's data. - Chunks() (ChunkReader, error) + Chunks(*IsolationState) (ChunkReader, error) // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() (TombstoneReader, error) @@ -139,6 +139,12 @@ type Appendable interface { Busy() bool } +type IsolationState struct { + // We will ignore all writes above the max, and that are incomplete. + maxWriteId uint64 + incompleteWrites map[uint64]struct{} +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -328,7 +334,7 @@ func (pb *Block) Index() (IndexReader, error) { } // Chunks returns a new ChunkReader against the block data. 
-func (pb *Block) Chunks() (ChunkReader, error) { +func (pb *Block) Chunks(_ *IsolationState) (ChunkReader, error) { if err := pb.startRead(); err != nil { return nil, err } diff --git a/compact.go b/compact.go index 16a3bd74..b991fb8e 100644 --- a/compact.go +++ b/compact.go @@ -521,7 +521,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } closers = append(closers, indexr) - chunkr, err := b.Chunks() + chunkr, err := b.Chunks(nil) if err != nil { return errors.Wrapf(err, "open chunk reader for block %s", b) } diff --git a/compact_test.go b/compact_test.go index 42e38b5c..3fda41b9 100644 --- a/compact_test.go +++ b/compact_test.go @@ -398,6 +398,6 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { type erringBReader struct{} -func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } -func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } -func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } +func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } +func (erringBReader) Chunks(*IsolationState) (ChunkReader, error) { return nil, errors.New("chunks") } +func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } diff --git a/db.go b/db.go index fb7180af..a0601edd 100644 --- a/db.go +++ b/db.go @@ -693,6 +693,16 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { db.mtx.RLock() defer db.mtx.RUnlock() + db.writeMtx.Lock() + isolation := &IsolationState{ + maxWriteId: db.writeLastId, + incompleteWrites: make(map[uint64]struct{}, len(db.writesOpen)), + } + for k, _ := range db.writesOpen { + isolation.incompleteWrites[k] = struct{}{} + } + db.writeMtx.Unlock() + for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { diff --git a/head.go b/head.go index e467bdd0..64510ac0 100644 --- a/head.go +++ b/head.go @@ -389,8 +389,8 @@ func (h *rangeHead) Index() (IndexReader, error) { return h.head.indexRange(h.mint, h.maxt), nil } -func (h *rangeHead) Chunks() (ChunkReader, error) { - return h.head.chunksRange(h.mint, h.maxt), nil +func (h *rangeHead) Chunks(isolation *IsolationState) (ChunkReader, error) { + return h.head.chunksRange(h.mint, h.maxt, isolation), nil } func (h *rangeHead) Tombstones() (TombstoneReader, error) { @@ -668,15 +668,15 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { } // Chunks returns a ChunkReader against the block. -func (h *Head) Chunks() (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64), nil +func (h *Head) Chunks(isolation *IsolationState) (ChunkReader, error) { + return h.chunksRange(math.MinInt64, math.MaxInt64, isolation), nil } -func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { +func (h *Head) chunksRange(mint, maxt int64, isolation *IsolationState) *headChunkReader { if hmin := h.MinTime(); hmin > mint { mint = hmin } - return &headChunkReader{head: h, mint: mint, maxt: maxt} + return &headChunkReader{head: h, mint: mint, maxt: maxt, isolation: isolation} } // MinTime returns the lowest time bound on visible data in the head. 
@@ -697,6 +697,8 @@ func (h *Head) Close() error { type headChunkReader struct { head *Head mint, maxt int64 + + isolation *IsolationState } func (h *headChunkReader) Close() error { @@ -749,6 +751,8 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { Chunk: c.chunk, s: s, cid: int(cid), + + isolation: h.isolation, }, nil } @@ -756,11 +760,13 @@ type safeChunk struct { chunkenc.Chunk s *memSeries cid int + + isolation *IsolationState } func (c *safeChunk) Iterator() chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid) + it := c.s.iterator(c.cid, c.isolation) c.s.Unlock() return it } @@ -1252,7 +1258,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int) chunkenc.Iterator { +func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterator { c := s.chunk(id) // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, // which got then garbage collected before it got accessed. diff --git a/head_test.go b/head_test.go index 6f3bbb95..b196be5f 100644 --- a/head_test.go +++ b/head_test.go @@ -113,10 +113,10 @@ func TestHead_ReadWAL(t *testing.T) { return x } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil))) testutil.Equals(t, 0, len(s11.chunks)) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) - testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil))) + testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0, nil))) } func TestHead_Truncate(t *testing.T) { @@ -221,11 +221,11 @@ func TestMemSeries_truncateChunks(t *testing.T) { // Validate that the series' sample buffer is applied correctly to the last chunk // after truncation. - it1 := s.iterator(s.chunkID(len(s.chunks) - 1)) + it1 := s.iterator(s.chunkID(len(s.chunks)-1), nil) _, ok := it1.(*memSafeIterator) testutil.Assert(t, ok == true, "") - it2 := s.iterator(s.chunkID(len(s.chunks) - 2)) + it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil) _, ok = it2.(*memSafeIterator) testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") } @@ -688,7 +688,7 @@ func TestGCChunkAccess(t *testing.T) { }}, lset) testutil.Equals(t, 2, len(chunks)) - cr := h.chunksRange(0, 1500) + cr := h.chunksRange(0, 1500, nil) _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) @@ -728,7 +728,7 @@ func TestGCSeriesAccess(t *testing.T) { }}, lset) testutil.Equals(t, 2, len(chunks)) - cr := h.chunksRange(0, 2000) + cr := h.chunksRange(0, 2000, nil) _, err = cr.Chunk(chunks[0].Ref) testutil.Ok(t, err) _, err = cr.Chunk(chunks[1].Ref) diff --git a/querier.go b/querier.go index d4dad930..b8cf9c31 100644 --- a/querier.go +++ b/querier.go @@ -123,7 +123,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { if err != nil { return nil, errors.Wrapf(err, "open index reader") } - chunkr, err := b.Chunks() + chunkr, err := b.Chunks(nil) if err != nil { indexr.Close() return nil, errors.Wrapf(err, "open chunk reader") From cbaa1131debcecb9e44cfe1cc105dd64149e6aa4 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 22 Jun 2017 17:10:11 +0100 Subject: [PATCH 03/15] Add unittest for isolation failure. 
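
The test drives concurrent appends and reads against one DB: a
background goroutine commits batches of 100 samples while queriers
repeatedly check that every series in the batch ends on the same
value. Without isolation a querier can observe half of a committed
batch.

The visibility rule the test relies on can be sketched like this,
using the IsolationState fields from the previous patch (the helper
itself is illustrative and not part of this patch):

    // visible reports whether a sample written by writeId may be
    // returned to a reader holding isol.
    func visible(writeId uint64, isol *IsolationState) bool {
        if writeId > isol.maxWriteId {
            // The write started after the read snapshot was taken.
            return false
        }
        // Writes still uncommitted at snapshot time stay invisible.
        _, incomplete := isol.incompleteWrites[writeId]
        return !incomplete
    }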
--- db_test.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/db_test.go b/db_test.go index 3fbbef74..d29bd0b6 100644 --- a/db_test.go +++ b/db_test.go @@ -19,11 +19,13 @@ import ( "math/rand" "os" "sort" + "strconv" "testing" "github.com/pkg/errors" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" + "github.com/stretchr/testify/require" ) func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { @@ -892,3 +894,85 @@ func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) { return result, ss.Err() } + +func TestDBCannotSeePartialCommits(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + require.NoError(t, err) + defer db.Close() + + stop := make(chan struct{}) + firstInsert := make(chan struct{}) + + // Insert data in batches. + go func() { + iter := 0 + for { + app := db.Appender() + + for j := 0; j < 100; j++ { + _, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter)) + require.NoError(t, err) + } + if iter == 0 { + close(firstInsert) + } + iter++ + + err = app.Commit() + require.NoError(t, err) + + select { + case <-stop: + return + default: + } + } + }() + + <-firstInsert + + // This is a race condition, so do a few tests to tickle it. + // Usually most will fail. + inconsistencies := 0 + for i := 0; i < 10; i++ { + func() { + querier, err := db.Querier(0, 1000000) + testutil.Ok(t, err) + defer querier.Close() + + ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) + testutil.Ok(t, err) + + seriesSet := make(map[string][]sample) + for ss.Next() { + series := ss.At() + + samples := []sample{} + it := series.Iterator() + for it.Next() { + t, v := it.At() + samples = append(samples, sample{t: t, v: v}) + } + + name := series.Labels().String() + seriesSet[name] = samples + } + testutil.Ok(t, ss.Err()) + + require.NoError(t, err) + values := map[float64]struct{}{} + for _, series := range seriesSet { + values[series[len(series)-1].v] = struct{}{} + } + if len(values) != 1 { + inconsistencies++ + } + }() + } + stop <- struct{}{} + + require.Equal(t, 0, inconsistencies, "Some queries saw inconsistent results.") +} From 40f180c3a8287ba58b34ff21189063add85b646e Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 22 Jun 2017 18:35:38 +0100 Subject: [PATCH 04/15] Use isolation information when reading head blocks. 
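
Each memSeries records the write id of its recent samples in append
order, so a reader only has to find the first sample it is not allowed
to see and stop there; everything from that point on is conservatively
hidden. The iterator change below computes this cut-off as stopAfter
and threads it through memSafeIterator.

A sketch of the cut-off computation, under the simplifying assumption
that writeIds[i] belongs to sample i of the chunk (the real code also
has to offset into the right chunk):

    // visibleSamples returns how many leading samples a reader
    // holding isol may iterate over.
    func visibleSamples(writeIds []uint64, isol *IsolationState) int {
        for i, w := range writeIds {
            if _, open := isol.incompleteWrites[w]; open || w > isol.maxWriteId {
                return i // Hide this sample and everything after it.
            }
        }
        return len(writeIds)
    }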
--- db_test.go | 10 +++++++--- head.go | 52 ++++++++++++++++++++++++++++++++++++++++------------ head_test.go | 5 +++-- 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/db_test.go b/db_test.go index d29bd0b6..753d7c05 100644 --- a/db_test.go +++ b/db_test.go @@ -57,6 +57,10 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam } testutil.Ok(t, it.Err()) + if len(samples) == 0 { + continue + } + name := series.Labels().String() result[name] = samples } @@ -916,14 +920,14 @@ func TestDBCannotSeePartialCommits(t *testing.T) { _, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter)) require.NoError(t, err) } + err = app.Commit() + require.NoError(t, err) + if iter == 0 { close(firstInsert) } iter++ - err = app.Commit() - require.NoError(t, err) - select { case <-stop: return diff --git a/head.go b/head.go index 64510ac0..5c2024dc 100644 --- a/head.go +++ b/head.go @@ -1267,16 +1267,42 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato return chunkenc.NewNopIterator() } + numSamples := c.chunk.NumSamples() + stopAfter := numSamples + + if isolation != nil { + for index, writeId := range s.writeIds { + if _, ok := isolation.incompleteWrites[writeId]; ok || writeId > isolation.maxWriteId { + for _, c2 := range s.chunks[:id] { + index -= c2.chunk.NumSamples() + } + if index < numSamples { + stopAfter = index + } + break + } + } + } + if id-s.firstChunkID < len(s.chunks)-1 { - return c.chunk.Iterator() + it := &memSafeIterator{ + Iterator: c.chunk.Iterator(), + i: -1, + total: numSamples, + stopAfter: stopAfter, + } + return it } + // Serve the last 4 samples for the last chunk from the sample buffer // as their compressed bytes may be mutated by added samples. 
it := &memSafeIterator{ - Iterator: c.chunk.Iterator(), - i: -1, - total: c.chunk.NumSamples(), - buf: s.sampleBuf, + Iterator: c.chunk.Iterator(), + i: -1, + total: numSamples, + stopAfter: stopAfter, + bufferedSamples: 4, + buf: s.sampleBuf, } return it } @@ -1296,27 +1322,29 @@ type memChunk struct { type memSafeIterator struct { chunkenc.Iterator - i int - total int - buf [4]sample + i int + total int + stopAfter int + bufferedSamples int + buf [4]sample } func (it *memSafeIterator) Next() bool { - if it.i+1 >= it.total { + if it.i+1 >= it.stopAfter { return false } it.i++ - if it.total-it.i > 4 { + if it.total-it.i > it.bufferedSamples { return it.Iterator.Next() } return true } func (it *memSafeIterator) At() (int64, float64) { - if it.total-it.i > 4 { + if it.total-it.i > it.bufferedSamples { return it.Iterator.At() } - s := it.buf[4-(it.total-it.i)] + s := it.buf[it.bufferedSamples-(it.total-it.i)] return s.t, s.v } diff --git a/head_test.go b/head_test.go index b196be5f..19bcf0db 100644 --- a/head_test.go +++ b/head_test.go @@ -226,8 +226,9 @@ func TestMemSeries_truncateChunks(t *testing.T) { testutil.Assert(t, ok == true, "") it2 := s.iterator(s.chunkID(len(s.chunks)-2), nil) - _, ok = it2.(*memSafeIterator) - testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer") + it2MSI, ok := it2.(*memSafeIterator) + testutil.Assert(t, ok == true, "") + testutil.Assert(t, it2MSI.bufferedSamples == 0, "non-last chunk incorrectly wrapped with sample buffer") } func TestHeadDeleteSimple(t *testing.T) { From caf58e56467faf98f412f7ec5561a204b9008f1d Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 22 Jun 2017 18:43:35 +0100 Subject: [PATCH 05/15] Add test for not seeing commits after querier is created. --- db_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/db_test.go b/db_test.go index 753d7c05..9c41dd42 100644 --- a/db_test.go +++ b/db_test.go @@ -980,3 +980,33 @@ func TestDBCannotSeePartialCommits(t *testing.T) { require.Equal(t, 0, inconsistencies, "Some queries saw inconsistent results.") } + +func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { + tmpdir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(tmpdir) + + db, err := Open(tmpdir, nil, nil, nil) + require.NoError(t, err) + defer db.Close() + + querier := db.Querier(0, 1000000) + defer querier.Close() + + app := db.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0) + require.NoError(t, err) + // This commit is after the querier is created, so should not be returned. + err = app.Commit() + require.NoError(t, err) + + seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) + require.NoError(t, err) + require.Equal(t, map[string][]sample{}, seriesSet) + + querier = db.Querier(0, 1000000) + defer querier.Close() + seriesSet, err = readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar"))) + require.NoError(t, err) + require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}}) + +} From 52aa41009b0bf243b2e6b236922363577790b7fb Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 28 Jun 2017 15:37:26 +0100 Subject: [PATCH 06/15] Track what reads are in progress. 
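
Per-sample write ids can only be dropped once no reader could still
ask about them, so the DB now registers every IsolationState in
readsOpen and derives a low watermark from it: the smallest write id
any open read might still reference. Paraphrased as a standalone
function (readLowWatermark below implements exactly this; the TODO
notes it still needs to become O(1)):

    func lowWatermark(writeLastId uint64, readsOpen map[uint64]*IsolationState) uint64 {
        min := writeLastId // With no reads open, nothing is pinned.
        for _, isol := range readsOpen {
            if isol.maxWriteId < min {
                min = isol.maxWriteId
            }
            for w := range isol.incompleteWrites {
                if w < min { // The read still filters this write out.
                    min = w
                }
            }
        }
        return min
    }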
--- block.go | 2 ++ db.go | 38 +++++++++++++++++++++++++++++++++++--- db_test.go | 52 +++++++++++++++++++++++++++++++--------------------- querier.go | 35 ++++++++++++++++++++++++++++++++++- 4 files changed, 102 insertions(+), 25 deletions(-) diff --git a/block.go b/block.go index 28e3d94d..7beaa994 100644 --- a/block.go +++ b/block.go @@ -143,6 +143,8 @@ type IsolationState struct { // We will ignore all writes above the max, and that are incomplete. maxWriteId uint64 incompleteWrites map[uint64]struct{} + readId uint64 + db *DB } // BlockMeta provides meta information about a block. diff --git a/db.go b/db.go index a0601edd..fa19751e 100644 --- a/db.go +++ b/db.go @@ -116,13 +116,18 @@ type DB struct { cmtx sync.Mutex compactionsEnabled bool - // Mutex for accessing writeWatermark and writesOpen. - // block layout. + // Mutex for accessing writeLastId and writesOpen. writeMtx sync.Mutex // Each write is given an internal id. writeLastId uint64 // Which writes are currently in progress. writesOpen map[uint64]struct{} + // Mutex for accessing readLastId. + readMtx sync.Mutex + // Each isolated read is given an internal id. + readLastId uint64 + // All current in use isolationStates. + readsOpen map[uint64]*IsolationState } type dbMetrics struct { @@ -206,6 +211,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db logger: l, opts: opts, writesOpen: map[uint64]struct{}{}, + readsOpen: map[uint64]*IsolationState{}, compactc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), @@ -714,7 +720,9 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { } sq := &querier{ - blocks: make([]Querier, 0, len(blocks)), + blocks: make([]Querier, 0, len(blocks)), + db: db, + isolation: db.IsolationState(), } for _, b := range blocks { @@ -737,6 +745,30 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { return mint, mint + width } +// readLowWatermark returns the writeId below which +// we no longer need to track which writes were from +// which writeId. +// TODO: Optimise this, needs to be O(1). +func (db *DB) readLowWatermark() uint64 { + db.writeMtx.Lock() + id := db.writeLastId + db.writeMtx.Unlock() + + db.readMtx.Lock() + for _, isolation := range db.readsOpen { + if isolation.maxWriteId < id { + id = isolation.maxWriteId + } + for i := range isolation.incompleteWrites { + if i < id { + id = i + } + } + } + db.readMtx.Unlock() + return id +} + // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. 
 func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 	db.cmtx.Lock()
diff --git a/db_test.go b/db_test.go
index 9c41dd42..fe365b15 100644
--- a/db_test.go
+++ b/db_test.go
@@ -950,21 +950,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
 			ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
 			testutil.Ok(t, err)
 
-			seriesSet := make(map[string][]sample)
-			for ss.Next() {
-				series := ss.At()
-
-				samples := []sample{}
-				it := series.Iterator()
-				for it.Next() {
-					t, v := it.At()
-					samples = append(samples, sample{t: t, v: v})
-				}
-
-				name := series.Labels().String()
-				seriesSet[name] = samples
-			}
-			testutil.Ok(t, ss.Err())
+			seriesSet := readSeriesSet(t, ss)
 
 			require.NoError(t, err)
 			values := map[float64]struct{}{}
@@ -989,7 +975,8 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
 	require.NoError(t, err)
 	defer db.Close()
 
-	querier := db.Querier(0, 1000000)
+	querier, err := db.Querier(0, 1000000)
+	testutil.Ok(t, err)
 	defer querier.Close()
 
 	app := db.Appender()
@@ -999,14 +986,37 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
 	err = app.Commit()
 	require.NoError(t, err)
 
-	seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
-	require.NoError(t, err)
+	ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	testutil.Ok(t, err)
+
+	seriesSet := readSeriesSet(t, ss)
 	require.Equal(t, map[string][]sample{}, seriesSet)
 
-	querier = db.Querier(0, 1000000)
+	querier, err = db.Querier(0, 1000000)
+	testutil.Ok(t, err)
 	defer querier.Close()
-	seriesSet, err = readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
-	require.NoError(t, err)
+
+	ss, err = querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	seriesSet = readSeriesSet(t, ss)
 	require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})
+}
+
+func readSeriesSet(t *testing.T, ss SeriesSet) map[string][]sample {
+	seriesSet := make(map[string][]sample)
+	for ss.Next() {
+		series := ss.At()
+
+		samples := []sample{}
+		it := series.Iterator()
+		for it.Next() {
+			t, v := it.At()
+			samples = append(samples, sample{t: t, v: v})
+		}
+
+		name := series.Labels().String()
+		seriesSet[name] = samples
+	}
+	testutil.Ok(t, ss.Err())
+	return seriesSet
 }
diff --git a/querier.go b/querier.go
index b8cf9c31..d843f878 100644
--- a/querier.go
+++ b/querier.go
@@ -53,7 +53,37 @@ type Series interface {
 // querier aggregates querying results from time blocks within
 // a single partition.
 type querier struct {
-	blocks []Querier
+	db        *DB
+	isolation *IsolationState
+	blocks    []Querier
+}
+
+// IsolationState returns an object used to control isolation
+// between a query and writes. Must be closed when complete.
+func (s *DB) IsolationState() *IsolationState { + s.writeMtx.Lock() + isolation := &IsolationState{ + maxWriteId: s.writeLastId, + incompleteWrites: make(map[uint64]struct{}, len(s.writesOpen)), + db: s, + } + for k, _ := range s.writesOpen { + isolation.incompleteWrites[k] = struct{}{} + } + s.writeMtx.Unlock() + + s.readMtx.Lock() + isolation.readId = s.readLastId + s.readLastId++ + s.readsOpen[isolation.readId] = isolation + s.readMtx.Unlock() + return isolation +} + +func (i *IsolationState) Close() { + i.db.readMtx.Lock() + delete(i.db.readsOpen, i.readId) + i.db.readMtx.Unlock() } func (q *querier) LabelValues(n string) ([]string, error) { @@ -114,6 +144,9 @@ func (q *querier) Close() error { for _, bq := range q.blocks { merr.Add(bq.Close()) } + + q.isolation.Close() + return merr.Err() } From 2c125d1a84014fcb5b45b95d33a5603fd4e6fb2b Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 29 Jun 2017 14:18:52 +0100 Subject: [PATCH 07/15] Cleanup old writeIds at append time. --- block.go | 4 +-- db.go | 2 +- head.go | 69 +++++++++++++++++++++++++++++++++++++--------------- head_test.go | 3 ++- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/block.go b/block.go index 7beaa994..dc91b5c9 100644 --- a/block.go +++ b/block.go @@ -133,14 +133,14 @@ type BlockReader interface { // Appendable defines an entity to which data can be appended. type Appendable interface { // Appender returns a new Appender against an underlying store. - Appender(writeId uint64) Appender + Appender(writeId, cleanupWriteIdsBelow uint64) Appender // Busy returns whether there are any currently active appenders. Busy() bool } type IsolationState struct { - // We will ignore all writes above the max, and that are incomplete. + // We will ignore all writes above the max, or that are incomplete. maxWriteId uint64 incompleteWrites map[uint64]struct{} readId uint64 diff --git a/db.go b/db.go index fa19751e..64e1088f 100644 --- a/db.go +++ b/db.go @@ -355,7 +355,7 @@ func (db *DB) Appender() Appender { db.writesOpen[id] = struct{}{} db.writeMtx.Unlock() - return dbAppender{db: db, Appender: db.head.Appender(id), writeId: id} + return dbAppender{db: db, Appender: db.head.Appender(id, db.readLowWatermark()), writeId: id} } // dbAppender wraps the DB's head appender and triggers compactions on commit diff --git a/head.go b/head.go index 5c2024dc..a924ef0b 100644 --- a/head.go +++ b/head.go @@ -409,7 +409,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender(0) + a.app = a.head.appender(0, 0) return a.app.Add(lset, t, v) } @@ -436,7 +436,7 @@ func (a *initAppender) Rollback() error { } // Appender returns a new Appender on the database. -func (h *Head) Appender(writeId uint64) Appender { +func (h *Head) Appender(writeId, cleanupWriteIdsBelow uint64) Appender { h.metrics.activeAppenders.Inc() // The head cache might not have a starting point yet. 
The init appender @@ -444,16 +444,17 @@ func (h *Head) Appender(writeId uint64) Appender { if h.MinTime() == math.MinInt64 { return &initAppender{head: h} } - return h.appender(writeId) + return h.appender(writeId, cleanupWriteIdsBelow) } -func (h *Head) appender(writeId uint64) *headAppender { +func (h *Head) appender(writeId, cleanupWriteIdsBelow uint64) *headAppender { return &headAppender{ - head: h, - mint: h.MaxTime() - h.chunkRange/2, - samples: h.getAppendBuffer(), - highTimestamp: math.MinInt64, - writeId: writeId, + head: h, + mint: h.MaxTime() - h.chunkRange/2, + samples: h.getAppendBuffer(), + highTimestamp: math.MinInt64, + writeId: writeId, + cleanupWriteIdsBelow: cleanupWriteIdsBelow, } } @@ -477,7 +478,8 @@ type headAppender struct { samples []RefSample highTimestamp int64 - writeId uint64 + writeId uint64 + cleanupWriteIdsBelow uint64 } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -1113,7 +1115,7 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. - // Write ids of the tail of the list of samples. + // Write ids of most recent samples. writeIds []uint64 } @@ -1247,6 +1249,25 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr return true, chunkCreated } +func (s *memSeries) cleanupWriteIdsBelow(bound uint64) { + s.Lock() + defer s.Unlock() + + toRemove := 0 + for _, id := range s.writeIds { + if id < bound { + toRemove++ + } else { + break + } + } + + if toRemove != 0 { + // XXX This doesn't free the RAM. + s.writeIds = s.writeIds[toRemove:] + } +} + // computeChunkEndTime estimates the end timestamp based the beginning of a chunk, // its current timestamp and the upper bound up to which we insert data. // It assumes that the time range is 1/4 full. @@ -1271,15 +1292,25 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato stopAfter := numSamples if isolation != nil { - for index, writeId := range s.writeIds { - if _, ok := isolation.incompleteWrites[writeId]; ok || writeId > isolation.maxWriteId { - for _, c2 := range s.chunks[:id] { - index -= c2.chunk.NumSamples() - } - if index < numSamples { - stopAfter = index + totalSamples := 0 + previousSamples := 0 // Samples before this chunk. + for j, d := range s.chunks { + totalSamples += d.chunk.NumSamples() + if j < id { + previousSamples += d.chunk.NumSamples() + } + } + writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - len(s.writeIds)) + if writeIdsToConsider > 0 { + // writeIds extends back to at least this chunk. + for index, writeId := range s.writeIds[:writeIdsToConsider] { + if _, ok := isolation.incompleteWrites[writeId]; ok || writeId > isolation.maxWriteId { + stopAfter = index - (writeIdsToConsider - c.chunk.NumSamples()) + if stopAfter < 0 { + stopAfter = 0 // Stopped in a previous chunk. 
+					}
+					break
 				}
-				break
 			}
 		}
 	}
diff --git a/head_test.go b/head_test.go
index 19bcf0db..66b07d42 100644
--- a/head_test.go
+++ b/head_test.go
@@ -63,6 +63,7 @@ func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), de
 		}
 	}
 	return nil
+
 }
 
 func TestHead_ReadWAL(t *testing.T) {
@@ -238,7 +239,7 @@ func TestHeadDeleteSimple(t *testing.T) {
 	testutil.Ok(t, err)
 	defer head.Close()
 
-	app := head.Appender(0)
+	app := head.Appender(0, 0)
 	smpls := make([]float64, numSamples)
 	for i := int64(0); i < numSamples; i++ {
 		smpls[i] = rand.Float64()
 	}

From 5046ed1c7a0ee4aa509073c3fe62be75e2d13e36 Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Thu, 29 Jun 2017 15:09:06 +0100
Subject: [PATCH 08/15] Add test for processing of isolation at chunk iteration.

---
 db.go        |  2 +-
 head.go      |  6 ++---
 head_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++--
 querier.go   |  4 +--
 4 files changed, 79 insertions(+), 8 deletions(-)

diff --git a/db.go b/db.go
index 64e1088f..a8392016 100644
--- a/db.go
+++ b/db.go
@@ -726,7 +726,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
 	}
 
 	for _, b := range blocks {
-		q, err := NewBlockQuerier(b, mint, maxt)
+		q, err := NewBlockQuerier(b, mint, maxt, isolation)
 		if err == nil {
 			sq.blocks = append(sq.blocks, q)
 			continue
diff --git a/head.go b/head.go
index a924ef0b..1156008c 100644
--- a/head.go
+++ b/head.go
@@ -541,6 +541,7 @@ func (a *headAppender) Commit() error {
 	for _, s := range a.samples {
 		s.series.Lock()
 		ok, chunkCreated := s.series.append(s.T, s.V, a.writeId)
+		s.series.cleanupWriteIdsBelow(a.cleanupWriteIdsBelow)
 		s.series.Unlock()
 
 		if !ok {
@@ -1249,10 +1250,9 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr
 	return true, chunkCreated
 }
 
+// cleanupWriteIdsBelow cleans up older writeIds. Has to be called after acquiring
+// lock.
 func (s *memSeries) cleanupWriteIdsBelow(bound uint64) {
-	s.Lock()
-	defer s.Unlock()
-
 	toRemove := 0
 	for _, id := range s.writeIds {
 		if id < bound {
diff --git a/head_test.go b/head_test.go
index 66b07d42..0c8f55ba 100644
--- a/head_test.go
+++ b/head_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/prometheus/tsdb/index"
 	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
+	"github.com/stretchr/testify/require"
 )
 
 func BenchmarkCreateSeries(b *testing.B) {
@@ -63,7 +64,6 @@ func (w *memoryWAL) Read(series func([]RefSeries), samples func([]RefSample), de
 		}
 	}
 	return nil
-
 }
 
 func TestHead_ReadWAL(t *testing.T) {
@@ -284,7 +284,7 @@ Outer:
 	}
 
 	// Compare the result.
-	q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
+	q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime(), nil)
 	testutil.Ok(t, err)
 	res, err := q.Select(labels.NewEqualMatcher("a", "b"))
 	testutil.Ok(t, err)
@@ -745,3 +745,74 @@ func TestGCSeriesAccess(t *testing.T) {
 	_, err = cr.Chunk(chunks[1].Ref)
 	testutil.Equals(t, ErrNotFound, err)
 }
+
+func TestMemSeriesIsolation(t *testing.T) {
+	// Append samples with increasing write ids and check which last value
+	// each isolation state is allowed to see.
+	
+ hb, err := NewHead(nil, nil, NopWAL(), 1000) + testutil.Ok(t, err) + defer hb.Close() + + lastValue := func(maxWriteId uint64) int { + querier, err := NewBlockQuerier(hb, 0, 10000, &IsolationState{maxWriteId: maxWriteId}) + testutil.Ok(t, err) + defer querier.Close() + + ss, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) + testutil.Ok(t, err) + + seriesSet := readSeriesSet(t, ss) + for _, series := range seriesSet { + return int(series[len(series)-1].v) + } + return -1 + } + + i := 0 + for ; i <= 1000; i++ { + app := hb.Appender(uint64(i), 0) + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") + } + + // Test simple cases in different chunks when no writeId cleanup has been performed. + require.Equal(t, 10, lastValue(10)) + require.Equal(t, 130, lastValue(130)) + require.Equal(t, 160, lastValue(160)) + require.Equal(t, 240, lastValue(240)) + require.Equal(t, 500, lastValue(500)) + require.Equal(t, 750, lastValue(750)) + require.Equal(t, 995, lastValue(995)) + require.Equal(t, 999, lastValue(999)) + + // Cleanup writeIds below 500. + app := hb.Appender(uint64(i), 500) + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") + i++ + + // We should not get queries with a maxWriteId below 500 after the cleanup, + // but they only take the remaining writeIds into account. + require.Equal(t, 499, lastValue(10)) + require.Equal(t, 499, lastValue(130)) + require.Equal(t, 499, lastValue(160)) + require.Equal(t, 499, lastValue(240)) + require.Equal(t, 500, lastValue(500)) + require.Equal(t, 995, lastValue(995)) + require.Equal(t, 999, lastValue(999)) + + // Cleanup writeIds below 1000, which means the sample buffer is + // the only thing with writeIds. + app = hb.Appender(uint64(i), 1000) + _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) + require.NoError(t, err, "Failed to add sample") + require.NoError(t, app.Commit(), "Unexpected error committing appender") + i++ + require.Equal(t, 999, lastValue(998)) + require.Equal(t, 999, lastValue(999)) + require.Equal(t, 1000, lastValue(1000)) + require.Equal(t, 1001, lastValue(1001)) + require.Equal(t, 1002, lastValue(1002)) + require.Equal(t, 1002, lastValue(1003)) +} diff --git a/querier.go b/querier.go index d843f878..d8e862ff 100644 --- a/querier.go +++ b/querier.go @@ -151,12 +151,12 @@ func (q *querier) Close() error { } // NewBlockQuerier returns a queries against the readers. -func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { +func NewBlockQuerier(b BlockReader, mint, maxt int64, isolation *IsolationState) (Querier, error) { indexr, err := b.Index() if err != nil { return nil, errors.Wrapf(err, "open index reader") } - chunkr, err := b.Chunks(nil) + chunkr, err := b.Chunks(isolation) if err != nil { indexr.Close() return nil, errors.Wrapf(err, "open chunk reader") From 6d62bef89917a5eded9187cf9f3739427ab9b8eb Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Thu, 29 Jun 2017 16:20:33 +0100 Subject: [PATCH 09/15] Make writeIds a ring buffer. 
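
Patch 07 trimmed writeIds by re-slicing, which keeps the whole backing
array reachable (the XXX about not freeing RAM). Storing the ids in a
ring buffer makes trimming the front pure index arithmetic on
writeIdFirst/writeIdCount and only reallocates when the ring fills up.
The discipline, as a self-contained sketch (simplified; the patch
inlines equivalent logic into memSeries):

    type ring struct {
        buf   []uint64 // Must start non-empty; the patch starts at length 4.
        first int      // Index of the oldest element.
        count int
    }

    func (r *ring) push(v uint64) {
        if r.count == len(r.buf) {
            // Full: double the buffer, copying oldest-first.
            nb := make([]uint64, 2*len(r.buf))
            for i := 0; i < r.count; i++ {
                nb[i] = r.buf[(r.first+i)%len(r.buf)]
            }
            r.buf, r.first = nb, 0
        }
        r.buf[(r.first+r.count)%len(r.buf)] = v
        r.count++
    }

    // popFront removes the oldest id; the caller checks count > 0.
    func (r *ring) popFront() uint64 {
        v := r.buf[r.first]
        r.first = (r.first + 1) % len(r.buf)
        r.count--
        return v
    }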
--- head.go | 64 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/head.go b/head.go index 1156008c..253302c8 100644 --- a/head.go +++ b/head.go @@ -1116,8 +1116,10 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. - // Write ids of most recent samples. - writeIds []uint64 + // Write ids of most recent samples. This is a ring buffer. + writeIds []uint64 + writeIdFirst int // Position of first id in the ring. + writeIdCount int // How many ids in the ring. } func (s *memSeries) minTime() int64 { @@ -1154,7 +1156,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { ref: id, chunkRange: chunkRange, nextAt: math.MinInt64, - writeIds: []uint64{}, + writeIds: make([]uint64, 4), } return s } @@ -1245,7 +1247,16 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - s.writeIds = append(s.writeIds, writeId) + if s.writeIdCount == len(s.writeIds) { + // Ring buffer is full, expand by doubling. + newRing := make([]uint64, s.writeIdCount*2) + copy(newRing[s.writeIdCount+s.writeIdFirst:], s.writeIds[s.writeIdFirst:]) + copy(newRing, s.writeIds[:s.writeIdFirst]) + s.writeIds = newRing + s.writeIdFirst += s.writeIdCount + } + s.writeIds[(s.writeIdFirst+s.writeIdCount)%len(s.writeIds)] = writeId + s.writeIdCount++ return true, chunkCreated } @@ -1253,18 +1264,22 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr // cleanupWriteIdsBelow cleans up older writeIds. Has to be called after acquiring // lock. func (s *memSeries) cleanupWriteIdsBelow(bound uint64) { - toRemove := 0 - for _, id := range s.writeIds { - if id < bound { - toRemove++ + pos := s.writeIdFirst + + for s.writeIdCount > 0 { + if s.writeIds[pos] < bound { + s.writeIdFirst++ + s.writeIdCount-- } else { break } + pos++ + if pos == len(s.writeIds) { + pos = 0 + } } - - if toRemove != 0 { - // XXX This doesn't free the RAM. - s.writeIds = s.writeIds[toRemove:] + if s.writeIdFirst >= len(s.writeIds) { + s.writeIdFirst -= len(s.writeIds) } } @@ -1300,17 +1315,22 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato previousSamples += d.chunk.NumSamples() } } - writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - len(s.writeIds)) - if writeIdsToConsider > 0 { - // writeIds extends back to at least this chunk. - for index, writeId := range s.writeIds[:writeIdsToConsider] { - if _, ok := isolation.incompleteWrites[writeId]; ok || writeId > isolation.maxWriteId { - stopAfter = index - (writeIdsToConsider - c.chunk.NumSamples()) - if stopAfter < 0 { - stopAfter = 0 // Stopped in a previous chunk. - } - break + writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - s.writeIdCount) + // Iterate over the ring, find the first one that the isolation state says not + // to return. + pos := s.writeIdFirst + for index := 0; index < writeIdsToConsider; index++ { + writeId := s.writeIds[pos] + if _, ok := isolation.incompleteWrites[writeId]; ok || writeId > isolation.maxWriteId { + stopAfter = index - (writeIdsToConsider - c.chunk.NumSamples()) + if stopAfter < 0 { + stopAfter = 0 // Stopped in a previous chunk. 
}
+				break
+			}
+			pos++
+			if pos == len(s.writeIds) {
+				pos = 0
 			}
 		}
 	}

From efd5bf19ed536b549d4e853973899965a7d4694a Mon Sep 17 00:00:00 2001
From: Brian Brazil
Date: Fri, 30 Jun 2017 13:00:23 +0100
Subject: [PATCH 10/15] Use a doubly-linked list to track active reads

---
 block.go   |  9 +++++----
 db.go      | 35 ++++++++++++++---------------------
 querier.go | 20 +++++++++++++-------
 3 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/block.go b/block.go
index dc91b5c9..b51d3329 100644
--- a/block.go
+++ b/block.go
@@ -134,17 +134,18 @@ type Appendable interface {
 	// Appender returns a new Appender against an underlying store.
 	Appender(writeId, cleanupWriteIdsBelow uint64) Appender
-
-	// Busy returns whether there are any currently active appenders.
-	Busy() bool
 }
 
 type IsolationState struct {
 	// We will ignore all writes above the max, or that are incomplete.
 	maxWriteId       uint64
 	incompleteWrites map[uint64]struct{}
-	readId           uint64
+	lowWaterMark     uint64 // Lowest of incompleteWrites/maxWriteId.
 	db               *DB
+
+	// Doubly linked list of active reads.
+	next *IsolationState
+	prev *IsolationState
 }
 
 // BlockMeta provides meta information about a block.
diff --git a/db.go b/db.go
index a8392016..41fd6520 100644
--- a/db.go
+++ b/db.go
@@ -123,11 +123,10 @@ type DB struct {
 	// Which writes are currently in progress.
 	writesOpen map[uint64]struct{}
 	// Mutex for accessing readLastId.
+	// If taking both writeMtx and readMtx, take writeMtx first.
 	readMtx sync.Mutex
-	// Each isolated read is given an internal id.
-	readLastId uint64
-	// All current in use isolationStates.
-	readsOpen map[uint64]*IsolationState
+	// All current in use isolationStates. This is a doubly-linked list.
+	readsOpen *IsolationState
 }
@@ -206,12 +205,16 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		return nil, err
 	}
 
+	head := &IsolationState{}
+	head.next = head
+	head.prev = head
+
 	db = &DB{
 		dir:        dir,
 		logger:     l,
 		opts:       opts,
 		writesOpen: map[uint64]struct{}{},
-		readsOpen:  map[uint64]*IsolationState{},
+		readsOpen:  head,
 		compactc:   make(chan struct{}, 1),
 		donec:      make(chan struct{}),
 		stopc:      make(chan struct{}),
@@ -748,25 +751,15 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
 	return mint, mint + width
 }
 
 // readLowWatermark returns the writeId below which
 // we no longer need to track which writes were from
 // which writeId.
-// TODO: Optimise this, needs to be O(1).
 func (db *DB) readLowWatermark() uint64 {
-	db.writeMtx.Lock()
-	id := db.writeLastId
-	db.writeMtx.Unlock()
-
-	db.readMtx.Lock()
-	for _, isolation := range db.readsOpen {
-		if isolation.maxWriteId < id {
-			id = isolation.maxWriteId
-		}
-		for i := range isolation.incompleteWrites {
-			if i < id {
-				id = i
-			}
-		}
-	}
-	db.readMtx.Unlock()
-	return id
+	db.writeMtx.Lock() // Take writeMtx first.
+	defer db.writeMtx.Unlock()
+	db.readMtx.Lock()
+	defer db.readMtx.Unlock()
+	if db.readsOpen.prev == db.readsOpen {
+		return db.writeLastId
+	}
+	return db.readsOpen.prev.lowWaterMark
 }
 
 // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
diff --git a/querier.go b/querier.go
index d8e862ff..10a5e7f8 100644
--- a/querier.go
+++ b/querier.go
@@ -61,28 +61,34 @@ type querier struct {
 // IsolationState returns an object used to control isolation
 // between a query and writes. Must be closed when complete.
 func (s *DB) IsolationState() *IsolationState {
-	s.writeMtx.Lock()
+	s.writeMtx.Lock() // Take write mutex before read mutex.
+ defer s.writeMtx.Unlock() isolation := &IsolationState{ maxWriteId: s.writeLastId, + lowWaterMark: s.writeLastId, incompleteWrites: make(map[uint64]struct{}, len(s.writesOpen)), db: s, } for k, _ := range s.writesOpen { isolation.incompleteWrites[k] = struct{}{} + if k < isolation.lowWaterMark { + isolation.lowWaterMark = k + } } - s.writeMtx.Unlock() s.readMtx.Lock() - isolation.readId = s.readLastId - s.readLastId++ - s.readsOpen[isolation.readId] = isolation - s.readMtx.Unlock() + defer s.readMtx.Unlock() + isolation.prev = s.readsOpen + isolation.next = s.readsOpen.next + s.readsOpen.next.prev = isolation + s.readsOpen.next = isolation return isolation } func (i *IsolationState) Close() { i.db.readMtx.Lock() - delete(i.db.readsOpen, i.readId) + i.next.prev = i.prev + i.prev.next = i.next i.db.readMtx.Unlock() } From 59ebdbd688393715294b6c31358cac132fb4406f Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Fri, 7 Jul 2017 13:37:55 +0100 Subject: [PATCH 11/15] Add metrics for isolation. --- db.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/db.go b/db.go index 41fd6520..011c18b0 100644 --- a/db.go +++ b/db.go @@ -131,6 +131,8 @@ type DB struct { type dbMetrics struct { loadedBlocks prometheus.GaugeFunc + lowWatermark prometheus.GaugeFunc + highWatermark prometheus.GaugeFunc reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter @@ -150,6 +152,20 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { defer db.mtx.RUnlock() return float64(len(db.blocks)) }) + m.lowWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "tsdb_isolation_low_watermark", + Help: "The lowest write id that is still referenced.", + }, func() float64 { + return float64(db.readLowWatermark()) + }) + m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "tsdb_isolation_high_watermark", + Help: "The highest write id that has been given out.", + }, func() float64 { + db.writeMtx.Lock() + defer db.writeMtx.Unlock() + return float64(db.writeLastId) + }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", @@ -178,6 +194,8 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { if r != nil { r.MustRegister( m.loadedBlocks, + m.lowWatermark, + m.highWatermark, m.reloads, m.reloadsFailed, m.cutoffs, From 4690f0046845f45f2769db84f9077fe5468831c9 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 20 Mar 2018 20:36:11 +0530 Subject: [PATCH 12/15] Fix tests after rebase Signed-off-by: Goutham Veeramachaneni --- db.go | 2 +- db_test.go | 3 +++ head.go | 12 +++++++++--- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/db.go b/db.go index 011c18b0..eb1d8161 100644 --- a/db.go +++ b/db.go @@ -371,8 +371,8 @@ func (db *DB) retentionCutoff() (b bool, err error) { // Appender opens a new appender against the database. 
func (db *DB) Appender() Appender { db.writeMtx.Lock() - id := db.writeLastId db.writeLastId++ + id := db.writeLastId db.writesOpen[id] = struct{}{} db.writeMtx.Unlock() diff --git a/db_test.go b/db_test.go index fe365b15..5f3b3316 100644 --- a/db_test.go +++ b/db_test.go @@ -1012,6 +1012,9 @@ func readSeriesSet(t *testing.T, ss SeriesSet) map[string][]sample { t, v := it.At() samples = append(samples, sample{t: t, v: v}) } + if len(samples) == 0 { + continue + } name := series.Labels().String() seriesSet[name] = samples diff --git a/head.go b/head.go index 253302c8..4ac6eaa7 100644 --- a/head.go +++ b/head.go @@ -402,6 +402,9 @@ func (h *rangeHead) Tombstones() (TombstoneReader, error) { type initAppender struct { app Appender head *Head + + writeId uint64 + cleanupWriteIdsBelow uint64 } func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -409,7 +412,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender(0, 0) + a.app = a.head.appender(a.writeId, a.cleanupWriteIdsBelow) return a.app.Add(lset, t, v) } @@ -442,7 +445,7 @@ func (h *Head) Appender(writeId, cleanupWriteIdsBelow uint64) Appender { // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. if h.MinTime() == math.MinInt64 { - return &initAppender{head: h} + return &initAppender{head: h, writeId: writeId, cleanupWriteIdsBelow: cleanupWriteIdsBelow} } return h.appender(writeId, cleanupWriteIdsBelow) } @@ -1296,6 +1299,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterator { c := s.chunk(id) + // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, // which got then garbage collected before it got accessed. // We must ensure to not garbage collect as long as any readers still hold a reference. @@ -1303,6 +1307,8 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato return chunkenc.NewNopIterator() } + ix := id - s.firstChunkID + numSamples := c.chunk.NumSamples() stopAfter := numSamples @@ -1311,7 +1317,7 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato previousSamples := 0 // Samples before this chunk. for j, d := range s.chunks { totalSamples += d.chunk.NumSamples() - if j < id { + if j < ix { previousSamples += d.chunk.NumSamples() } } From f7c4253be45b8f337ab6382d91232e77e68de41f Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 20 Mar 2018 22:53:45 +0530 Subject: [PATCH 13/15] Make GC cleanup writeIds Signed-off-by: Goutham Veeramachaneni --- head.go | 37 ++++++++++++++++++++++++++++--- head_test.go | 62 +++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 81 insertions(+), 18 deletions(-) diff --git a/head.go b/head.go index 4ac6eaa7..c0672cbc 100644 --- a/head.go +++ b/head.go @@ -1021,6 +1021,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { for _, series := range all { series.Lock() rmChunks += series.truncateChunksBefore(mint) + series.cleanupExtraWriteIds() if len(series.chunks) > 0 { series.Unlock() @@ -1253,10 +1254,10 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr if s.writeIdCount == len(s.writeIds) { // Ring buffer is full, expand by doubling. 
newRing := make([]uint64, s.writeIdCount*2) - copy(newRing[s.writeIdCount+s.writeIdFirst:], s.writeIds[s.writeIdFirst:]) - copy(newRing, s.writeIds[:s.writeIdFirst]) + idx := copy(newRing[:], s.writeIds[s.writeIdFirst%len(s.writeIds):]) + copy(newRing[idx:], s.writeIds[:s.writeIdFirst%len(s.writeIds)]) s.writeIds = newRing - s.writeIdFirst += s.writeIdCount + s.writeIdFirst = 0 } s.writeIds[(s.writeIdFirst+s.writeIdCount)%len(s.writeIds)] = writeId s.writeIdCount++ @@ -1286,6 +1287,36 @@ func (s *memSeries) cleanupWriteIdsBelow(bound uint64) { } } +func (s *memSeries) cleanupExtraWriteIds() { + totalSamples := 0 + for _, c := range s.chunks { + totalSamples += c.chunk.NumSamples() + } + + if s.writeIdCount <= totalSamples { + return + } + + s.writeIdFirst += (s.writeIdCount - totalSamples) + s.writeIdCount = totalSamples + + newBufSize := len(s.writeIds) + for totalSamples < newBufSize/2 { + newBufSize = newBufSize / 2 + } + + if newBufSize == len(s.writeIds) { + return + } + + newRing := make([]uint64, newBufSize) + idx := copy(newRing[:], s.writeIds[s.writeIdFirst%len(s.writeIds):]) + copy(newRing[idx:], s.writeIds[:s.writeIdFirst%len(s.writeIds)]) + + s.writeIds = newRing + s.writeIdFirst = 0 +} + // computeChunkEndTime estimates the end timestamp based the beginning of a chunk, // its current timestamp and the upper bound up to which we insert data. // It assumes that the time range is 1/4 full. diff --git a/head_test.go b/head_test.go index 0c8f55ba..c81a120a 100644 --- a/head_test.go +++ b/head_test.go @@ -133,18 +133,18 @@ func TestHead_Truncate(t *testing.T) { s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) s1.chunks = []*memChunk{ - {minTime: 0, maxTime: 999}, - {minTime: 1000, maxTime: 1999}, - {minTime: 2000, maxTime: 2999}, + {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, } s2.chunks = []*memChunk{ - {minTime: 1000, maxTime: 1999}, - {minTime: 2000, maxTime: 2999}, - {minTime: 3000, maxTime: 3999}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, + {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, } s3.chunks = []*memChunk{ - {minTime: 0, maxTime: 999}, - {minTime: 1000, maxTime: 1999}, + {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, } s4.chunks = []*memChunk{} @@ -154,12 +154,12 @@ func TestHead_Truncate(t *testing.T) { testutil.Ok(t, h.Truncate(2000)) testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, }, h.series.getByID(s1.ref).chunks) testutil.Equals(t, []*memChunk{ - {minTime: 2000, maxTime: 2999}, - {minTime: 3000, maxTime: 3999}, + {minTime: 2000, maxTime: 2999, chunk: chunkenc.NewXORChunk()}, + {minTime: 3000, maxTime: 3999, chunk: chunkenc.NewXORChunk()}, }, h.series.getByID(s2.ref).chunks) testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") @@ -674,8 +674,8 @@ func TestGCChunkAccess(t *testing.T) { s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s.chunks = []*memChunk{ - {minTime: 0, maxTime: 999}, - {minTime: 1000, maxTime: 1999}, + {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, } idx := h.indexRange(0, 1500) @@ -714,8 +714,8 @@ func TestGCSeriesAccess(t *testing.T) 
{ s, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) s.chunks = []*memChunk{ - {minTime: 0, maxTime: 999}, - {minTime: 1000, maxTime: 1999}, + {minTime: 0, maxTime: 999, chunk: chunkenc.NewXORChunk()}, + {minTime: 1000, maxTime: 1999, chunk: chunkenc.NewXORChunk()}, } idx := h.indexRange(0, 2000) @@ -816,3 +816,35 @@ func TestMemSeriesIsolation(t *testing.T) { require.Equal(t, 1002, lastValue(1002)) require.Equal(t, 1002, lastValue(1003)) } + +func TestHead_Truncate_WriteIDs(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) + + chk := chunkenc.NewXORChunk() + app, err := chk.Appender() + testutil.Ok(t, err) + + app.Append(1, 0) + app.Append(2, 0) + app.Append(3, 0) + + s1.chunks = []*memChunk{ + {minTime: 0, maxTime: 999, chunk: chk}, + {minTime: 1000, maxTime: 1999, chunk: chk}, + } + + s1.writeIds = []uint64{2, 3, 4, 5, 0, 0, 0, 1} + s1.writeIdFirst = 7 + s1.writeIdCount = 5 + + testutil.Ok(t, h.Truncate(1000)) + testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.writeIds) + testutil.Equals(t, 0, s1.writeIdFirst) + testutil.Equals(t, 3, s1.writeIdCount) +} From d8db95c5f87fdd096a263e95cdb0e452822e04cf Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Mon, 26 Mar 2018 00:46:36 +0530 Subject: [PATCH 14/15] Refactor to have no impact on interfaces Signed-off-by: Goutham Veeramachaneni --- block.go | 15 ++--- compact.go | 2 +- compact_test.go | 6 +- db.go | 81 ++--------------------- head.go | 173 +++++++++++++++++++++++++++++++++--------------- head_test.go | 45 +++++++++---- querier.go | 45 ++++++------- 7 files changed, 186 insertions(+), 181 deletions(-) diff --git a/block.go b/block.go index b51d3329..502cfd62 100644 --- a/block.go +++ b/block.go @@ -124,24 +124,19 @@ type BlockReader interface { Index() (IndexReader, error) // Chunks returns a ChunkReader over the block's data. - Chunks(*IsolationState) (ChunkReader, error) + Chunks() (ChunkReader, error) // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() (TombstoneReader, error) } -// Appendable defines an entity to which data can be appended. -type Appendable interface { - // Appender returns a new Appender against an underlying store. - Appender(writeId, cleanupWriteIdsBelow uint64) Appender -} - +// IsolationState holds the isolation information. type IsolationState struct { // We will ignore all writes above the max, or that are incomplete. - maxWriteId uint64 + maxWriteID uint64 incompleteWrites map[uint64]struct{} lowWaterMark uint64 // Lowest of incompleteWrites/maxWriteId. - db *DB + head *Head // Doubly linked list of active reads. next *IsolationState @@ -337,7 +332,7 @@ func (pb *Block) Index() (IndexReader, error) { } // Chunks returns a new ChunkReader against the block data. 
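The read rule that the IsolationState struct above encodes can be stated in a few lines. The following is a paraphrase of the visibility check the head's iterator performs, not code taken from the patch:

// visible reports whether a sample written under writeID should be
// returned to a reader holding st: writes above the snapshot's
// high-water mark, and writes that were still open when the snapshot
// was taken, are both invisible.
func visible(writeID uint64, st *IsolationState) bool {
	if writeID > st.maxWriteID {
		return false
	}
	_, incomplete := st.incompleteWrites[writeID]
	return !incomplete
}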
-func (pb *Block) Chunks(_ *IsolationState) (ChunkReader, error) { +func (pb *Block) Chunks() (ChunkReader, error) { if err := pb.startRead(); err != nil { return nil, err } diff --git a/compact.go b/compact.go index b991fb8e..16a3bd74 100644 --- a/compact.go +++ b/compact.go @@ -521,7 +521,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } closers = append(closers, indexr) - chunkr, err := b.Chunks(nil) + chunkr, err := b.Chunks() if err != nil { return errors.Wrapf(err, "open chunk reader for block %s", b) } diff --git a/compact_test.go b/compact_test.go index 3fda41b9..42e38b5c 100644 --- a/compact_test.go +++ b/compact_test.go @@ -398,6 +398,6 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta { type erringBReader struct{} -func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } -func (erringBReader) Chunks(*IsolationState) (ChunkReader, error) { return nil, errors.New("chunks") } -func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } +func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } +func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } +func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } diff --git a/db.go b/db.go index eb1d8161..d5ddcf9d 100644 --- a/db.go +++ b/db.go @@ -115,24 +115,10 @@ type DB struct { // cmtx is used to control compactions and deletions. cmtx sync.Mutex compactionsEnabled bool - - // Mutex for accessing writeLastId and writesOpen. - writeMtx sync.Mutex - // Each write is given an internal id. - writeLastId uint64 - // Which writes are currently in progress. - writesOpen map[uint64]struct{} - // Mutex for accessing readLastId. - // If taking both writeMtx and readMtx, take writeMtx first. - readMtx sync.Mutex - // All current in use isolationStates. This is a doubly-linked list. 
- readsOpen *IsolationState } type dbMetrics struct { loadedBlocks prometheus.GaugeFunc - lowWatermark prometheus.GaugeFunc - highWatermark prometheus.GaugeFunc reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter @@ -152,20 +138,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { defer db.mtx.RUnlock() return float64(len(db.blocks)) }) - m.lowWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_isolation_low_watermark", - Help: "The lowest write id that is still referenced.", - }, func() float64 { - return float64(db.readLowWatermark()) - }) - m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "tsdb_isolation_high_watermark", - Help: "The highest write id that has been given out.", - }, func() float64 { - db.writeMtx.Lock() - defer db.writeMtx.Unlock() - return float64(db.writeLastId) - }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", @@ -194,8 +166,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { if r != nil { r.MustRegister( m.loadedBlocks, - m.lowWatermark, - m.highWatermark, m.reloads, m.reloadsFailed, m.cutoffs, @@ -223,16 +193,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db return nil, err } - head := &IsolationState{} - head.next = head - head.prev = head - db = &DB{ dir: dir, logger: l, opts: opts, - writesOpen: map[uint64]struct{}{}, - readsOpen: head, compactc: make(chan struct{}, 1), donec: make(chan struct{}), stopc: make(chan struct{}), @@ -370,13 +334,7 @@ func (db *DB) retentionCutoff() (b bool, err error) { // Appender opens a new appender against the database. 
func (db *DB) Appender() Appender { - db.writeMtx.Lock() - db.writeLastId++ - id := db.writeLastId - db.writesOpen[id] = struct{}{} - db.writeMtx.Unlock() - - return dbAppender{db: db, Appender: db.head.Appender(id, db.readLowWatermark()), writeId: id} + return dbAppender{db: db, Appender: db.head.Appender()} } // dbAppender wraps the DB's head appender and triggers compactions on commit @@ -384,8 +342,6 @@ func (db *DB) Appender() Appender { type dbAppender struct { Appender db *DB - - writeId uint64 } func (a dbAppender) Commit() error { @@ -400,10 +356,6 @@ func (a dbAppender) Commit() error { } } - a.db.writeMtx.Lock() - delete(a.db.writesOpen, a.writeId) - a.db.writeMtx.Unlock() - return err } @@ -720,16 +672,6 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { db.mtx.RLock() defer db.mtx.RUnlock() - db.writeMtx.Lock() - isolation := &IsolationState{ - maxWriteId: db.writeLastId, - incompleteWrites: make(map[uint64]struct{}, len(db.writesOpen)), - } - for k, _ := range db.writesOpen { - isolation.incompleteWrites[k] = struct{}{} - } - db.writeMtx.Unlock() - for _, b := range db.blocks { m := b.Meta() if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { @@ -741,13 +683,12 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { } sq := &querier{ - blocks: make([]Querier, 0, len(blocks)), - db: db, - isolation: db.IsolationState(), + blocks: make([]Querier, 0, len(blocks)), + db: db, } for _, b := range blocks { - q, err := NewBlockQuerier(b, mint, maxt, isolation) + q, err := NewBlockQuerier(b, mint, maxt) if err == nil { sq.blocks = append(sq.blocks, q) continue @@ -766,20 +707,6 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { return mint, mint + width } -// readLowWatermark returns the writeId below which -// we no longer need to track which writes were from -// which writeId. -func (db *DB) readLowWatermark() uint64 { - db.writeMtx.Lock() // Take writeMtx first. - defer db.writeMtx.Unlock() - db.readMtx.Lock() - defer db.readMtx.Unlock() - if db.readsOpen.prev == db.readsOpen { - return db.writeLastId - } - return db.readsOpen.prev.lowWaterMark -} - // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { db.cmtx.Lock() diff --git a/head.go b/head.go index c0672cbc..05375f76 100644 --- a/head.go +++ b/head.go @@ -70,6 +70,18 @@ type Head struct { postings *index.MemPostings // postings lists for terms tombstones memTombstones + + // Mutex for accessing writeLastId and writesOpen. + writeMtx sync.Mutex + // Each write is given an internal id. + lastWriteID uint64 + // Which writes are currently in progress. + writesOpen map[uint64]struct{} + // Mutex for accessing readLastId. + // If taking both writeMtx and readMtx, take writeMtx first. + readMtx sync.Mutex + // All current in use isolationStates. This is a doubly-linked list. 
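In user-facing terms, the contract that the write-id bookkeeping preserves for DB.Appender callers is roughly the following (hedged sketch; db and the sample values are placeholders):

app := db.Appender()
_, err := app.Add(labels.FromStrings("job", "demo"), 1000, 1.0)
if err != nil {
	return err
}
// A Querier opened here still treats this write as incomplete and
// will not surface the sample; only once Commit returns does the
// write id leave the open set and become visible to new readers.
return app.Commit()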
+ readsOpen *IsolationState } type headMetrics struct { @@ -86,6 +98,9 @@ type headMetrics struct { maxTime prometheus.GaugeFunc samplesAppended prometheus.Counter walTruncateDuration prometheus.Summary + + lowWatermark prometheus.GaugeFunc + highWatermark prometheus.GaugeFunc } func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { @@ -147,6 +162,20 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", }) + m.lowWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "tsdb_isolation_low_watermark", + Help: "The lowest write id that is still referenced.", + }, func() float64 { + return float64(h.readLowWatermark()) + }) + m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "tsdb_isolation_high_watermark", + Help: "The highest write id that has been given out.", + }, func() float64 { + h.writeMtx.Lock() + defer h.writeMtx.Unlock() + return float64(h.lastWriteID) + }) if r != nil { r.MustRegister( @@ -163,6 +192,8 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.gcDuration, m.walTruncateDuration, m.samplesAppended, + m.lowWatermark, + m.highWatermark, ) } return m @@ -179,6 +210,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) } + headIso := &IsolationState{} + headIso.next = headIso + headIso.prev = headIso + h := &Head{ wal: wal, logger: l, @@ -190,6 +225,9 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), tombstones: memTombstones{}, + + writesOpen: map[uint64]struct{}{}, + readsOpen: headIso, } h.metrics = newHeadMetrics(h, r) @@ -389,8 +427,8 @@ func (h *rangeHead) Index() (IndexReader, error) { return h.head.indexRange(h.mint, h.maxt), nil } -func (h *rangeHead) Chunks(isolation *IsolationState) (ChunkReader, error) { - return h.head.chunksRange(h.mint, h.maxt, isolation), nil +func (h *rangeHead) Chunks() (ChunkReader, error) { + return h.head.chunksRange(h.mint, h.maxt, h.head.IsolationState()), nil } func (h *rangeHead) Tombstones() (TombstoneReader, error) { @@ -403,8 +441,8 @@ type initAppender struct { app Appender head *Head - writeId uint64 - cleanupWriteIdsBelow uint64 + writeID uint64 + cleanupWriteIDsBelow uint64 } func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -412,7 +450,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return a.app.Add(lset, t, v) } a.head.initTime(t) - a.app = a.head.appender(a.writeId, a.cleanupWriteIdsBelow) + a.app = a.head.appender(a.writeID, a.cleanupWriteIDsBelow) return a.app.Add(lset, t, v) } @@ -439,25 +477,33 @@ func (a *initAppender) Rollback() error { } // Appender returns a new Appender on the database. -func (h *Head) Appender(writeId, cleanupWriteIdsBelow uint64) Appender { +func (h *Head) Appender() Appender { h.metrics.activeAppenders.Inc() + h.writeMtx.Lock() + h.lastWriteID++ + writeID := h.lastWriteID + h.writesOpen[writeID] = struct{}{} + h.writeMtx.Unlock() + + cleanupWriteIDsBelow := h.readLowWatermark() + // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. 
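Both watermarks are exported through prometheus.NewGaugeFunc, which evaluates a callback at scrape time instead of storing a value, so the write path never has to push metric updates. The pattern in miniature (the metric name and the counter are illustrative, not from the patch):

package main

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var lastWriteID uint64 // stand-in for the head's counter

func main() {
	prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_isolation_high_watermark",
		Help: "Highest write id handed out.",
	}, func() float64 {
		// Runs on every scrape; must be cheap and safe for concurrent use.
		return float64(atomic.LoadUint64(&lastWriteID))
	}))
}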
if h.MinTime() == math.MinInt64 { - return &initAppender{head: h, writeId: writeId, cleanupWriteIdsBelow: cleanupWriteIdsBelow} + return &initAppender{head: h, writeID: writeID, cleanupWriteIDsBelow: cleanupWriteIDsBelow} } - return h.appender(writeId, cleanupWriteIdsBelow) + return h.appender(writeID, cleanupWriteIDsBelow) } -func (h *Head) appender(writeId, cleanupWriteIdsBelow uint64) *headAppender { +func (h *Head) appender(writeID, cleanupWriteIDsBelow uint64) *headAppender { return &headAppender{ head: h, mint: h.MaxTime() - h.chunkRange/2, samples: h.getAppendBuffer(), highTimestamp: math.MinInt64, - writeId: writeId, - cleanupWriteIdsBelow: cleanupWriteIdsBelow, + writeID: writeID, + cleanupWriteIDsBelow: cleanupWriteIDsBelow, } } @@ -481,8 +527,8 @@ type headAppender struct { samples []RefSample highTimestamp int64 - writeId uint64 - cleanupWriteIdsBelow uint64 + writeID uint64 + cleanupWriteIDsBelow uint64 } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { @@ -543,8 +589,8 @@ func (a *headAppender) Commit() error { for _, s := range a.samples { s.series.Lock() - ok, chunkCreated := s.series.append(s.T, s.V, a.writeId) - s.series.cleanupWriteIdsBelow(a.cleanupWriteIdsBelow) + ok, chunkCreated := s.series.append(s.T, s.V, a.writeID) + s.series.cleanupWriteIDsBelow(a.cleanupWriteIDsBelow) s.series.Unlock() if !ok { @@ -568,6 +614,10 @@ func (a *headAppender) Commit() error { } } + a.head.writeMtx.Lock() + delete(a.head.writesOpen, a.writeID) + a.head.writeMtx.Unlock() + return nil } @@ -674,8 +724,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { } // Chunks returns a ChunkReader against the block. -func (h *Head) Chunks(isolation *IsolationState) (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64, isolation), nil +func (h *Head) Chunks() (ChunkReader, error) { + return h.chunksRange(math.MinInt64, math.MaxInt64, h.IsolationState()), nil } func (h *Head) chunksRange(mint, maxt int64, isolation *IsolationState) *headChunkReader { @@ -708,6 +758,7 @@ type headChunkReader struct { } func (h *headChunkReader) Close() error { + h.isolation.Close() return nil } @@ -932,6 +983,20 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie return s, true } +// readLowWatermark returns the writeId below which +// we no longer need to track which writes were from +// which writeId. +func (h *Head) readLowWatermark() uint64 { + h.writeMtx.Lock() // Take writeMtx first. + defer h.writeMtx.Unlock() + h.readMtx.Lock() + defer h.readMtx.Unlock() + if h.readsOpen.prev == h.readsOpen { + return h.lastWriteID + } + return h.readsOpen.prev.lowWaterMark +} + // seriesHashmap is a simple hashmap for memSeries by their label set. It is built // on top of a regular hashmap and holds a slice of series to resolve hash collisions. // Its methods require the hash to be submitted with it to avoid re-computations throughout @@ -1121,9 +1186,9 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. // Write ids of most recent samples. This is a ring buffer. - writeIds []uint64 - writeIdFirst int // Position of first id in the ring. - writeIdCount int // How many ids in the ring. + writeIDs []uint64 + writeIDFirst int // Position of first id in the ring. + writeIDCount int // How many ids in the ring. 
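Stripped of its locking, readLowWatermark implements a simple rule: with no open reads the sentinel list points at itself and everything up to lastWriteID is reclaimable; otherwise the oldest reader, sitting at the tail of the list, pins the watermark. A sketch of just that decision:

// lowWatermark returns the cutoff below which per-sample write ids may
// be discarded. Locking elided; field names follow the patch.
func lowWatermark(readsOpen *IsolationState, lastWriteID uint64) uint64 {
	if readsOpen.prev == readsOpen { // sentinel only: no open reads
		return lastWriteID
	}
	return readsOpen.prev.lowWaterMark // oldest active read wins
}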
} func (s *memSeries) minTime() int64 { @@ -1160,7 +1225,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { ref: id, chunkRange: chunkRange, nextAt: math.MinInt64, - writeIds: make([]uint64, 4), + writeIDs: make([]uint64, 4), } return s } @@ -1215,7 +1280,7 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { } // append adds the sample (t, v) to the series. -func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCreated bool) { +func (s *memSeries) append(t int64, v float64, writeID uint64) (success, chunkCreated bool) { const samplesPerChunk = 120 c := s.head() @@ -1251,39 +1316,39 @@ func (s *memSeries) append(t int64, v float64, writeId uint64) (success, chunkCr s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - if s.writeIdCount == len(s.writeIds) { + if s.writeIDCount == len(s.writeIDs) { // Ring buffer is full, expand by doubling. - newRing := make([]uint64, s.writeIdCount*2) - idx := copy(newRing[:], s.writeIds[s.writeIdFirst%len(s.writeIds):]) - copy(newRing[idx:], s.writeIds[:s.writeIdFirst%len(s.writeIds)]) - s.writeIds = newRing - s.writeIdFirst = 0 + newRing := make([]uint64, s.writeIDCount*2) + idx := copy(newRing[:], s.writeIDs[s.writeIDFirst%len(s.writeIDs):]) + copy(newRing[idx:], s.writeIDs[:s.writeIDFirst%len(s.writeIDs)]) + s.writeIDs = newRing + s.writeIDFirst = 0 } - s.writeIds[(s.writeIdFirst+s.writeIdCount)%len(s.writeIds)] = writeId - s.writeIdCount++ + s.writeIDs[(s.writeIDFirst+s.writeIDCount)%len(s.writeIDs)] = writeID + s.writeIDCount++ return true, chunkCreated } -// cleanupWriteIdsBelow cleans up older writeIds. Has to be called after acquiring +// cleanupWriteIDsBelow cleans up older writeIds. Has to be called after acquiring // lock. 
-func (s *memSeries) cleanupWriteIdsBelow(bound uint64) { - pos := s.writeIdFirst +func (s *memSeries) cleanupWriteIDsBelow(bound uint64) { + pos := s.writeIDFirst - for s.writeIdCount > 0 { - if s.writeIds[pos] < bound { - s.writeIdFirst++ - s.writeIdCount-- + for s.writeIDCount > 0 { + if s.writeIDs[pos] < bound { + s.writeIDFirst++ + s.writeIDCount-- } else { break } pos++ - if pos == len(s.writeIds) { + if pos == len(s.writeIDs) { pos = 0 } } - if s.writeIdFirst >= len(s.writeIds) { - s.writeIdFirst -= len(s.writeIds) + if s.writeIDFirst >= len(s.writeIDs) { + s.writeIDFirst -= len(s.writeIDs) } } @@ -1293,28 +1358,28 @@ func (s *memSeries) cleanupExtraWriteIds() { totalSamples += c.chunk.NumSamples() } - if s.writeIdCount <= totalSamples { + if s.writeIDCount <= totalSamples { return } - s.writeIdFirst += (s.writeIdCount - totalSamples) - s.writeIdCount = totalSamples + s.writeIDFirst += (s.writeIDCount - totalSamples) + s.writeIDCount = totalSamples - newBufSize := len(s.writeIds) + newBufSize := len(s.writeIDs) for totalSamples < newBufSize/2 { newBufSize = newBufSize / 2 } - if newBufSize == len(s.writeIds) { + if newBufSize == len(s.writeIDs) { return } newRing := make([]uint64, newBufSize) - idx := copy(newRing[:], s.writeIds[s.writeIdFirst%len(s.writeIds):]) - copy(newRing[idx:], s.writeIds[:s.writeIdFirst%len(s.writeIds)]) + idx := copy(newRing[:], s.writeIDs[s.writeIDFirst%len(s.writeIDs):]) + copy(newRing[idx:], s.writeIDs[:s.writeIDFirst%len(s.writeIDs)]) - s.writeIds = newRing - s.writeIdFirst = 0 + s.writeIDs = newRing + s.writeIDFirst = 0 } // computeChunkEndTime estimates the end timestamp based the beginning of a chunk, @@ -1352,13 +1417,13 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato previousSamples += d.chunk.NumSamples() } } - writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - s.writeIdCount) + writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - s.writeIDCount) // Iterate over the ring, find the first one that the isolation state says not // to return. - pos := s.writeIdFirst + pos := s.writeIDFirst for index := 0; index < writeIdsToConsider; index++ { - writeId := s.writeIds[pos] - if _, ok := isolation.incompleteWrites[writeId]; ok || writeId > isolation.maxWriteId { + writeID := s.writeIDs[pos] + if _, ok := isolation.incompleteWrites[writeID]; ok || writeID > isolation.maxWriteID { stopAfter = index - (writeIdsToConsider - c.chunk.NumSamples()) if stopAfter < 0 { stopAfter = 0 // Stopped in a previous chunk. @@ -1366,7 +1431,7 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato break } pos++ - if pos == len(s.writeIds) { + if pos == len(s.writeIDs) { pos = 0 } } diff --git a/head_test.go b/head_test.go index c81a120a..439d4a0c 100644 --- a/head_test.go +++ b/head_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "math" "math/rand" "testing" @@ -239,7 +240,7 @@ func TestHeadDeleteSimple(t *testing.T) { testutil.Ok(t, err) defer head.Close() - app := head.Appender(0, 0) + app := head.Appender() smpls := make([]float64, numSamples) for i := int64(0); i < numSamples; i++ { smpls[i] = rand.Float64() @@ -284,7 +285,7 @@ Outer: } // Compare the result. 
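The index arithmetic in iterator is dense, so a worked example may help. Suppose a series holds two full chunks of 120 samples, the ring still tracks ids for the newest 150 samples, and the reader is iterating the second chunk (all numbers below are illustrative):

package main

import "fmt"

func main() {
	totalSamples := 240    // across the whole series
	previousSamples := 120 // in chunks before the one being read
	numSamples := 120      // in the chunk being read
	ringCount := 150       // write ids still tracked

	// The oldest totalSamples-ringCount = 90 samples no longer carry
	// ids and are implicitly visible. Ids relevant through this chunk:
	idsToConsider := (previousSamples + numSamples) - (totalSamples - ringCount)

	// Say the id at considered index 139 is the first one the isolation
	// state rejects. The first 30 considered ids belong to the previous
	// chunk, so that id maps to this chunk's 110th sample:
	index := 139
	stopAfter := index - (idsToConsider - numSamples)

	fmt.Println(idsToConsider, stopAfter) // 150 109: yield 109 samples
}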
- q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime(), nil) + q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) testutil.Ok(t, err) res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) @@ -753,7 +754,20 @@ func TestMemSeriesIsolation(t *testing.T) { defer hb.Close() lastValue := func(maxWriteId uint64) int { - querier, err := NewBlockQuerier(hb, 0, 10000, &IsolationState{maxWriteId: maxWriteId}) + idx, err := hb.Index() + testutil.Ok(t, err) + + iso := hb.IsolationState() + iso.maxWriteID = maxWriteId + + querier := &blockQuerier{ + mint: 0, + maxt: 10000, + index: idx, + chunks: hb.chunksRange(math.MinInt64, math.MaxInt64, iso), + tombstones: emptyTombstoneReader, + } + testutil.Ok(t, err) defer querier.Close() @@ -769,7 +783,14 @@ func TestMemSeriesIsolation(t *testing.T) { i := 0 for ; i <= 1000; i++ { - app := hb.Appender(uint64(i), 0) + var app Appender + // To initialise bounds. + if hb.MinTime() == math.MinInt64 { + app = &initAppender{head: hb, writeID: uint64(i), cleanupWriteIDsBelow: 0} + } else { + app = hb.appender(uint64(i), 0) + } + _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -786,7 +807,7 @@ func TestMemSeriesIsolation(t *testing.T) { require.Equal(t, 999, lastValue(999)) // Cleanup writeIds below 500. - app := hb.Appender(uint64(i), 500) + app := hb.appender(uint64(i), 500) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -804,7 +825,7 @@ func TestMemSeriesIsolation(t *testing.T) { // Cleanup writeIds below 1000, which means the sample buffer is // the only thing with writeIds. - app = hb.Appender(uint64(i), 1000) + app = hb.appender(uint64(i), 1000) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -839,12 +860,12 @@ func TestHead_Truncate_WriteIDs(t *testing.T) { {minTime: 1000, maxTime: 1999, chunk: chk}, } - s1.writeIds = []uint64{2, 3, 4, 5, 0, 0, 0, 1} - s1.writeIdFirst = 7 - s1.writeIdCount = 5 + s1.writeIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1} + s1.writeIDFirst = 7 + s1.writeIDCount = 5 testutil.Ok(t, h.Truncate(1000)) - testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.writeIds) - testutil.Equals(t, 0, s1.writeIdFirst) - testutil.Equals(t, 3, s1.writeIdCount) + testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.writeIDs) + testutil.Equals(t, 0, s1.writeIDFirst) + testutil.Equals(t, 3, s1.writeIDCount) } diff --git a/querier.go b/querier.go index 10a5e7f8..ac99dfa2 100644 --- a/querier.go +++ b/querier.go @@ -53,43 +53,42 @@ type Series interface { // querier aggregates querying results from time blocks within // a single partition. type querier struct { - db *DB - isolation *IsolationState - blocks []Querier + db *DB + blocks []Querier } -// IsolationState returns an objet used to control isolation +// IsolationState returns an object used to control isolation // between a query and writes. Must be closed when complete. -func (s *DB) IsolationState() *IsolationState { - s.writeMtx.Lock() // Take write mutex before read mutex. - defer s.writeMtx.Unlock() +func (h *Head) IsolationState() *IsolationState { + h.writeMtx.Lock() // Take write mutex before read mutex. 
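Returning to TestMemSeriesIsolation: with lastValue rebuilt on an explicit IsolationState, the property under test is that a snapshot pinned at write id N observes the series exactly as of N. Stated as an extra assertion loop, reusing the test's own helper (a hypothetical addition, not part of the series):

for _, n := range []uint64{10, 500, 1000} {
	if got := lastValue(n); got != int(n) {
		t.Fatalf("snapshot at %d saw last value %d", n, got)
	}
}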
+ defer h.writeMtx.Unlock() isolation := &IsolationState{ - maxWriteId: s.writeLastId, - lowWaterMark: s.writeLastId, - incompleteWrites: make(map[uint64]struct{}, len(s.writesOpen)), - db: s, + maxWriteID: h.lastWriteID, + lowWaterMark: h.lastWriteID, + incompleteWrites: make(map[uint64]struct{}, len(h.writesOpen)), + head: h, } - for k, _ := range s.writesOpen { + for k := range h.writesOpen { isolation.incompleteWrites[k] = struct{}{} if k < isolation.lowWaterMark { isolation.lowWaterMark = k } } - s.readMtx.Lock() - defer s.readMtx.Unlock() - isolation.prev = s.readsOpen - isolation.next = s.readsOpen.next - s.readsOpen.next.prev = isolation - s.readsOpen.next = isolation + h.readMtx.Lock() + defer h.readMtx.Unlock() + isolation.prev = h.readsOpen + isolation.next = h.readsOpen.next + h.readsOpen.next.prev = isolation + h.readsOpen.next = isolation return isolation } func (i *IsolationState) Close() { - i.db.readMtx.Lock() + i.head.readMtx.Lock() i.next.prev = i.prev i.prev.next = i.next - i.db.readMtx.Unlock() + i.head.readMtx.Unlock() } func (q *querier) LabelValues(n string) ([]string, error) { @@ -151,18 +150,16 @@ func (q *querier) Close() error { merr.Add(bq.Close()) } - q.isolation.Close() - return merr.Err() } // NewBlockQuerier returns a queries against the readers. -func NewBlockQuerier(b BlockReader, mint, maxt int64, isolation *IsolationState) (Querier, error) { +func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { indexr, err := b.Index() if err != nil { return nil, errors.Wrapf(err, "open index reader") } - chunkr, err := b.Chunks(isolation) + chunkr, err := b.Chunks() if err != nil { indexr.Close() return nil, errors.Wrapf(err, "open chunk reader") From dc64dcb0c7bc2803c1ab8015fa9094ead2bce6d8 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 4 Apr 2018 14:16:08 +0530 Subject: [PATCH 15/15] Encapsulate all isolation details. Signed-off-by: Goutham Veeramachaneni --- block.go | 13 ---- head.go | 163 +++++++++++----------------------------- head_test.go | 14 ++-- isolation.go | 208 +++++++++++++++++++++++++++++++++++++++++++++++++++ querier.go | 34 --------- 5 files changed, 257 insertions(+), 175 deletions(-) create mode 100644 isolation.go diff --git a/block.go b/block.go index 502cfd62..9375fb13 100644 --- a/block.go +++ b/block.go @@ -130,19 +130,6 @@ type BlockReader interface { Tombstones() (TombstoneReader, error) } -// IsolationState holds the isolation information. -type IsolationState struct { - // We will ignore all writes above the max, or that are incomplete. - maxWriteID uint64 - incompleteWrites map[uint64]struct{} - lowWaterMark uint64 // Lowest of incompleteWrites/maxWriteId. - head *Head - - // Doubly linked list of active reads. - next *IsolationState - prev *IsolationState -} - // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. diff --git a/head.go b/head.go index 05375f76..a855414d 100644 --- a/head.go +++ b/head.go @@ -71,17 +71,7 @@ type Head struct { tombstones memTombstones - // Mutex for accessing writeLastId and writesOpen. - writeMtx sync.Mutex - // Each write is given an internal id. - lastWriteID uint64 - // Which writes are currently in progress. - writesOpen map[uint64]struct{} - // Mutex for accessing readLastId. - // If taking both writeMtx and readMtx, take writeMtx first. - readMtx sync.Mutex - // All current in use isolationStates. This is a doubly-linked list. 
- readsOpen *IsolationState + iso *isolation } type headMetrics struct { @@ -166,15 +156,15 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "tsdb_isolation_low_watermark", Help: "The lowest write id that is still referenced.", }, func() float64 { - return float64(h.readLowWatermark()) + return float64(h.iso.lowWatermark()) }) m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "tsdb_isolation_high_watermark", Help: "The highest write id that has been given out.", }, func() float64 { - h.writeMtx.Lock() - defer h.writeMtx.Unlock() - return float64(h.lastWriteID) + h.iso.writeMtx.Lock() + defer h.iso.writeMtx.Unlock() + return float64(h.iso.lastWriteID) }) if r != nil { @@ -210,9 +200,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( if chunkRange < 1 { return nil, errors.Errorf("invalid chunk range %d", chunkRange) } - headIso := &IsolationState{} - headIso.next = headIso - headIso.prev = headIso h := &Head{ wal: wal, @@ -226,8 +213,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( postings: index.NewUnorderedMemPostings(), tombstones: memTombstones{}, - writesOpen: map[uint64]struct{}{}, - readsOpen: headIso, + iso: newIsolation(), } h.metrics = newHeadMetrics(h, r) @@ -428,7 +414,7 @@ func (h *rangeHead) Index() (IndexReader, error) { } func (h *rangeHead) Chunks() (ChunkReader, error) { - return h.head.chunksRange(h.mint, h.maxt, h.head.IsolationState()), nil + return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil } func (h *rangeHead) Tombstones() (TombstoneReader, error) { @@ -480,13 +466,8 @@ func (a *initAppender) Rollback() error { func (h *Head) Appender() Appender { h.metrics.activeAppenders.Inc() - h.writeMtx.Lock() - h.lastWriteID++ - writeID := h.lastWriteID - h.writesOpen[writeID] = struct{}{} - h.writeMtx.Unlock() - - cleanupWriteIDsBelow := h.readLowWatermark() + writeID := h.iso.newWriteID() + cleanupWriteIDsBelow := h.iso.lowWatermark() // The head cache might not have a starting point yet. The init appender // picks up the first appended timestamp as the base. @@ -614,9 +595,7 @@ func (a *headAppender) Commit() error { } } - a.head.writeMtx.Lock() - delete(a.head.writesOpen, a.writeID) - a.head.writeMtx.Unlock() + a.head.iso.closeWrite(a.writeID) return nil } @@ -725,14 +704,14 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { // Chunks returns a ChunkReader against the block. func (h *Head) Chunks() (ChunkReader, error) { - return h.chunksRange(math.MinInt64, math.MaxInt64, h.IsolationState()), nil + return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil } -func (h *Head) chunksRange(mint, maxt int64, isolation *IsolationState) *headChunkReader { +func (h *Head) chunksRange(mint, maxt int64, isoState *IsolationState) *headChunkReader { if hmin := h.MinTime(); hmin > mint { mint = hmin } - return &headChunkReader{head: h, mint: mint, maxt: maxt, isolation: isolation} + return &headChunkReader{head: h, mint: mint, maxt: maxt, isoState: isoState} } // MinTime returns the lowest time bound on visible data in the head. 
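After this pass the Head touches isolation only through a handful of calls, which is the point of the encapsulation; gathered from the hunks above:

id := h.iso.newWriteID()    // Appender(): allocate and register a write id
low := h.iso.lowWatermark() // Appender(): ids below this can be forgotten
st := h.iso.State()         // Chunks()/chunksRange(): snapshot open writes
h.iso.closeWrite(id)        // Commit(): mark the write complete
st.Close()                  // headChunkReader.Close(): unpin the watermark

Everything else — the mutexes, the sentinel list, the ring buffer — becomes private to isolation.go.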
@@ -754,11 +733,11 @@ type headChunkReader struct { head *Head mint, maxt int64 - isolation *IsolationState + isoState *IsolationState } func (h *headChunkReader) Close() error { - h.isolation.Close() + h.isoState.Close() return nil } @@ -809,7 +788,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s: s, cid: int(cid), - isolation: h.isolation, + isoState: h.isoState, }, nil } @@ -818,12 +797,12 @@ type safeChunk struct { s *memSeries cid int - isolation *IsolationState + isoState *IsolationState } func (c *safeChunk) Iterator() chunkenc.Iterator { c.s.Lock() - it := c.s.iterator(c.cid, c.isolation) + it := c.s.iterator(c.cid, c.isoState) c.s.Unlock() return it } @@ -983,20 +962,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie return s, true } -// readLowWatermark returns the writeId below which -// we no longer need to track which writes were from -// which writeId. -func (h *Head) readLowWatermark() uint64 { - h.writeMtx.Lock() // Take writeMtx first. - defer h.writeMtx.Unlock() - h.readMtx.Lock() - defer h.readMtx.Unlock() - if h.readsOpen.prev == h.readsOpen { - return h.lastWriteID - } - return h.readsOpen.prev.lowWaterMark -} - // seriesHashmap is a simple hashmap for memSeries by their label set. It is built // on top of a regular hashmap and holds a slice of series to resolve hash collisions. // Its methods require the hash to be submitted with it to avoid re-computations throughout @@ -1185,10 +1150,7 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. - // Write ids of most recent samples. This is a ring buffer. - writeIDs []uint64 - writeIDFirst int // Position of first id in the ring. - writeIDCount int // How many ids in the ring. + txs *txRing } func (s *memSeries) minTime() int64 { @@ -1225,7 +1187,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { ref: id, chunkRange: chunkRange, nextAt: math.MinInt64, - writeIDs: make([]uint64, 4), + txs: newTxRing(4), } return s } @@ -1316,16 +1278,7 @@ func (s *memSeries) append(t int64, v float64, writeID uint64) (success, chunkCr s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - if s.writeIDCount == len(s.writeIDs) { - // Ring buffer is full, expand by doubling. - newRing := make([]uint64, s.writeIDCount*2) - idx := copy(newRing[:], s.writeIDs[s.writeIDFirst%len(s.writeIDs):]) - copy(newRing[idx:], s.writeIDs[:s.writeIDFirst%len(s.writeIDs)]) - s.writeIDs = newRing - s.writeIDFirst = 0 - } - s.writeIDs[(s.writeIDFirst+s.writeIDCount)%len(s.writeIDs)] = writeID - s.writeIDCount++ + s.txs.add(writeID) return true, chunkCreated } @@ -1333,23 +1286,7 @@ func (s *memSeries) append(t int64, v float64, writeID uint64) (success, chunkCr // cleanupWriteIDsBelow cleans up older writeIds. Has to be called after acquiring // lock. 
func (s *memSeries) cleanupWriteIDsBelow(bound uint64) { - pos := s.writeIDFirst - - for s.writeIDCount > 0 { - if s.writeIDs[pos] < bound { - s.writeIDFirst++ - s.writeIDCount-- - } else { - break - } - pos++ - if pos == len(s.writeIDs) { - pos = 0 - } - } - if s.writeIDFirst >= len(s.writeIDs) { - s.writeIDFirst -= len(s.writeIDs) - } + s.txs.cleanupWriteIDsBelow(bound) } func (s *memSeries) cleanupExtraWriteIds() { @@ -1358,28 +1295,7 @@ func (s *memSeries) cleanupExtraWriteIds() { totalSamples += c.chunk.NumSamples() } - if s.writeIDCount <= totalSamples { - return - } - - s.writeIDFirst += (s.writeIDCount - totalSamples) - s.writeIDCount = totalSamples - - newBufSize := len(s.writeIDs) - for totalSamples < newBufSize/2 { - newBufSize = newBufSize / 2 - } - - if newBufSize == len(s.writeIDs) { - return - } - - newRing := make([]uint64, newBufSize) - idx := copy(newRing[:], s.writeIDs[s.writeIDFirst%len(s.writeIDs):]) - copy(newRing[idx:], s.writeIDs[:s.writeIDFirst%len(s.writeIDs)]) - - s.writeIDs = newRing - s.writeIDFirst = 0 + s.txs.cutoffN(totalSamples) } // computeChunkEndTime estimates the end timestamp based the beginning of a chunk, @@ -1393,7 +1309,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterator { +func (s *memSeries) iterator(id int, isoState *IsolationState) chunkenc.Iterator { c := s.chunk(id) // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, @@ -1408,32 +1324,37 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato numSamples := c.chunk.NumSamples() stopAfter := numSamples - if isolation != nil { - totalSamples := 0 + if isoState != nil { + totalSamples := 0 // totalSamples in this series. previousSamples := 0 // Samples before this chunk. + for j, d := range s.chunks { totalSamples += d.chunk.NumSamples() if j < ix { previousSamples += d.chunk.NumSamples() } } - writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - s.writeIDCount) - // Iterate over the ring, find the first one that the isolation state says not + + // Removing the extra transactionIDs that are relevant for samples that + // come after this chunk, from the total transactionIDs. + writeIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + c.chunk.NumSamples())) + + // Iterate over the writeIDs, find the first one that the isolation state says not // to return. - pos := s.writeIDFirst - for index := 0; index < writeIdsToConsider; index++ { - writeID := s.writeIDs[pos] - if _, ok := isolation.incompleteWrites[writeID]; ok || writeID > isolation.maxWriteID { - stopAfter = index - (writeIdsToConsider - c.chunk.NumSamples()) + it := s.txs.iterator() + for index := 0; index < writeIDsToConsider; index++ { + writeID := it.At() + if _, ok := isoState.incompleteWrites[writeID]; ok || writeID > isoState.maxWriteID { + // If found limit the number of samples being iterated over. + stopAfter = c.chunk.NumSamples() - (writeIDsToConsider - index) + if stopAfter < 0 { stopAfter = 0 // Stopped in a previous chunk. 
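cleanupExtraWriteIds now delegates the trimming to txs.cutoffN, whose behavior (defined in isolation.go below) is to keep only the newest n ids and halve the backing array while it would stay under half full. A before/after trace under those semantics:

// Ring tracks ids 2..9 in a full 8-slot buffer (txIDFirst=0, count=8).
txs := &txRing{txIDs: []uint64{2, 3, 4, 5, 6, 7, 8, 9}, txIDCount: 8}

txs.cutoffN(3)

// Now txIDCount == 3 and the oldest surviving id is 7. Because
// 3 < 8/2, the buffer was reallocated down to 4 slots: [7 8 9 2],
// where the trailing 2 is stale and outside the live window.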
} break } - pos++ - if pos == len(s.writeIDs) { - pos = 0 - } + + it.Next() } } diff --git a/head_test.go b/head_test.go index 439d4a0c..1b306462 100644 --- a/head_test.go +++ b/head_test.go @@ -757,7 +757,7 @@ func TestMemSeriesIsolation(t *testing.T) { idx, err := hb.Index() testutil.Ok(t, err) - iso := hb.IsolationState() + iso := hb.iso.State() iso.maxWriteID = maxWriteId querier := &blockQuerier{ @@ -860,12 +860,12 @@ func TestHead_Truncate_WriteIDs(t *testing.T) { {minTime: 1000, maxTime: 1999, chunk: chk}, } - s1.writeIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1} - s1.writeIDFirst = 7 - s1.writeIDCount = 5 + s1.txs.txIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1} + s1.txs.txIDFirst = 7 + s1.txs.txIDCount = 5 testutil.Ok(t, h.Truncate(1000)) - testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.writeIDs) - testutil.Equals(t, 0, s1.writeIDFirst) - testutil.Equals(t, 3, s1.writeIDCount) + testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.txs.txIDs) + testutil.Equals(t, 0, s1.txs.txIDFirst) + testutil.Equals(t, 3, s1.txs.txIDCount) } diff --git a/isolation.go b/isolation.go new file mode 100644 index 00000000..7c12fdcd --- /dev/null +++ b/isolation.go @@ -0,0 +1,208 @@ +package tsdb + +import "sync" + +// IsolationState holds the isolation information. +type IsolationState struct { + // We will ignore all writes above the max, or that are incomplete. + maxWriteID uint64 + incompleteWrites map[uint64]struct{} + lowWaterMark uint64 // Lowest of incompleteWrites/maxWriteId. + isolation *isolation + + // Doubly linked list of active reads. + next *IsolationState + prev *IsolationState +} + +// Close closes the state. +func (i *IsolationState) Close() { + i.isolation.readMtx.Lock() + i.next.prev = i.prev + i.prev.next = i.next + i.isolation.readMtx.Unlock() +} + +// isolation is the global isolation state. +type isolation struct { + // Mutex for accessing writeLastId and writesOpen. + writeMtx sync.Mutex + // Each write is given an internal id. + lastWriteID uint64 + // Which writes are currently in progress. + writesOpen map[uint64]struct{} + // Mutex for accessing readsOpen. + // If taking both writeMtx and readMtx, take writeMtx first. + readMtx sync.Mutex + // All current in use isolationStates. This is a doubly-linked list. + readsOpen *IsolationState +} + +func newIsolation() *isolation { + isoState := &IsolationState{} + isoState.next = isoState + isoState.prev = isoState + + return &isolation{ + writesOpen: map[uint64]struct{}{}, + readsOpen: isoState, + } +} + +// lowWatermark returns the writeId below which +// we no longer need to track which writes were from +// which writeId. +func (i *isolation) lowWatermark() uint64 { + i.writeMtx.Lock() // Take writeMtx first. + defer i.writeMtx.Unlock() + i.readMtx.Lock() + defer i.readMtx.Unlock() + if i.readsOpen.prev == i.readsOpen { + return i.lastWriteID + } + return i.readsOpen.prev.lowWaterMark +} + +// State returns an object used to control isolation +// between a query and writes. Must be closed when complete. +func (i *isolation) State() *IsolationState { + i.writeMtx.Lock() // Take write mutex before read mutex. 
+ defer i.writeMtx.Unlock() + isoState := &IsolationState{ + maxWriteID: i.lastWriteID, + lowWaterMark: i.lastWriteID, + incompleteWrites: make(map[uint64]struct{}, len(i.writesOpen)), + isolation: i, + } + for k := range i.writesOpen { + isoState.incompleteWrites[k] = struct{}{} + if k < isoState.lowWaterMark { + isoState.lowWaterMark = k + } + } + + i.readMtx.Lock() + defer i.readMtx.Unlock() + isoState.prev = i.readsOpen + isoState.next = i.readsOpen.next + i.readsOpen.next.prev = isoState + i.readsOpen.next = isoState + return isoState +} + +// newWriteID increments the transaction counter and returns a new transaction ID. +func (i *isolation) newWriteID() uint64 { + i.writeMtx.Lock() + i.lastWriteID++ + writeID := i.lastWriteID + i.writesOpen[writeID] = struct{}{} + i.writeMtx.Unlock() + + return writeID +} + +func (i *isolation) closeWrite(writeID uint64) { + i.writeMtx.Lock() + delete(i.writesOpen, writeID) + i.writeMtx.Unlock() +} + +// txRing is a ring buffer of transaction (write) IDs. +type txRing struct { + txIDs []uint64 + txIDFirst int // Position of the first id in the ring. + txIDCount int // How many ids in the ring. +} + +func newTxRing(cap int) *txRing { + return &txRing{ + txIDs: make([]uint64, cap), + } +} + +func (txr *txRing) add(writeID uint64) { + if txr.txIDCount == len(txr.txIDs) { + // Ring buffer is full, expand by doubling. + newRing := make([]uint64, txr.txIDCount*2) + idx := copy(newRing[:], txr.txIDs[txr.txIDFirst%len(txr.txIDs):]) + copy(newRing[idx:], txr.txIDs[:txr.txIDFirst%len(txr.txIDs)]) + txr.txIDs = newRing + txr.txIDFirst = 0 + } + + txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = writeID + txr.txIDCount++ +} + +func (txr *txRing) cleanupWriteIDsBelow(bound uint64) { + pos := txr.txIDFirst + + for txr.txIDCount > 0 { + if txr.txIDs[pos] < bound { + txr.txIDFirst++ + txr.txIDCount-- + } else { + break + } + + pos++ + if pos == len(txr.txIDs) { + pos = 0 + } + } + + txr.txIDFirst = txr.txIDFirst % len(txr.txIDs) +} + +// cutoffN keeps only the latest n transaction IDs in the ring, +// downsizing the ring if possible. +func (txr *txRing) cutoffN(n int) { + if txr.txIDCount <= n { + return + } + + txr.txIDFirst += (txr.txIDCount - n) + txr.txIDCount = n + + newBufSize := len(txr.txIDs) + for n < newBufSize/2 { + newBufSize = newBufSize / 2 + } + + if newBufSize == len(txr.txIDs) { + return + } + + newRing := make([]uint64, newBufSize) + idx := copy(newRing[:], txr.txIDs[txr.txIDFirst%len(txr.txIDs):]) + copy(newRing[idx:], txr.txIDs[:txr.txIDFirst%len(txr.txIDs)]) + + txr.txIDs = newRing + txr.txIDFirst = 0 +} + +func (txr *txRing) iterator() *txRingIterator { + return &txRingIterator{ + pos: txr.txIDFirst % len(txr.txIDs), + ids: txr.txIDs, + } +} + +// txRingIterator lets you iterate over the ring. It never terminates; +// callers must bound the iteration themselves (e.g. with txIDCount). +type txRingIterator struct { + ids []uint64 + + pos int +} + +func (it *txRingIterator) At() uint64 { + return it.ids[it.pos] +} + +func (it *txRingIterator) Next() { + it.pos++ + if it.pos == len(it.ids) { + it.pos = 0 + } +} diff --git a/querier.go b/querier.go index ac99dfa2..9acb8486 100644 --- a/querier.go +++ b/querier.go @@ -57,40 +57,6 @@ type querier struct { blocks []Querier } -// IsolationState returns an object used to control isolation -// between a query and writes. Must be closed when complete. -func (h *Head) IsolationState() *IsolationState { - h.writeMtx.Lock() // Take write mutex before read mutex.
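The active-reads list in State and Close uses a classic sentinel node: readsOpen points at itself while the list is empty, which removes every nil check from insertion and unlinking. The two moves, extracted for clarity (a sketch, not the patch's own helpers):

// insertAfter links n directly behind the sentinel s. New reads go to
// the front, so s.prev is always the oldest open read.
func insertAfter(s, n *IsolationState) {
	n.prev = s
	n.next = s.next
	s.next.prev = n
	s.next = n
}

// unlink removes n from the list; with a sentinel this is safe even
// when n is the only element.
func unlink(n *IsolationState) {
	n.next.prev = n.prev
	n.prev.next = n.next
}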
- defer h.writeMtx.Unlock() - isolation := &IsolationState{ - maxWriteID: h.lastWriteID, - lowWaterMark: h.lastWriteID, - incompleteWrites: make(map[uint64]struct{}, len(h.writesOpen)), - head: h, - } - for k := range h.writesOpen { - isolation.incompleteWrites[k] = struct{}{} - if k < isolation.lowWaterMark { - isolation.lowWaterMark = k - } - } - - h.readMtx.Lock() - defer h.readMtx.Unlock() - isolation.prev = h.readsOpen - isolation.next = h.readsOpen.next - h.readsOpen.next.prev = isolation - h.readsOpen.next = isolation - return isolation -} - -func (i *IsolationState) Close() { - i.head.readMtx.Lock() - i.next.prev = i.prev - i.prev.next = i.next - i.head.readMtx.Unlock() -} - func (q *querier) LabelValues(n string) ([]string, error) { return q.lvals(q.blocks, n) }
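Taken together, a txRing's lifecycle over a series' recent samples looks like this — a hypothetical test exercising grow, trim, and iterate against the ring shown in patch 15; it would live in package tsdb and is not part of the series:

func TestTxRingLifecycle(t *testing.T) {
	txs := newTxRing(4)
	for id := uint64(1); id <= 6; id++ {
		txs.add(id) // doubles from 4 to 8 slots on the fifth add
	}
	txs.cleanupWriteIDsBelow(4) // ids 1..3 are older than any open read

	want := []uint64{4, 5, 6}
	it := txs.iterator()
	for i := 0; i < txs.txIDCount; i++ {
		if it.At() != want[i] {
			t.Fatalf("position %d: got %d, want %d", i, it.At(), want[i])
		}
		it.Next()
	}
}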