Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.
Closed
28 changes: 20 additions & 8 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type DiskBlock interface {
Index() IndexReader

// Chunks returns a ChunkReader over the block's data.
Chunks() ChunkReader
Chunks(*IsolationState) ChunkReader

// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() TombstoneReader
Expand Down Expand Up @@ -75,12 +75,24 @@ type Snapshottable interface {
// Appendable defines an entity to which data can be appended.
type Appendable interface {
// Appender returns a new Appender against an underlying store.
Appender() Appender
Appender(writeId, cleanupWriteIdsBelow uint64) Appender
}

// Queryable defines an entity which provides a Querier.
type Queryable interface {
Querier(mint, maxt int64) Querier
Querier(mint, maxt int64, isolation *IsolationState) Querier
}

// IsolationState is the read-isolation snapshot a querier operates under.
// A read holding this state ignores any write with an id above maxWriteId,
// and any write that was still in progress (incomplete) when the state was
// created. States form a doubly linked list of active reads anchored in DB
// (see DB.readsOpen), which is how the low watermark is computed.
type IsolationState struct {
	// We will ignore all writes above the max, or that are incomplete.
	maxWriteId       uint64
	incompleteWrites map[uint64]struct{}
	lowWaterMark     uint64 // Lowest of incompleteWrites/maxWriteId.
	db               *DB

	// Doubly linked list of active reads.
	next *IsolationState
	prev *IsolationState
}

// BlockMeta provides meta information about a block.
Expand Down Expand Up @@ -224,19 +236,19 @@ func (pb *persistedBlock) String() string {
return pb.meta.ULID.String()
}

func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
// Querier returns a querier over the block's data restricted to the
// [mint, maxt] time range. Persisted blocks are immutable, so the
// isolation state is not needed here and nil is passed to Chunks.
func (pb *persistedBlock) Querier(mint, maxt int64, isolation *IsolationState) Querier {
	q := &blockQuerier{
		index:      pb.Index(),
		chunks:     pb.Chunks(nil),
		tombstones: pb.Tombstones(),
		mint:       mint,
		maxt:       maxt,
	}
	return q
}

func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
// Dir returns the directory the block's files live in.
func (pb *persistedBlock) Dir() string { return pb.dir }

// Index returns an IndexReader over the block's index data.
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }

// Chunks returns a ChunkReader over the block's chunk data.
// The isolation state is unused: persisted blocks are immutable.
func (pb *persistedBlock) Chunks(*IsolationState) ChunkReader { return pb.chunkr }

// Tombstones returns a TombstoneReader over the block's deleted data.
func (pb *persistedBlock) Tombstones() TombstoneReader {
	return pb.tombstones
}
Expand Down
2 changes: 1 addition & 1 deletion compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
if err != nil {
return nil, err
}
s := newCompactionSeriesSet(b.Index(), b.Chunks(), b.Tombstones(), all)
s := newCompactionSeriesSet(b.Index(), b.Chunks(nil), b.Tombstones(), all)

if i == 0 {
set = s
Expand Down
76 changes: 72 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ type DB struct {
mtx sync.RWMutex
blocks []Block

// Mutex for accessing writeLastId and writesOpen.
writeMtx sync.Mutex
// Each write is given an internal id.
writeLastId uint64
// Which writes are currently in progress.
writesOpen map[uint64]struct{}

// Mutex for accessing readLastId.
// If taking both writeMtx and readMtx, take writeMtx first.
readMtx sync.Mutex
// All current in use isolationStates. This is a doubly-linked list.
readsOpen *IsolationState

// Mutex that must be held when modifying just the head blocks
// or the general layout.
// Must never be held when acquiring a blocks's mutex!
Expand All @@ -129,6 +142,8 @@ type DB struct {
type dbMetrics struct {
activeAppenders prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
lowWatermark prometheus.GaugeFunc
highWatermark prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
reloadDuration prometheus.Summary
Expand All @@ -151,6 +166,20 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
defer db.mtx.RUnlock()
return float64(len(db.blocks))
})
m.lowWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_isolation_low_watermark",
Help: "The lowest write id that is still referenced.",
}, func() float64 {
return float64(db.readLowWatermark())
})
m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_isolation_high_watermark",
Help: "The highest write id that has been given out.",
}, func() float64 {
db.writeMtx.Lock()
defer db.writeMtx.Unlock()
return float64(db.writeLastId)
})
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_reloads_total",
Help: "Number of times the database reloaded block data from disk.",
Expand All @@ -176,6 +205,8 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
r.MustRegister(
m.activeAppenders,
m.loadedBlocks,
m.lowWatermark,
m.highWatermark,
m.reloads,
m.reloadsFailed,
m.reloadDuration,
Expand All @@ -201,10 +232,16 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
opts = DefaultOptions
}

head := &IsolationState{}
head.next = head
head.prev = head

db = &DB{
dir: dir,
logger: l,
opts: opts,
writesOpen: map[uint64]struct{}{},
readsOpen: head,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
Expand Down Expand Up @@ -591,12 +628,20 @@ func (db *DB) Appender() Appender {
db.metrics.activeAppenders.Inc()

db.mtx.RLock()
return &dbAppender{db: db}

db.writeMtx.Lock()
db.writeLastId++
id := db.writeLastId
db.writesOpen[id] = struct{}{}
db.writeMtx.Unlock()
return &dbAppender{db: db, writeId: id, cleanupWriteIdsBelow: db.readLowWatermark()}
}

type dbAppender struct {
db *DB
heads []*metaAppender
db *DB
heads []*metaAppender
writeId uint64
cleanupWriteIdsBelow uint64

samples int
}
Expand Down Expand Up @@ -683,7 +728,7 @@ func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
// Instantiate appender after returning headmtx!
app := &metaAppender{
meta: hb.Meta(),
app: hb.Appender(),
app: hb.Appender(a.writeId, a.cleanupWriteIdsBelow),
}
a.heads = append(a.heads, app)

Expand Down Expand Up @@ -731,9 +776,28 @@ func (db *DB) ensureHead(t int64) error {
return err
}

// readLowWatermark returns the writeId below which
// we no longer need to track which writes were from
// which writeId.
// readLowWatermark returns the writeId below which we no longer need to
// track which writes were from which writeId.
func (db *DB) readLowWatermark() uint64 {
	// Lock ordering: writeMtx must be taken before readMtx.
	db.writeMtx.Lock()
	defer db.writeMtx.Unlock()
	db.readMtx.Lock()
	defer db.readMtx.Unlock()

	// The tail of the list is the oldest still-open read; its low
	// watermark bounds everything we must keep tracking.
	head := db.readsOpen
	if oldest := head.prev; oldest != head {
		return oldest.lowWaterMark
	}
	// No reads open: everything up to the newest write id is released.
	return db.writeLastId
}

func (a *dbAppender) Commit() error {
defer a.db.metrics.activeAppenders.Dec()
defer a.db.mtx.RUnlock()
defer func() {
a.db.writeMtx.Lock()
delete(a.db.writesOpen, a.writeId)
a.db.writeMtx.Unlock()
}()

// Commits to partial appenders must be concurrent as concurrent appenders
// may have conflicting locks on head appenders.
Expand Down Expand Up @@ -766,6 +830,10 @@ func (a *dbAppender) Rollback() error {
defer a.db.metrics.activeAppenders.Dec()
defer a.db.mtx.RUnlock()

a.db.writeMtx.Lock()
delete(a.db.writesOpen, a.writeId)
a.db.writeMtx.Unlock()

var g errgroup.Group

for _, h := range a.heads {
Expand Down
98 changes: 98 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"strconv"
"testing"

"github.com/pkg/errors"
Expand All @@ -38,6 +39,10 @@ func readSeriesSet(ss SeriesSet) (map[string][]sample, error) {
samples = append(samples, sample{t: t, v: v})
}

if len(samples) == 0 {
continue
}

name := series.Labels().String()
result[name] = samples
if err := ss.Err(); err != nil {
Expand Down Expand Up @@ -216,3 +221,96 @@ Outer:
}
}
}

func TestDBCannotSeePartialCommits(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)

db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
defer db.Close()

stop := make(chan struct{})
firstInsert := make(chan struct{})

// Insert data in batches.
go func() {
iter := 0
for {
app := db.Appender()

for j := 0; j < 100; j++ {
_, err := app.Add(labels.FromStrings("foo", "bar", "a", strconv.Itoa(j)), int64(iter), float64(iter))
require.NoError(t, err)
}
err = app.Commit()
require.NoError(t, err)

if iter == 0 {
close(firstInsert)
}
iter++

select {
case <-stop:
return
default:
}
}
}()

<-firstInsert

// This is a race condition, so do a few tests to tickle it.
// Usually most will fail.
inconsistencies := 0
for i := 0; i < 10; i++ {
func() {
querier := db.Querier(0, 1000000)
defer querier.Close()

seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
require.NoError(t, err)
values := map[float64]struct{}{}
for _, series := range seriesSet {
values[series[len(series)-1].v] = struct{}{}
}
if len(values) != 1 {
inconsistencies++
}
}()
}
stop <- struct{}{}

require.Equal(t, 0, inconsistencies, "Some queries saw inconsistent results.")
}

func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)

db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
defer db.Close()

querier := db.Querier(0, 1000000)
defer querier.Close()

app := db.Appender()
_, err = app.Add(labels.FromStrings("foo", "bar"), 0, 0)
require.NoError(t, err)
// This commit is after the querier is created, so should not be returned.
err = app.Commit()
require.NoError(t, err)

seriesSet, err := readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
require.NoError(t, err)
require.Equal(t, map[string][]sample{}, seriesSet)

querier = db.Querier(0, 1000000)
defer querier.Close()
seriesSet, err = readSeriesSet(querier.Select(labels.NewEqualMatcher("foo", "bar")))
require.NoError(t, err)
require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})

}
Loading