Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 149c5dc

Browse files
authored
Handle multiple refs for the same series when WAL reading. (#623)
This can happen if a given series is created/truncated/recreated. Signed-off-by: Brian Brazil <[email protected]>
1 parent 731ed22 commit 149c5dc

File tree

2 files changed

+70
-10
lines changed

2 files changed

+70
-10
lines changed

head.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
313313
}
314314
}
315315

316-
func (h *Head) loadWAL(r *wal.Reader) error {
316+
func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) error {
317317
// Track number of samples that referenced a series we don't know about
318318
// for error reporting.
319319
var unknownRefs uint64
@@ -322,10 +322,11 @@ func (h *Head) loadWAL(r *wal.Reader) error {
322322
// They are connected through a ring of channels which ensures that all sample batches
323323
// read from the WAL are processed in order.
324324
var (
325-
wg sync.WaitGroup
326-
n = runtime.GOMAXPROCS(0)
327-
inputs = make([]chan []RefSample, n)
328-
outputs = make([]chan []RefSample, n)
325+
wg sync.WaitGroup
326+
multiRefLock sync.Mutex
327+
n = runtime.GOMAXPROCS(0)
328+
inputs = make([]chan []RefSample, n)
329+
outputs = make([]chan []RefSample, n)
329330
)
330331
wg.Add(n)
331332

@@ -364,7 +365,14 @@ func (h *Head) loadWAL(r *wal.Reader) error {
364365
}
365366
}
366367
for _, s := range series {
367-
h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
368+
series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
369+
370+
if !created {
371+
// There's already a different ref for this series.
372+
multiRefLock.Lock()
373+
multiRef[s.Ref] = series.ref
374+
multiRefLock.Unlock()
375+
}
368376

369377
if h.lastSeriesID < s.Ref {
370378
h.lastSeriesID = s.Ref
@@ -399,6 +407,9 @@ func (h *Head) loadWAL(r *wal.Reader) error {
399407
shards[i] = buf[:0]
400408
}
401409
for _, sam := range samples[:m] {
410+
if r, ok := multiRef[sam.Ref]; ok {
411+
sam.Ref = r
412+
}
402413
mod := sam.Ref % uint64(n)
403414
shards[mod] = append(shards[mod], sam)
404415
}
@@ -478,6 +489,7 @@ func (h *Head) Init(minValidTime int64) error {
478489
if err != nil && err != ErrNotFound {
479490
return errors.Wrap(err, "find last checkpoint")
480491
}
492+
multiRef := map[uint64]uint64{}
481493
if err == nil {
482494
sr, err := wal.NewSegmentsReader(dir)
483495
if err != nil {
@@ -487,7 +499,7 @@ func (h *Head) Init(minValidTime int64) error {
487499

488500
// A corrupted checkpoint is a hard error for now and requires user
489501
// intervention. There's likely little data that can be recovered anyway.
490-
if err := h.loadWAL(wal.NewReader(sr)); err != nil {
502+
if err := h.loadWAL(wal.NewReader(sr), multiRef); err != nil {
491503
return errors.Wrap(err, "backfill checkpoint")
492504
}
493505
startFrom++
@@ -507,7 +519,7 @@ func (h *Head) Init(minValidTime int64) error {
507519
}
508520

509521
sr := wal.NewSegmentBufReader(s)
510-
err = h.loadWAL(wal.NewReader(sr))
522+
err = h.loadWAL(wal.NewReader(sr), multiRef)
511523
sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows.
512524
if err == nil {
513525
continue

head_test.go

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,13 @@ func TestHead_ReadWAL(t *testing.T) {
109109
},
110110
[]RefSeries{
111111
{Ref: 50, Labels: labels.FromStrings("a", "4")},
112+
// This series has two refs pointing to it.
113+
{Ref: 101, Labels: labels.FromStrings("a", "3")},
112114
},
113115
[]RefSample{
114116
{Ref: 10, T: 101, V: 5},
115117
{Ref: 50, T: 101, V: 6},
118+
{Ref: 101, T: 101, V: 7},
116119
},
117120
[]Stone{
118121
{ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}},
@@ -133,7 +136,7 @@ func TestHead_ReadWAL(t *testing.T) {
133136
testutil.Ok(t, err)
134137

135138
testutil.Ok(t, head.Init(math.MinInt64))
136-
testutil.Equals(t, uint64(100), head.lastSeriesID)
139+
testutil.Equals(t, uint64(101), head.lastSeriesID)
137140

138141
s10 := head.series.getByID(10)
139142
s11 := head.series.getByID(11)
@@ -156,7 +159,52 @@ func TestHead_ReadWAL(t *testing.T) {
156159

157160
testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
158161
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
159-
testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
162+
testutil.Equals(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0)))
163+
}
164+
165+
func TestHead_WALMultiRef(t *testing.T) {
166+
dir, err := ioutil.TempDir("", "test_wal_multi_ref")
167+
testutil.Ok(t, err)
168+
defer func() {
169+
testutil.Ok(t, os.RemoveAll(dir))
170+
}()
171+
172+
w, err := wal.New(nil, nil, dir)
173+
testutil.Ok(t, err)
174+
175+
head, err := NewHead(nil, nil, w, 1000)
176+
testutil.Ok(t, err)
177+
178+
testutil.Ok(t, head.Init(0))
179+
app := head.Appender()
180+
ref1, err := app.Add(labels.FromStrings("foo", "bar"), 100, 1)
181+
testutil.Ok(t, err)
182+
testutil.Ok(t, app.Commit())
183+
184+
testutil.Ok(t, head.Truncate(200))
185+
186+
app = head.Appender()
187+
ref2, err := app.Add(labels.FromStrings("foo", "bar"), 300, 2)
188+
testutil.Ok(t, err)
189+
testutil.Ok(t, app.Commit())
190+
191+
if ref1 == ref2 {
192+
t.Fatal("Refs are the same")
193+
}
194+
testutil.Ok(t, head.Close())
195+
196+
w, err = wal.New(nil, nil, dir)
197+
testutil.Ok(t, err)
198+
199+
head, err = NewHead(nil, nil, w, 1000)
200+
testutil.Ok(t, err)
201+
testutil.Ok(t, head.Init(0))
202+
defer head.Close()
203+
204+
q, err := NewBlockQuerier(head, 0, 300)
205+
testutil.Ok(t, err)
206+
series := query(t, q, labels.NewEqualMatcher("foo", "bar"))
207+
testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{100, 1}, sample{300, 2}}}, series)
160208
}
161209

162210
func TestHead_Truncate(t *testing.T) {

0 commit comments

Comments
 (0)