Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, err
}
// Migrate old WAL.
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
}

db = &DB{
dir: dir,
Expand Down
2 changes: 2 additions & 0 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ func (h *Head) Init() error {
if err == nil {
return nil
}
level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err)

if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}
Expand Down
2 changes: 1 addition & 1 deletion repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestRepairBadIndexVersion(t *testing.T) {
}

// On DB opening all blocks in the base dir should be repaired.
db, _ := Open("testdata/repair_index_version", nil, nil, nil)
db, err := Open("testdata/repair_index_version", nil, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
102 changes: 102 additions & 0 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/wal"
)

// WALEntryType indicates what data a WAL entry contains.
Expand Down Expand Up @@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {

// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
//
// DEPRECATED: use wal pkg combined with the record coders instead.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
Expand Down Expand Up @@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 {
}

// SegmentWAL is a write ahead log for series data.
//
// DEPRECATED: use wal pkg combined with the record coders instead.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/coders/codex/

type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
Expand Down Expand Up @@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
}
return nil
}

// MigrateWAL rewrites the deprecated write ahead log into the new format.
func MigrateWAL(logger log.Logger, dir string) (err error) {
if logger == nil {
logger = log.NewNopLogger()
}
// Detect whether we still have the old WAL.
fns, err := sequenceFiles(dir)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "list sequence files")
}
if len(fns) == 0 {
return nil // No WAL at all yet.
}
// Check header of first segment to see whether we are still dealing with an
// old WAL.
f, err := os.Open(fns[0])
if err != nil {
return errors.Wrap(err, "check first existing segment")
}
defer f.Close()

var hdr [4]byte
if _, err := f.Read(hdr[:]); err != nil && err != io.EOF {
return errors.Wrap(err, "read header from first segment")
}
// If we cannot read the magic header for segments of the old WAL, abort.
// Either it's migrated already or there's a corruption issue with which
// we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case.
if binary.BigEndian.Uint32(hdr[:]) != WALMagic {
return nil
}

level.Info(logger).Log("msg", "migrating WAL format")

tmpdir := dir + ".tmp"
if err := os.RemoveAll(tmpdir); err != nil {
return errors.Wrap(err, "cleanup replacement dir")
}
repl, err := wal.New(logger, nil, tmpdir)
if err != nil {
return errors.Wrap(err, "open new WAL")
}
// We close it once already before as part of finalization.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't read properly

// Do it once again in case of prior errors.
defer func() {
if err != nil {
repl.Close()
}
}()

w, err := OpenSegmentWAL(dir, logger, time.Minute, nil)
if err != nil {
return errors.Wrap(err, "open old WAL")
}
defer w.Close()

rdr := w.Reader()

var (
enc RecordEncoder
b []byte
)
decErr := rdr.Read(
func(s []RefSeries) {
if err != nil {
return
}
err = repl.Log(enc.Series(s, b[:0]))
},
func(s []RefSample) {
if err != nil {
return
}
err = repl.Log(enc.Samples(s, b[:0]))
},
func(s []Stone) {
if err != nil {
return
}
err = repl.Log(enc.Tombstones(s, b[:0]))
},
)
if decErr != nil {
return errors.Wrap(err, "decode old entries")
}
if err != nil {
return errors.Wrap(err, "write new entries")
}
if err := repl.Close(); err != nil {
return errors.Wrap(err, "close new WAL")
}
if err := fileutil.Rename(tmpdir, dir); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer works given that dir already exists and Rename no longer deletes the destination first.

return errors.Wrap(err, "replace old WAL")
}
return nil
}
116 changes: 116 additions & 0 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
"io/ioutil"
"math/rand"
"os"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/wal"
)

func TestSegmentWAL_cut(t *testing.T) {
Expand Down Expand Up @@ -431,3 +433,117 @@ func TestWALRestoreCorrupted(t *testing.T) {
})
}
}

func TestMigrateWAL_Empty(t *testing.T) {
// The migration proecedure must properly deal with a zero-length segment,
// which is valid in the new format.
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)

wdir := path.Join(dir, "wal")

// Initialize empty WAL.
w, err := wal.New(nil, nil, wdir)
testutil.Ok(t, err)
testutil.Ok(t, w.Close())

testutil.Ok(t, MigrateWAL(nil, wdir))
}

func TestMigrateWAL_Fuzz(t *testing.T) {
dir, err := ioutil.TempDir("", "walmigrate")
testutil.Ok(t, err)
defer os.RemoveAll(dir)

wdir := path.Join(dir, "wal")

// Should pass if no WAL exists yet.
testutil.Ok(t, MigrateWAL(nil, wdir))

oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil)
testutil.Ok(t, err)

// Write some data.
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
}))
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
{Ref: 1, T: 100, V: 200},
{Ref: 2, T: 300, V: 400},
}))
testutil.Ok(t, oldWAL.LogSeries([]RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
}))
testutil.Ok(t, oldWAL.LogSamples([]RefSample{
{Ref: 3, T: 100, V: 200},
{Ref: 4, T: 300, V: 400},
}))
testutil.Ok(t, oldWAL.LogDeletes([]Stone{
{ref: 1, intervals: []Interval{{100, 200}}},
}))

testutil.Ok(t, oldWAL.Close())

// Perform migration.
testutil.Ok(t, MigrateWAL(nil, wdir))

w, err := wal.New(nil, nil, wdir)
testutil.Ok(t, err)

// We can properly write some new data after migration.
var enc RecordEncoder
testutil.Ok(t, w.Log(enc.Samples([]RefSample{
{Ref: 500, T: 1, V: 1},
}, nil)))

testutil.Ok(t, w.Close())

// Read back all data.
sr, err := wal.NewSegmentsReader(wdir)
testutil.Ok(t, err)

r := wal.NewReader(sr)
var res []interface{}
var dec RecordDecoder

for r.Next() {
rec := r.Record()

switch dec.Type(rec) {
case RecordSeries:
s, err := dec.Series(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
case RecordSamples:
s, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
case RecordTombstones:
s, err := dec.Tombstones(rec, nil)
testutil.Ok(t, err)
res = append(res, s)
default:
t.Fatalf("unknown record type %d", dec.Type(rec))
}
}
testutil.Ok(t, r.Err())

testutil.Equals(t, []interface{}{
[]RefSeries{
{Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")},
{Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")},
},
[]RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}},
[]RefSeries{
{Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")},
},
[]RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}},
[]Stone{{ref: 1, intervals: []Interval{{100, 200}}}},
[]RefSample{{Ref: 500, T: 1, V: 1}},
}, res)

// Migrating an already migrated WAL shouldn't do anything.
testutil.Ok(t, MigrateWAL(nil, wdir))
}