Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit eb93fec

Browse files
brian-brazilgouthamve
authored andcommitted
Use a double linked list to track active reads
1 parent d75c23e commit eb93fec

File tree

3 files changed

+32
-32
lines changed

3 files changed

+32
-32
lines changed

block.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,17 +134,18 @@ type BlockReader interface {
134134
type Appendable interface {
135135
// Appender returns a new Appender against an underlying store.
136136
Appender(writeId, cleanupWriteIdsBelow uint64) Appender
137-
138-
// Busy returns whether there are any currently active appenders.
139-
Busy() bool
140137
}
141138

142139
type IsolationState struct {
143140
// We will ignore all writes above the max, or that are incomplete.
144141
maxWriteId uint64
145142
incompleteWrites map[uint64]struct{}
146-
readId uint64
143+
lowWaterMark uint64 // Lowest of incompleteWrites/maxWriteId.
147144
db *DB
145+
146+
// Doubly linked list of active reads.
147+
next *IsolationState
148+
prev *IsolationState
148149
}
149150

150151
// BlockMeta provides meta information about a block.

db.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,10 @@ type DB struct {
123123
// Which writes are currently in progress.
124124
writesOpen map[uint64]struct{}
125125
// Mutex for accessing readLastId.
126+
// If taking both writeMtx and readMtx, take writeMtx first.
126127
readMtx sync.Mutex
127-
// Each isolated read is given an internal id.
128-
readLastId uint64
129-
// All current in use isolationStates.
130-
readsOpen map[uint64]*IsolationState
128+
// All current in use isolationStates. This is a doubly-linked list.
129+
readsOpen *IsolationState
131130
}
132131

133132
type dbMetrics struct {
@@ -206,12 +205,16 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
206205
return nil, err
207206
}
208207

208+
head := &IsolationState{}
209+
head.next = head
210+
head.prev = head
211+
209212
db = &DB{
210213
dir: dir,
211214
logger: l,
212215
opts: opts,
213216
writesOpen: map[uint64]struct{}{},
214-
readsOpen: map[uint64]*IsolationState{},
217+
readsOpen: head,
215218
compactc: make(chan struct{}, 1),
216219
donec: make(chan struct{}),
217220
stopc: make(chan struct{}),
@@ -748,25 +751,15 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
748751
// readLowWatermark returns the writeId below which
749752
// we no longer need to track which writes were from
750753
// which writeId.
751-
// TODO: Optimise this, needs to be O(1).
752754
func (db *DB) readLowWatermark() uint64 {
753-
db.writeMtx.Lock()
754-
id := db.writeLastId
755-
db.writeMtx.Unlock()
756-
755+
db.writeMtx.Lock() // Take writeMtx first.
756+
defer db.writeMtx.Unlock()
757757
db.readMtx.Lock()
758-
for _, isolation := range db.readsOpen {
759-
if isolation.maxWriteId < id {
760-
id = isolation.maxWriteId
761-
}
762-
for i := range isolation.incompleteWrites {
763-
if i < id {
764-
id = i
765-
}
766-
}
758+
defer db.readMtx.Unlock()
759+
if db.readsOpen.prev == db.readsOpen {
760+
return db.writeLastId
767761
}
768-
db.readMtx.Unlock()
769-
return id
762+
return db.readsOpen.prev.lowWaterMark
770763
}
771764

772765
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.

querier.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,34 @@ type querier struct {
6161
// IsolationState returns an objet used to control isolation
6262
// between a query and writes. Must be closed when complete.
6363
func (s *DB) IsolationState() *IsolationState {
64-
s.writeMtx.Lock()
64+
s.writeMtx.Lock() // Take write mutex before read mutex.
65+
defer s.writeMtx.Unlock()
6566
isolation := &IsolationState{
6667
maxWriteId: s.writeLastId,
68+
lowWaterMark: s.writeLastId,
6769
incompleteWrites: make(map[uint64]struct{}, len(s.writesOpen)),
6870
db: s,
6971
}
7072
for k, _ := range s.writesOpen {
7173
isolation.incompleteWrites[k] = struct{}{}
74+
if k < isolation.lowWaterMark {
75+
isolation.lowWaterMark = k
76+
}
7277
}
73-
s.writeMtx.Unlock()
7478

7579
s.readMtx.Lock()
76-
isolation.readId = s.readLastId
77-
s.readLastId++
78-
s.readsOpen[isolation.readId] = isolation
79-
s.readMtx.Unlock()
80+
defer s.readMtx.Unlock()
81+
isolation.prev = s.readsOpen
82+
isolation.next = s.readsOpen.next
83+
s.readsOpen.next.prev = isolation
84+
s.readsOpen.next = isolation
8085
return isolation
8186
}
8287

8388
func (i *IsolationState) Close() {
8489
i.db.readMtx.Lock()
85-
delete(i.db.readsOpen, i.readId)
90+
i.next.prev = i.prev
91+
i.prev.next = i.next
8692
i.db.readMtx.Unlock()
8793
}
8894

0 commit comments

Comments
 (0)