Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 414ad68

Browse files
author
Fabian Reinartz
committed
Migrate write ahead log
On startup, rewrite the old write ahead log into the new format once. Signed-off-by: Fabian Reinartz <[email protected]>
1 parent 4a5744b commit 414ad68

File tree

4 files changed

+193
-0
lines changed

4 files changed

+193
-0
lines changed

db.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
193193
if err := repairBadIndexVersion(l, dir); err != nil {
194194
return nil, err
195195
}
196+
// Migrate old WAL.
197+
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
198+
return nil, errors.Wrap(err, "migrate WAL")
199+
}
196200

197201
db = &DB{
198202
dir: dir,

head.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,8 @@ func (h *Head) Init() error {
392392
if err == nil {
393393
return nil
394394
}
395+
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
396+
395397
if err := h.wal.Repair(err); err != nil {
396398
return errors.Wrap(err, "repair corrupted WAL")
397399
}

wal.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/prometheus/client_golang/prometheus"
3434
"github.com/prometheus/tsdb/fileutil"
3535
"github.com/prometheus/tsdb/labels"
36+
"github.com/prometheus/tsdb/wal"
3637
)
3738

3839
// WALEntryType indicates what data a WAL entry contains.
@@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
8283

8384
// WAL is a write ahead log that can log new series labels and samples.
8485
// It must be completely read before new entries are logged.
86+
//
87+
// Deprecated: use the wal package combined with the record coders instead.
8588
type WAL interface {
8689
Reader() WALReader
8790
LogSeries([]RefSeries) error
@@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
173176
}
174177

175178
// SegmentWAL is a write ahead log for series data.
179+
//
180+
// Deprecated: use the wal package combined with the record coders instead.
176181
type SegmentWAL struct {
177182
mtx sync.Mutex
178183
metrics *walMetrics
@@ -1206,3 +1211,86 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
12061211
}
12071212
return nil
12081213
}
1214+
1215+
// MigrateWAL rewrites the deprecated write ahead log into the new format.
1216+
func MigrateWAL(logger log.Logger, dir string) error {
1217+
// Detect whether we still have the old WAL.
1218+
fns, err := sequenceFiles(dir)
1219+
if err != nil && !os.IsNotExist(err) {
1220+
return errors.Wrap(err, "list sequence files")
1221+
}
1222+
if len(fns) == 0 {
1223+
return nil // No WAL at all yet.
1224+
}
1225+
// Check header of first segment.
1226+
f, err := os.Open(fns[0])
1227+
if err != nil {
1228+
return errors.Wrap(err, "check first existing segment")
1229+
}
1230+
var hdr [4]byte
1231+
if n, err := f.Read(hdr[:]); err != nil {
1232+
return errors.Wrap(err, "read header from first segment")
1233+
} else if n != 4 {
1234+
return errors.New("could not read full header from segment")
1235+
}
1236+
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
1237+
return nil // Not the old WAL anymore.
1238+
}
1239+
1240+
level.Info(logger).Log("msg", "migrating WAL format")
1241+
1242+
tmpdir := dir + ".tmp"
1243+
if err := os.RemoveAll(tmpdir); err != nil {
1244+
return errors.Wrap(err, "cleanup replacement dir")
1245+
}
1246+
repl, err := wal.New(logger, nil, tmpdir)
1247+
if err != nil {
1248+
return errors.Wrap(err, "open new WAL")
1249+
}
1250+
w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
1251+
if err != nil {
1252+
return errors.Wrap(err, "open old WAL")
1253+
}
1254+
rdr := w.Reader()
1255+
1256+
var (
1257+
enc RecordEncoder
1258+
b []byte
1259+
)
1260+
decErr := rdr.Read(
1261+
func(s []RefSeries) {
1262+
if err != nil {
1263+
return
1264+
}
1265+
err = repl.Log(enc.Series(s, b[:0]))
1266+
},
1267+
func(s []RefSample) {
1268+
if err != nil {
1269+
return
1270+
}
1271+
err = repl.Log(enc.Samples(s, b[:0]))
1272+
},
1273+
func(s []Stone) {
1274+
if err != nil {
1275+
return
1276+
}
1277+
err = repl.Log(enc.Tombstones(s, b[:0]))
1278+
},
1279+
)
1280+
if decErr != nil {
1281+
return errors.Wrap(err, "decode old entries")
1282+
}
1283+
if err != nil {
1284+
return errors.Wrap(err, "write new entries")
1285+
}
1286+
if err := w.Close(); err != nil {
1287+
return errors.Wrap(err, "close old WAL")
1288+
}
1289+
if err := repl.Close(); err != nil {
1290+
return errors.Wrap(err, "close new WAL")
1291+
}
1292+
if err := fileutil.Rename(tmpdir, dir); err != nil {
1293+
return errors.Wrap(err, "replace old WAL")
1294+
}
1295+
return nil
1296+
}

wal_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import (
1919
"io/ioutil"
2020
"math/rand"
2121
"os"
22+
"path"
2223
"testing"
2324
"time"
2425

2526
"github.com/go-kit/kit/log"
2627
"github.com/prometheus/tsdb/fileutil"
2728
"github.com/prometheus/tsdb/labels"
2829
"github.com/prometheus/tsdb/testutil"
30+
"github.com/prometheus/tsdb/wal"
2931
)
3032

3133
func TestSegmentWAL_cut(t *testing.T) {
@@ -431,3 +433,100 @@ func TestWALRestoreCorrupted(t *testing.T) {
431433
})
432434
}
433435
}
436+
437+
func TestMigrateWAL_Fuzz(t *testing.T) {
438+
dir, err := ioutil.TempDir("", "walmigrate")
439+
testutil.Ok(t, err)
440+
defer os.RemoveAll(dir)
441+
442+
wdir := path.Join(dir, "wal")
443+
444+
// Should pass if no WAL exists yet.
445+
testutil.Ok(t, MigrateWAL(nil, wdir))
446+
447+
oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
448+
testutil.Ok(t, err)
449+
450+
// Write some data.
451+
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
452+
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
453+
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
454+
}))
455+
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
456+
{Ref: 1, T: 100, V: 200},
457+
{Ref: 2, T: 300, V: 400},
458+
}))
459+
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
460+
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
461+
}))
462+
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
463+
{Ref: 3, T: 100, V: 200},
464+
{Ref: 4, T: 300, V: 400},
465+
}))
466+
testutil.Ok(t, oldWAL.LogDeletes([]Stone{
467+
{ref: 1, intervals: []Interval{{100, 200}}},
468+
}))
469+
470+
testutil.Ok(t, oldWAL.Close())
471+
472+
// Perform migration.
473+
testutil.Ok(t, MigrateWAL(nil, wdir))
474+
475+
w, err := wal.New(nil, nil, wdir)
476+
testutil.Ok(t, err)
477+
478+
// We can properly write some new data after migration.
479+
var enc RecordEncoder
480+
testutil.Ok(t, w.Log(enc.Samples([]RefSample{
481+
{Ref: 500, T: 1, V: 1},
482+
}, nil)))
483+
484+
testutil.Ok(t, w.Close())
485+
486+
// Read back all data.
487+
sr, err := wal.NewSegmentsReader(wdir)
488+
testutil.Ok(t, err)
489+
490+
r := wal.NewReader(sr)
491+
var res []interface{}
492+
var dec RecordDecoder
493+
494+
for r.Next() {
495+
rec := r.Record()
496+
497+
switch dec.Type(rec) {
498+
case RecordSeries:
499+
s, err := dec.Series(rec, nil)
500+
testutil.Ok(t, err)
501+
res = append(res, s)
502+
case RecordSamples:
503+
s, err := dec.Samples(rec, nil)
504+
testutil.Ok(t, err)
505+
res = append(res, s)
506+
case RecordTombstones:
507+
s, err := dec.Tombstones(rec, nil)
508+
testutil.Ok(t, err)
509+
res = append(res, s)
510+
default:
511+
t.Fatalf("unknown record type %d", dec.Type(rec))
512+
}
513+
}
514+
testutil.Ok(t, r.Err())
515+
516+
testutil.Equals(t, []interface{}{
517+
[]RefSeries{
518+
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
519+
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
520+
},
521+
[]RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
522+
[]RefSeries{
523+
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
524+
},
525+
[]RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
526+
[]Stone{{ref: 1, intervals: []Interval{{100, 200}}}},
527+
[]RefSample{{Ref: 500, T: 1, V: 1}},
528+
}, res)
529+
530+
// Migrating an already migrated WAL shouldn't do anything.
531+
testutil.Ok(t, MigrateWAL(nil, wdir))
532+
}

0 commit comments

Comments
 (0)