From 4bf8a821cfcfc9e743f54d1f310cf57848aef3a9 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Mon, 18 Sep 2017 18:05:42 +0200 Subject: [PATCH 1/4] Use concurrent.Writer in place of bufio.Writer and remove all Flush() calls from under mutex protection to reduce the likelihood of commits blocking on slow disk. --- wal.go | 71 +++++++++++++++++++++++++--------------------------------- 1 file changed, 30 insertions(+), 41 deletions(-) diff --git a/wal.go b/wal.go index 9af9a185..a69ee53a 100644 --- a/wal.go +++ b/wal.go @@ -14,7 +14,6 @@ package tsdb import ( - "bufio" "encoding/binary" "fmt" "hash" @@ -27,6 +26,7 @@ import ( "sync" "time" + "github.com/alin-sinpalean/concurrent-writer/concurrent" "github.com/coreos/etcd/pkg/fileutil" "github.com/go-kit/kit/log" "github.com/pkg/errors" @@ -161,7 +161,7 @@ type SegmentWAL struct { segmentSize int64 crc32 hash.Hash32 - cur *bufio.Writer + cur *concurrent.Writer curN int64 stopc chan struct{} @@ -525,12 +525,15 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { func (w *SegmentWAL) cut() error { // Sync current head to disk and close. if hf := w.head(); hf != nil { - if err := w.flush(); err != nil { - return err - } + cur := w.cur // Finish last segment asynchronously to not block the WAL moving along // in the new segment. go func() { + if cur != nil { + if err := cur.Flush(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + } off, err := hf.Seek(0, os.SEEK_CUR) if err != nil { w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) @@ -565,7 +568,7 @@ func (w *SegmentWAL) cut() error { w.files = append(w.files, newSegmentFile(f)) // TODO(gouthamve): make the buffer size a constant. - w.cur = bufio.NewWriterSize(f, 8*1024*1024) + w.cur = concurrent.NewWriterSize(f, 8*1024*1024) w.curN = 8 return nil @@ -580,45 +583,29 @@ func (w *SegmentWAL) head() *segmentFile { // Sync flushes the changes to disk. func (w *SegmentWAL) Sync() error { - var head *segmentFile - var err error - - // Flush the writer and retrieve the reference to the head segment under mutex lock. - func() { - w.mtx.Lock() - defer w.mtx.Unlock() - if err = w.flush(); err != nil { - return - } - head = w.head() - }() - if err != nil { + // Retrieve references to the head segment and current writer under mutex lock. + w.mtx.Lock() + cur := w.cur + head := w.head() + w.mtx.Unlock() + + // But only flush and fsync after releasing the mutex as it will block on disk I/O. + return syncImpl(cur, head) +} + +func syncImpl(cur *concurrent.Writer, head *segmentFile) error { + if cur == nil { + return nil + } + if err := cur.Flush(); err != nil { return errors.Wrap(err, "flush buffer") } if head != nil { - // But only fsync the head segment after releasing the mutex as it will block on disk I/O. return fileutil.Fdatasync(head.File) } return nil } -func (w *SegmentWAL) sync() error { - if err := w.flush(); err != nil { - return err - } - if w.head() == nil { - return nil - } - return fileutil.Fdatasync(w.head().File) -} - -func (w *SegmentWAL) flush() error { - if w.cur == nil { - return nil - } - return w.cur.Flush() -} - func (w *SegmentWAL) run(interval time.Duration) { var tick <-chan time.Time @@ -647,15 +634,17 @@ func (w *SegmentWAL) Close() error { <-w.donec w.mtx.Lock() - defer w.mtx.Unlock() + cur := w.cur + head := w.head() + w.mtx.Unlock() - if err := w.sync(); err != nil { + if err := syncImpl(cur, head); err != nil { return err } // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. - if hf := w.head(); hf != nil { - return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) + if head != nil { + return errors.Wrapf(head.Close(), "closing WAL head %s", head.Name()) } return nil } From fc844b65e41d8a9b9191c20684f4d7009335a9d5 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Tue, 19 Sep 2017 17:36:04 +0200 Subject: [PATCH 2/4] Add an optional channel parameter to SegmentWAL.cut(), to allow the caller to wait on the asynchronous finishing of the current segment. Mostly for testing. --- wal.go | 9 +++- wal_test.go | 116 ++++++++++++++++++++++++++-------------------------- 2 files changed, 66 insertions(+), 59 deletions(-) diff --git a/wal.go b/wal.go index a69ee53a..ecab90a1 100644 --- a/wal.go +++ b/wal.go @@ -522,7 +522,9 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { // cut finishes the currently active segments and opens the next one. // The encoder is reset to point to the new segment. -func (w *SegmentWAL) cut() error { +// Optionally a channel may be provided to allow the caller to wait on the +// current segment having been finished, as this is done asynchronously. +func (w *SegmentWAL) cut(done chan interface{}) error { // Sync current head to disk and close. if hf := w.head(); hf != nil { cur := w.cur @@ -547,6 +549,9 @@ func (w *SegmentWAL) cut() error { if err := hf.Close(); err != nil { w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) } + if done != nil { + close(done) + } }() } @@ -669,7 +674,7 @@ func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { // XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened. // Probably fine in general but may yield a lot of short files in some cases. if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { - if err := w.cut(); err != nil { + if err := w.cut(nil); err != nil { return err } } diff --git a/wal_test.go b/wal_test.go index f36b22e9..1df91d8c 100644 --- a/wal_test.go +++ b/wal_test.go @@ -89,7 +89,9 @@ func TestSegmentWAL_cut(t *testing.T) { require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!"))) - require.NoError(t, w.cut(), "cut failed") + done := make(chan interface{}) + require.NoError(t, w.cut(done), "cut failed") + <-done // Cutting creates a new file. require.Equal(t, 2, len(w.files)) @@ -378,77 +380,77 @@ func TestWALRestoreCorrupted(t *testing.T) { }, }, } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - // Generate testing data. It does not make semantical sense but - // for the purpose of this test. - dir, err := ioutil.TempDir("", "test_corrupted") - require.NoError(t, err) - defer os.RemoveAll(dir) + for x := 0; x < 10; x++ { + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Generate testing data. It does not make semantical sense but + // for the purpose of this test. + dir, err := ioutil.TempDir("", "test_corrupted") + require.NoError(t, err) + defer os.RemoveAll(dir) - w, err := OpenSegmentWAL(dir, nil, 0) - require.NoError(t, err) + w, err := OpenSegmentWAL(dir, nil, 0) + require.NoError(t, err) - require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) - require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) - require.NoError(t, w.cut()) + done := make(chan interface{}) + require.NoError(t, w.cut(done)) + <-done - require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) - require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) - require.NoError(t, w.Close()) + require.NoError(t, w.Close()) - // cut() truncates and fsyncs the first segment async. If it happens after - // the corruption we apply below, the corruption will be overwritten again. - // Fire and forget a sync to avoid flakyness. - w.files[0].Sync() - // Corrupt the second entry in the first file. - // After re-opening we must be able to read the first entry - // and the rest, including the second file, must be truncated for clean further - // writes. - c.f(t, w) + // Corrupt the second entry in the first file. + // After re-opening we must be able to read the first entry + // and the rest, including the second file, must be truncated for clean further + // writes. + c.f(t, w) - logger := log.NewLogfmtLogger(os.Stderr) + logger := log.NewLogfmtLogger(os.Stderr) - w2, err := OpenSegmentWAL(dir, logger, 0) - require.NoError(t, err) + w2, err := OpenSegmentWAL(dir, logger, 0) + require.NoError(t, err) - r := w2.Reader() + r := w2.Reader() - serf := func(l []RefSeries) error { - require.Equal(t, 0, len(l)) - return nil - } - delf := func([]Stone) error { return nil } - - // Weird hack to check order of reads. - i := 0 - samplf := func(s []RefSample) error { - if i == 0 { - require.Equal(t, []RefSample{{T: 1, V: 2}}, s) - i++ - } else { - require.Equal(t, []RefSample{{T: 99, V: 100}}, s) + serf := func(l []RefSeries) error { + require.Equal(t, 0, len(l)) + return nil + } + delf := func([]Stone) error { return nil } + + // Weird hack to check order of reads. + i := 0 + samplf := func(s []RefSample) error { + if i == 0 { + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) + i++ + } else { + require.Equal(t, []RefSample{{T: 99, V: 100}}, s) + } + + return nil } - return nil - } - - require.NoError(t, r.Read(serf, samplf, delf)) + require.NoError(t, r.Read(serf, samplf, delf)) - require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) - require.NoError(t, w2.Close()) + require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) + require.NoError(t, w2.Close()) - // We should see the first valid entry and the new one, everything after - // is truncated. - w3, err := OpenSegmentWAL(dir, logger, 0) - require.NoError(t, err) + // We should see the first valid entry and the new one, everything after + // is truncated. + w3, err := OpenSegmentWAL(dir, logger, 0) + require.NoError(t, err) - r = w3.Reader() + r = w3.Reader() - i = 0 - require.NoError(t, r.Read(serf, samplf, delf)) - }) + i = 0 + require.NoError(t, r.Read(serf, samplf, delf)) + }) + } } } From ddb75717ade818160662bdcb421fcebb7edcc502 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Tue, 19 Sep 2017 17:40:55 +0200 Subject: [PATCH 3/4] Revert debugging change that was unintentionally included with the commit. --- wal_test.go | 110 ++++++++++++++++++++++++++-------------------------- 1 file changed, 54 insertions(+), 56 deletions(-) diff --git a/wal_test.go b/wal_test.go index 1df91d8c..6120f950 100644 --- a/wal_test.go +++ b/wal_test.go @@ -380,77 +380,75 @@ func TestWALRestoreCorrupted(t *testing.T) { }, }, } - for x := 0; x < 10; x++ { - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - // Generate testing data. It does not make semantical sense but - // for the purpose of this test. - dir, err := ioutil.TempDir("", "test_corrupted") - require.NoError(t, err) - defer os.RemoveAll(dir) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Generate testing data. It does not make semantical sense but + // for the purpose of this test. + dir, err := ioutil.TempDir("", "test_corrupted") + require.NoError(t, err) + defer os.RemoveAll(dir) - w, err := OpenSegmentWAL(dir, nil, 0) - require.NoError(t, err) + w, err := OpenSegmentWAL(dir, nil, 0) + require.NoError(t, err) - require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) - require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}})) - done := make(chan interface{}) - require.NoError(t, w.cut(done)) - <-done + done := make(chan interface{}) + require.NoError(t, w.cut(done)) + <-done - require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) - require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}})) + require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}})) - require.NoError(t, w.Close()) + require.NoError(t, w.Close()) - // Corrupt the second entry in the first file. - // After re-opening we must be able to read the first entry - // and the rest, including the second file, must be truncated for clean further - // writes. - c.f(t, w) + // Corrupt the second entry in the first file. + // After re-opening we must be able to read the first entry + // and the rest, including the second file, must be truncated for clean further + // writes. + c.f(t, w) - logger := log.NewLogfmtLogger(os.Stderr) + logger := log.NewLogfmtLogger(os.Stderr) - w2, err := OpenSegmentWAL(dir, logger, 0) - require.NoError(t, err) + w2, err := OpenSegmentWAL(dir, logger, 0) + require.NoError(t, err) - r := w2.Reader() + r := w2.Reader() - serf := func(l []RefSeries) error { - require.Equal(t, 0, len(l)) - return nil - } - delf := func([]Stone) error { return nil } - - // Weird hack to check order of reads. - i := 0 - samplf := func(s []RefSample) error { - if i == 0 { - require.Equal(t, []RefSample{{T: 1, V: 2}}, s) - i++ - } else { - require.Equal(t, []RefSample{{T: 99, V: 100}}, s) - } - - return nil + serf := func(l []RefSeries) error { + require.Equal(t, 0, len(l)) + return nil + } + delf := func([]Stone) error { return nil } + + // Weird hack to check order of reads. + i := 0 + samplf := func(s []RefSample) error { + if i == 0 { + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) + i++ + } else { + require.Equal(t, []RefSample{{T: 99, V: 100}}, s) } - require.NoError(t, r.Read(serf, samplf, delf)) + return nil + } - require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) - require.NoError(t, w2.Close()) + require.NoError(t, r.Read(serf, samplf, delf)) - // We should see the first valid entry and the new one, everything after - // is truncated. - w3, err := OpenSegmentWAL(dir, logger, 0) - require.NoError(t, err) + require.NoError(t, w2.LogSamples([]RefSample{{T: 99, V: 100}})) + require.NoError(t, w2.Close()) - r = w3.Reader() + // We should see the first valid entry and the new one, everything after + // is truncated. + w3, err := OpenSegmentWAL(dir, logger, 0) + require.NoError(t, err) - i = 0 - require.NoError(t, r.Read(serf, samplf, delf)) - }) - } + r = w3.Reader() + + i = 0 + require.NoError(t, r.Read(serf, samplf, delf)) + }) } } From 593b96877b62c1ea62e7ae5b5c181900fb820170 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Tue, 10 Oct 2017 09:35:01 +0200 Subject: [PATCH 4/4] Fix compilation error. --- wal.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/wal.go b/wal.go index 07a9fe54..1663d753 100644 --- a/wal.go +++ b/wal.go @@ -547,7 +547,7 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { // cut finishes the currently active segments and opens the next one. // The encoder is reset to point to the new segment. -// Optionally a channel may be provided to allow the caller to wait on the +// Optionally, a channel may be provided to allow the caller to wait on the // current segment having been finished, as this is done asynchronously. func (w *SegmentWAL) cut(done chan interface{}) error { // Sync current head to disk and close. @@ -620,10 +620,10 @@ func (w *SegmentWAL) Sync() error { w.mtx.Unlock() // But only flush and fsync after releasing the mutex as it will block on disk I/O. - return syncImpl(cur, head) + return syncImpl(cur, head, w.metrics.fsyncDuration) } -func syncImpl(cur *concurrent.Writer, head *segmentFile) error { +func syncImpl(cur *concurrent.Writer, head *segmentFile, fsyncDuration prometheus.Summary) error { if cur == nil { return nil } @@ -633,7 +633,7 @@ func syncImpl(cur *concurrent.Writer, head *segmentFile) error { if head != nil { start := time.Now() err := fileutil.Fdatasync(head.File) - w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) + fsyncDuration.Observe(time.Since(start).Seconds()) return err } return nil @@ -668,16 +668,16 @@ func (w *SegmentWAL) Close() error { w.mtx.Lock() cur := w.cur - head := w.head() + hf := w.head() w.mtx.Unlock() - if err := syncImpl(cur, head); err != nil { + if err := syncImpl(cur, hf, w.metrics.fsyncDuration); err != nil { return err } // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. - if head != nil { - return errors.Wrapf(head.Close(), "closing WAL head %s", head.Name()) + if hf != nil { + return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) } return nil }