Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit d8db95c

Browse files
committed
Refactor to have no impact on interfaces
Signed-off-by: Goutham Veeramachaneni <[email protected]>
1 parent f7c4253 commit d8db95c

File tree

7 files changed

+186
-181
lines changed

7 files changed

+186
-181
lines changed

block.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,24 +124,19 @@ type BlockReader interface {
124124
Index() (IndexReader, error)
125125

126126
// Chunks returns a ChunkReader over the block's data.
127-
Chunks(*IsolationState) (ChunkReader, error)
127+
Chunks() (ChunkReader, error)
128128

129129
// Tombstones returns a TombstoneReader over the block's deleted data.
130130
Tombstones() (TombstoneReader, error)
131131
}
132132

133-
// Appendable defines an entity to which data can be appended.
134-
type Appendable interface {
135-
// Appender returns a new Appender against an underlying store.
136-
Appender(writeId, cleanupWriteIdsBelow uint64) Appender
137-
}
138-
133+
// IsolationState holds the isolation information.
139134
type IsolationState struct {
140135
// We will ignore all writes above the max, or that are incomplete.
141-
maxWriteId uint64
136+
maxWriteID uint64
142137
incompleteWrites map[uint64]struct{}
143138
lowWaterMark uint64 // Lowest of incompleteWrites/maxWriteId.
144-
db *DB
139+
head *Head
145140

146141
// Doubly linked list of active reads.
147142
next *IsolationState
@@ -337,7 +332,7 @@ func (pb *Block) Index() (IndexReader, error) {
337332
}
338333

339334
// Chunks returns a new ChunkReader against the block data.
340-
func (pb *Block) Chunks(_ *IsolationState) (ChunkReader, error) {
335+
func (pb *Block) Chunks() (ChunkReader, error) {
341336
if err := pb.startRead(); err != nil {
342337
return nil, err
343338
}

compact.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
521521
}
522522
closers = append(closers, indexr)
523523

524-
chunkr, err := b.Chunks(nil)
524+
chunkr, err := b.Chunks()
525525
if err != nil {
526526
return errors.Wrapf(err, "open chunk reader for block %s", b)
527527
}

compact_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,6 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
398398

399399
type erringBReader struct{}
400400

401-
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
402-
func (erringBReader) Chunks(*IsolationState) (ChunkReader, error) { return nil, errors.New("chunks") }
403-
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
401+
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
402+
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
403+
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }

db.go

Lines changed: 4 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -115,24 +115,10 @@ type DB struct {
115115
// cmtx is used to control compactions and deletions.
116116
cmtx sync.Mutex
117117
compactionsEnabled bool
118-
119-
// Mutex for accessing writeLastId and writesOpen.
120-
writeMtx sync.Mutex
121-
// Each write is given an internal id.
122-
writeLastId uint64
123-
// Which writes are currently in progress.
124-
writesOpen map[uint64]struct{}
125-
// Mutex for accessing readLastId.
126-
// If taking both writeMtx and readMtx, take writeMtx first.
127-
readMtx sync.Mutex
128-
// All current in use isolationStates. This is a doubly-linked list.
129-
readsOpen *IsolationState
130118
}
131119

132120
type dbMetrics struct {
133121
loadedBlocks prometheus.GaugeFunc
134-
lowWatermark prometheus.GaugeFunc
135-
highWatermark prometheus.GaugeFunc
136122
reloads prometheus.Counter
137123
reloadsFailed prometheus.Counter
138124
compactionsTriggered prometheus.Counter
@@ -152,20 +138,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
152138
defer db.mtx.RUnlock()
153139
return float64(len(db.blocks))
154140
})
155-
m.lowWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
156-
Name: "tsdb_isolation_low_watermark",
157-
Help: "The lowest write id that is still referenced.",
158-
}, func() float64 {
159-
return float64(db.readLowWatermark())
160-
})
161-
m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
162-
Name: "tsdb_isolation_high_watermark",
163-
Help: "The highest write id that has been given out.",
164-
}, func() float64 {
165-
db.writeMtx.Lock()
166-
defer db.writeMtx.Unlock()
167-
return float64(db.writeLastId)
168-
})
169141
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
170142
Name: "prometheus_tsdb_reloads_total",
171143
Help: "Number of times the database reloaded block data from disk.",
@@ -194,8 +166,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
194166
if r != nil {
195167
r.MustRegister(
196168
m.loadedBlocks,
197-
m.lowWatermark,
198-
m.highWatermark,
199169
m.reloads,
200170
m.reloadsFailed,
201171
m.cutoffs,
@@ -223,16 +193,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
223193
return nil, err
224194
}
225195

226-
head := &IsolationState{}
227-
head.next = head
228-
head.prev = head
229-
230196
db = &DB{
231197
dir: dir,
232198
logger: l,
233199
opts: opts,
234-
writesOpen: map[uint64]struct{}{},
235-
readsOpen: head,
236200
compactc: make(chan struct{}, 1),
237201
donec: make(chan struct{}),
238202
stopc: make(chan struct{}),
@@ -370,22 +334,14 @@ func (db *DB) retentionCutoff() (b bool, err error) {
370334

371335
// Appender opens a new appender against the database.
372336
func (db *DB) Appender() Appender {
373-
db.writeMtx.Lock()
374-
db.writeLastId++
375-
id := db.writeLastId
376-
db.writesOpen[id] = struct{}{}
377-
db.writeMtx.Unlock()
378-
379-
return dbAppender{db: db, Appender: db.head.Appender(id, db.readLowWatermark()), writeId: id}
337+
return dbAppender{db: db, Appender: db.head.Appender()}
380338
}
381339

382340
// dbAppender wraps the DB's head appender and triggers compactions on commit
383341
// if necessary.
384342
type dbAppender struct {
385343
Appender
386344
db *DB
387-
388-
writeId uint64
389345
}
390346

391347
func (a dbAppender) Commit() error {
@@ -400,10 +356,6 @@ func (a dbAppender) Commit() error {
400356
}
401357
}
402358

403-
a.db.writeMtx.Lock()
404-
delete(a.db.writesOpen, a.writeId)
405-
a.db.writeMtx.Unlock()
406-
407359
return err
408360
}
409361

@@ -720,16 +672,6 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
720672
db.mtx.RLock()
721673
defer db.mtx.RUnlock()
722674

723-
db.writeMtx.Lock()
724-
isolation := &IsolationState{
725-
maxWriteId: db.writeLastId,
726-
incompleteWrites: make(map[uint64]struct{}, len(db.writesOpen)),
727-
}
728-
for k, _ := range db.writesOpen {
729-
isolation.incompleteWrites[k] = struct{}{}
730-
}
731-
db.writeMtx.Unlock()
732-
733675
for _, b := range db.blocks {
734676
m := b.Meta()
735677
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
@@ -741,13 +683,12 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
741683
}
742684

743685
sq := &querier{
744-
blocks: make([]Querier, 0, len(blocks)),
745-
db: db,
746-
isolation: db.IsolationState(),
686+
blocks: make([]Querier, 0, len(blocks)),
687+
db: db,
747688
}
748689

749690
for _, b := range blocks {
750-
q, err := NewBlockQuerier(b, mint, maxt, isolation)
691+
q, err := NewBlockQuerier(b, mint, maxt)
751692
if err == nil {
752693
sq.blocks = append(sq.blocks, q)
753694
continue
@@ -766,20 +707,6 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
766707
return mint, mint + width
767708
}
768709

769-
// readLowWatermark returns the writeId below which
770-
// we no longer need to track which writes were from
771-
// which writeId.
772-
func (db *DB) readLowWatermark() uint64 {
773-
db.writeMtx.Lock() // Take writeMtx first.
774-
defer db.writeMtx.Unlock()
775-
db.readMtx.Lock()
776-
defer db.readMtx.Unlock()
777-
if db.readsOpen.prev == db.readsOpen {
778-
return db.writeLastId
779-
}
780-
return db.readsOpen.prev.lowWaterMark
781-
}
782-
783710
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
784711
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
785712
db.cmtx.Lock()

0 commit comments

Comments
 (0)