diff --git a/wal.go b/wal.go
index 5c6e78d2..1dfce3f6 100644
--- a/wal.go
+++ b/wal.go
@@ -14,7 +14,6 @@ package tsdb
 
 import (
-	"bufio"
 	"encoding/binary"
 	"fmt"
 	"hash"
@@ -27,6 +26,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/alin-sinpalean/concurrent-writer/concurrent"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
@@ -185,7 +185,7 @@ type SegmentWAL struct {
 	segmentSize int64
 
 	crc32 hash.Hash32
-	cur   *bufio.Writer
+	cur   *concurrent.Writer
 	curN  int64
 
 	stopc chan struct{}
@@ -551,15 +551,20 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {
 
 // cut finishes the currently active segments and opens the next one.
 // The encoder is reset to point to the new segment.
-func (w *SegmentWAL) cut() error {
+// Optionally, a channel may be provided to allow the caller to wait on the
+// current segment having been finished, as this is done asynchronously.
+func (w *SegmentWAL) cut(done chan interface{}) error {
 	// Sync current head to disk and close.
 	if hf := w.head(); hf != nil {
-		if err := w.flush(); err != nil {
-			return err
-		}
+		cur := w.cur
 		// Finish last segment asynchronously to not block the WAL moving along
 		// in the new segment.
 		go func() {
+			if cur != nil {
+				if err := cur.Flush(); err != nil {
+					level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
+				}
+			}
 			off, err := hf.Seek(0, os.SEEK_CUR)
 			if err != nil {
 				level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
@@ -573,6 +578,9 @@ func (w *SegmentWAL) cut() error {
 			if err := hf.Close(); err != nil {
 				level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
 			}
+			if done != nil {
+				close(done)
+			}
 		}()
 	}
 
@@ -594,7 +602,7 @@ func (w *SegmentWAL) cut() error {
 	w.files = append(w.files, newSegmentFile(f))
 
 	// TODO(gouthamve): make the buffer size a constant.
-	w.cur = bufio.NewWriterSize(f, 8*1024*1024)
+	w.cur = concurrent.NewWriterSize(f, 8*1024*1024)
 	w.curN = 8
 
 	return nil
@@ -609,52 +617,32 @@ func (w *SegmentWAL) head() *segmentFile {
 
 // Sync flushes the changes to disk.
 func (w *SegmentWAL) Sync() error {
-	var head *segmentFile
-	var err error
+	// Retrieve references to the head segment and current writer under mutex lock.
+	w.mtx.Lock()
+	cur := w.cur
+	head := w.head()
+	w.mtx.Unlock()
 
-	// Flush the writer and retrieve the reference to the head segment under mutex lock.
-	func() {
-		w.mtx.Lock()
-		defer w.mtx.Unlock()
-		if err = w.flush(); err != nil {
-			return
-		}
-		head = w.head()
-	}()
-	if err != nil {
+	// But only flush and fsync after releasing the mutex as it will block on disk I/O.
+	return syncImpl(cur, head, w.metrics.fsyncDuration)
+}
+
+func syncImpl(cur *concurrent.Writer, head *segmentFile, fsyncDuration prometheus.Summary) error {
+	if cur == nil {
+		return nil
+	}
+	if err := cur.Flush(); err != nil {
 		return errors.Wrap(err, "flush buffer")
 	}
 	if head != nil {
-		// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
 		start := time.Now()
 		err := fileutil.Fdatasync(head.File)
-		w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
+		fsyncDuration.Observe(time.Since(start).Seconds())
 		return err
 	}
 	return nil
 }
 
-func (w *SegmentWAL) sync() error {
-	if err := w.flush(); err != nil {
-		return err
-	}
-	if w.head() == nil {
-		return nil
-	}
-
-	start := time.Now()
-	err := fileutil.Fdatasync(w.head().File)
-	w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
-	return err
-}
-
-func (w *SegmentWAL) flush() error {
-	if w.cur == nil {
-		return nil
-	}
-	return w.cur.Flush()
-}
-
 func (w *SegmentWAL) run(interval time.Duration) {
 	var tick <-chan time.Time
 
@@ -683,14 +671,16 @@ func (w *SegmentWAL) Close() error {
 	<-w.donec
 
 	w.mtx.Lock()
-	defer w.mtx.Unlock()
+	cur := w.cur
+	hf := w.head()
+	w.mtx.Unlock()
 
-	if err := w.sync(); err != nil {
+	if err := syncImpl(cur, hf, w.metrics.fsyncDuration); err != nil {
 		return err
 	}
 	// On opening, a WAL must be fully consumed once. Afterwards
 	// only the current segment will still be open.
-	if hf := w.head(); hf != nil {
+	if hf != nil {
 		return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name())
 	}
 	return nil
@@ -716,7 +706,7 @@ func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
 	// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
 	// Probably fine in general but may yield a lot of short files in some cases.
 	if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
-		if err := w.cut(); err != nil {
+		if err := w.cut(nil); err != nil {
 			return err
 		}
 	}
diff --git a/wal_test.go b/wal_test.go
index aadce89d..86a3fcc8 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -89,7 +89,9 @@ func TestSegmentWAL_cut(t *testing.T) {
 
 	require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
 
-	require.NoError(t, w.cut(), "cut failed")
+	done := make(chan interface{})
+	require.NoError(t, w.cut(done), "cut failed")
+	<-done
 
 	// Cutting creates a new file.
 	require.Equal(t, 2, len(w.files))
@@ -389,17 +391,15 @@ func TestWALRestoreCorrupted(t *testing.T) {
 
 			require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
 			require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}}))
 
-			require.NoError(t, w.cut())
+			done := make(chan interface{})
+			require.NoError(t, w.cut(done))
+			<-done
 
 			require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}}))
 			require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}}))
 
 			require.NoError(t, w.Close())
-			// cut() truncates and fsyncs the first segment async. If it happens after
-			// the corruption we apply below, the corruption will be overwritten again.
-			// Fire and forget a sync to avoid flakyness.
-			w.files[0].Sync()
 			// Corrupt the second entry in the first file.
 			// After re-opening we must be able to read the first entry
 			// and the rest, including the second file, must be truncated for clean further