Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 7699051

Browse files
authored
Merge pull request #340 from prometheus/wal_migrate
Migrate write ahead log
2 parents a9a8fab + ee7ee05 commit 7699051

File tree

5 files changed

+225
-1
lines changed

5 files changed

+225
-1
lines changed

db.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
192192
if err := repairBadIndexVersion(l, dir); err != nil {
193193
return nil, err
194194
}
195+
// Migrate old WAL.
196+
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
197+
return nil, errors.Wrap(err, "migrate WAL")
198+
}
195199

196200
db = &DB{
197201
dir: dir,

head.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ func (h *Head) Init() error {
412412
if err == nil {
413413
return nil
414414
}
415+
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)
416+
415417
if err := h.wal.Repair(err); err != nil {
416418
return errors.Wrap(err, "repair corrupted WAL")
417419
}

repair_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
7676
}
7777

7878
// On DB opening all blocks in the base dir should be repaired.
79-
db, _ := Open("testdata/repair_index_version", nil, nil, nil)
79+
db, err := Open("testdata/repair_index_version", nil, nil, nil)
8080
if err != nil {
8181
t.Fatal(err)
8282
}

wal.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/prometheus/client_golang/prometheus"
3434
"github.com/prometheus/tsdb/fileutil"
3535
"github.com/prometheus/tsdb/labels"
36+
"github.com/prometheus/tsdb/wal"
3637
)
3738

3839
// WALEntryType indicates what data a WAL entry contains.
@@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
8283

8384
// WAL is a write ahead log that can log new series labels and samples.
8485
// It must be completely read before new entries are logged.
86+
//
87+
// Deprecated: use wal pkg combined with the record coders instead.
8588
type WAL interface {
8689
Reader() WALReader
8790
LogSeries([]RefSeries) error
@@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
173176
}
174177

175178
// SegmentWAL is a write ahead log for series data.
179+
//
180+
// Deprecated: use wal pkg combined with the record coders instead.
176181
type SegmentWAL struct {
177182
mtx sync.Mutex
178183
metrics *walMetrics
@@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
12061211
}
12071212
return nil
12081213
}
1214+
1215+
// MigrateWAL rewrites the deprecated write ahead log into the new format.
1216+
func MigrateWAL(logger log.Logger, dir string) (err error) {
1217+
if logger == nil {
1218+
logger = log.NewNopLogger()
1219+
}
1220+
// Detect whether we still have the old WAL.
1221+
fns, err := sequenceFiles(dir)
1222+
if err != nil && !os.IsNotExist(err) {
1223+
return errors.Wrap(err, "list sequence files")
1224+
}
1225+
if len(fns) == 0 {
1226+
return nil // No WAL at all yet.
1227+
}
1228+
// Check header of first segment to see whether we are still dealing with an
1229+
// old WAL.
1230+
f, err := os.Open(fns[0])
1231+
if err != nil {
1232+
return errors.Wrap(err, "check first existing segment")
1233+
}
1234+
defer f.Close()
1235+
1236+
var hdr [4]byte
1237+
if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
1238+
return errors.Wrap(err, "read header from first segment")
1239+
}
1240+
// If we cannot read the magic header for segments of the old WAL, abort.
1241+
// Either it's migrated already or there's a corruption issue with which
1242+
// we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
1243+
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
1244+
return nil
1245+
}
1246+
1247+
level.Info(logger).Log("msg", "migrating WAL format")
1248+
1249+
tmpdir := dir + ".tmp"
1250+
if err := os.RemoveAll(tmpdir); err != nil {
1251+
return errors.Wrap(err, "cleanup replacement dir")
1252+
}
1253+
repl, err := wal.New(logger, nil, tmpdir)
1254+
if err != nil {
1255+
return errors.Wrap(err, "open new WAL")
1256+
}
1257+
// It should've already been closed as part of the previous finalization.
1258+
// Do it once again in case of prior errors.
1259+
defer func() {
1260+
if err != nil {
1261+
repl.Close()
1262+
}
1263+
}()
1264+
1265+
w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
1266+
if err != nil {
1267+
return errors.Wrap(err, "open old WAL")
1268+
}
1269+
defer w.Close()
1270+
1271+
rdr := w.Reader()
1272+
1273+
var (
1274+
enc RecordEncoder
1275+
b []byte
1276+
)
1277+
decErr := rdr.Read(
1278+
func(s []RefSeries) {
1279+
if err != nil {
1280+
return
1281+
}
1282+
err = repl.Log(enc.Series(s, b[:0]))
1283+
},
1284+
func(s []RefSample) {
1285+
if err != nil {
1286+
return
1287+
}
1288+
err = repl.Log(enc.Samples(s, b[:0]))
1289+
},
1290+
func(s []Stone) {
1291+
if err != nil {
1292+
return
1293+
}
1294+
err = repl.Log(enc.Tombstones(s, b[:0]))
1295+
},
1296+
)
1297+
if decErr != nil {
1298+
return errors.Wrap(err, "decode old entries")
1299+
}
1300+
if err != nil {
1301+
return errors.Wrap(err, "write new entries")
1302+
}
1303+
if err := repl.Close(); err != nil {
1304+
return errors.Wrap(err, "close new WAL")
1305+
}
1306+
if err := fileutil.Rename(tmpdir, dir); err != nil {
1307+
return errors.Wrap(err, "replace old WAL")
1308+
}
1309+
return nil
1310+
}

wal_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ import (
1919
"io/ioutil"
2020
"math/rand"
2121
"os"
22+
"path"
2223
"testing"
2324
"time"
2425

2526
"github.com/go-kit/kit/log"
2627
"github.com/prometheus/tsdb/fileutil"
2728
"github.com/prometheus/tsdb/labels"
2829
"github.com/prometheus/tsdb/testutil"
30+
"github.com/prometheus/tsdb/wal"
2931
)
3032

3133
func TestSegmentWAL_cut(t *testing.T) {
@@ -431,3 +433,117 @@ func TestWALRestoreCorrupted(t *testing.T) {
431433
})
432434
}
433435
}
436+
437+
func TestMigrateWAL_Empty(t *testing.T) {
438+
// The migration proecedure must properly deal with a zero-length segment,
439+
// which is valid in the new format.
440+
dir, err := ioutil.TempDir("", "walmigrate")
441+
testutil.Ok(t, err)
442+
defer os.RemoveAll(dir)
443+
444+
wdir := path.Join(dir, "wal")
445+
446+
// Initialize empty WAL.
447+
w, err := wal.New(nil, nil, wdir)
448+
testutil.Ok(t, err)
449+
testutil.Ok(t, w.Close())
450+
451+
testutil.Ok(t, MigrateWAL(nil, wdir))
452+
}
453+
454+
func TestMigrateWAL_Fuzz(t *testing.T) {
455+
dir, err := ioutil.TempDir("", "walmigrate")
456+
testutil.Ok(t, err)
457+
defer os.RemoveAll(dir)
458+
459+
wdir := path.Join(dir, "wal")
460+
461+
// Should pass if no WAL exists yet.
462+
testutil.Ok(t, MigrateWAL(nil, wdir))
463+
464+
oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
465+
testutil.Ok(t, err)
466+
467+
// Write some data.
468+
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
469+
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
470+
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
471+
}))
472+
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
473+
{Ref: 1, T: 100, V: 200},
474+
{Ref: 2, T: 300, V: 400},
475+
}))
476+
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
477+
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
478+
}))
479+
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
480+
{Ref: 3, T: 100, V: 200},
481+
{Ref: 4, T: 300, V: 400},
482+
}))
483+
testutil.Ok(t, oldWAL.LogDeletes([]Stone{
484+
{ref: 1, intervals: []Interval{{100, 200}}},
485+
}))
486+
487+
testutil.Ok(t, oldWAL.Close())
488+
489+
// Perform migration.
490+
testutil.Ok(t, MigrateWAL(nil, wdir))
491+
492+
w, err := wal.New(nil, nil, wdir)
493+
testutil.Ok(t, err)
494+
495+
// We can properly write some new data after migration.
496+
var enc RecordEncoder
497+
testutil.Ok(t, w.Log(enc.Samples([]RefSample{
498+
{Ref: 500, T: 1, V: 1},
499+
}, nil)))
500+
501+
testutil.Ok(t, w.Close())
502+
503+
// Read back all data.
504+
sr, err := wal.NewSegmentsReader(wdir)
505+
testutil.Ok(t, err)
506+
507+
r := wal.NewReader(sr)
508+
var res []interface{}
509+
var dec RecordDecoder
510+
511+
for r.Next() {
512+
rec := r.Record()
513+
514+
switch dec.Type(rec) {
515+
case RecordSeries:
516+
s, err := dec.Series(rec, nil)
517+
testutil.Ok(t, err)
518+
res = append(res, s)
519+
case RecordSamples:
520+
s, err := dec.Samples(rec, nil)
521+
testutil.Ok(t, err)
522+
res = append(res, s)
523+
case RecordTombstones:
524+
s, err := dec.Tombstones(rec, nil)
525+
testutil.Ok(t, err)
526+
res = append(res, s)
527+
default:
528+
t.Fatalf("unknown record type %d", dec.Type(rec))
529+
}
530+
}
531+
testutil.Ok(t, r.Err())
532+
533+
testutil.Equals(t, []interface{}{
534+
[]RefSeries{
535+
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
536+
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
537+
},
538+
[]RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
539+
[]RefSeries{
540+
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
541+
},
542+
[]RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
543+
[]Stone{{ref: 1, intervals: []Interval{{100, 200}}}},
544+
[]RefSample{{Ref: 500, T: 1, V: 1}},
545+
}, res)
546+
547+
// Migrating an already migrated WAL shouldn't do anything.
548+
testutil.Ok(t, MigrateWAL(nil, wdir))
549+
}

0 commit comments

Comments
 (0)