This repository was archived by the owner on Aug 13, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 177
Migrate write ahead log #340
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ import ( | |
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/tsdb/fileutil" | ||
| "github.com/prometheus/tsdb/labels" | ||
| "github.com/prometheus/tsdb/wal" | ||
| ) | ||
|
|
||
| // WALEntryType indicates what data a WAL entry contains. | ||
|
|
@@ -82,6 +83,8 @@ func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { | |
|
|
||
| // WAL is a write ahead log that can log new series labels and samples. | ||
| // It must be completely read before new entries are logged. | ||
| // | ||
| // DEPRECATED: use wal pkg combined with the record codex instead. | ||
| type WAL interface { | ||
| Reader() WALReader | ||
| LogSeries([]RefSeries) error | ||
|
|
@@ -173,6 +176,8 @@ func newCRC32() hash.Hash32 { | |
| } | ||
|
|
||
| // SegmentWAL is a write ahead log for series data. | ||
| // | ||
| // DEPRECATED: use wal pkg combined with the record coders instead. | ||
| type SegmentWAL struct { | ||
| mtx sync.Mutex | ||
| metrics *walMetrics | ||
|
|
@@ -1206,3 +1211,100 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { | |
| } | ||
| return nil | ||
| } | ||
|
|
||
| // MigrateWAL rewrites the deprecated write ahead log into the new format. | ||
| func MigrateWAL(logger log.Logger, dir string) (err error) { | ||
| if logger == nil { | ||
| logger = log.NewNopLogger() | ||
| } | ||
| // Detect whether we still have the old WAL. | ||
| fns, err := sequenceFiles(dir) | ||
| if err != nil && !os.IsNotExist(err) { | ||
| return errors.Wrap(err, "list sequence files") | ||
| } | ||
| if len(fns) == 0 { | ||
| return nil // No WAL at all yet. | ||
| } | ||
| // Check header of first segment to see whether we are still dealing with an | ||
| // old WAL. | ||
| f, err := os.Open(fns[0]) | ||
| if err != nil { | ||
| return errors.Wrap(err, "check first existing segment") | ||
| } | ||
| defer f.Close() | ||
|
|
||
| var hdr [4]byte | ||
| if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { | ||
| return errors.Wrap(err, "read header from first segment") | ||
| } | ||
| // If we cannot read the magic header for segments of the old WAL, abort. | ||
| // Either it's migrated already or there's a corruption issue with which | ||
| // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. | ||
| if binary.BigEndian.Uint32(hdr[:]) != WALMagic { | ||
| return nil | ||
| } | ||
|
|
||
| level.Info(logger).Log("msg", "migrating WAL format") | ||
|
|
||
| tmpdir := dir + ".tmp" | ||
| if err := os.RemoveAll(tmpdir); err != nil { | ||
| return errors.Wrap(err, "cleanup replacement dir") | ||
| } | ||
| repl, err := wal.New(logger, nil, tmpdir) | ||
| if err != nil { | ||
| return errors.Wrap(err, "open new WAL") | ||
| } | ||
| // It should've already been closed as part of the previous finalization. | ||
| // Do it once again in case of prior errors. | ||
| defer func() { | ||
| if err != nil { | ||
| repl.Close() | ||
| } | ||
| }() | ||
|
|
||
| w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) | ||
| if err != nil { | ||
| return errors.Wrap(err, "open old WAL") | ||
| } | ||
| defer w.Close() | ||
|
|
||
| rdr := w.Reader() | ||
|
|
||
| var ( | ||
| enc RecordEncoder | ||
| b []byte | ||
| ) | ||
| decErr := rdr.Read( | ||
| func(s []RefSeries) { | ||
| if err != nil { | ||
| return | ||
| } | ||
| err = repl.Log(enc.Series(s, b[:0])) | ||
| }, | ||
| func(s []RefSample) { | ||
| if err != nil { | ||
| return | ||
| } | ||
| err = repl.Log(enc.Samples(s, b[:0])) | ||
| }, | ||
| func(s []Stone) { | ||
| if err != nil { | ||
| return | ||
| } | ||
| err = repl.Log(enc.Tombstones(s, b[:0])) | ||
| }, | ||
| ) | ||
| if decErr != nil { | ||
| return errors.Wrap(err, "decode old entries") | ||
| } | ||
| if err != nil { | ||
| return errors.Wrap(err, "write new entries") | ||
| } | ||
| if err := repl.Close(); err != nil { | ||
| return errors.Wrap(err, "close new WAL") | ||
| } | ||
| if err := fileutil.Rename(tmpdir, dir); err != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This no longer works given that dir already exists and Rename no longer deletes the destination first. |
||
| return errors.Wrap(err, "replace old WAL") | ||
| } | ||
| return nil | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/coders/codex/