Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 6ea65a3

Browse files
committed
Provide option to compress WAL records
In running Prometheus instances, compressing the records was shown to reduce disk usage by half while incurring a negligible CPU cost.

Signed-off-by: Chris Marchbanks <[email protected]>
1 parent c2c921a commit 6ea65a3

File tree

15 files changed

+680
-506
lines changed

15 files changed

+680
-506
lines changed

checkpoint.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
135135
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
136136
return nil, errors.Wrap(err, "create checkpoint dir")
137137
}
138-
cp, err := wal.New(nil, nil, cpdirtmp)
138+
cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
139139
if err != nil {
140140
return nil, errors.Wrap(err, "open checkpoint")
141141
}

checkpoint_test.go

Lines changed: 96 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) {
8686
}
8787

8888
func TestCheckpoint(t *testing.T) {
89-
dir, err := ioutil.TempDir("", "test_checkpoint")
90-
testutil.Ok(t, err)
91-
defer func() {
92-
testutil.Ok(t, os.RemoveAll(dir))
93-
}()
94-
95-
var enc RecordEncoder
96-
// Create a dummy segment to bump the initial number.
97-
seg, err := wal.CreateSegment(dir, 100)
98-
testutil.Ok(t, err)
99-
testutil.Ok(t, seg.Close())
100-
101-
// Manually create checkpoint for 99 and earlier.
102-
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
103-
testutil.Ok(t, err)
104-
105-
// Add some data we expect to be around later.
106-
err = w.Log(enc.Series([]RefSeries{
107-
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
108-
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
109-
}, nil))
110-
testutil.Ok(t, err)
111-
testutil.Ok(t, w.Close())
112-
113-
// Start a WAL and write records to it as usual.
114-
w, err = wal.NewSize(nil, nil, dir, 64*1024)
115-
testutil.Ok(t, err)
89+
for _, compress := range []bool{false, true} {
90+
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
91+
dir, err := ioutil.TempDir("", "test_checkpoint")
92+
testutil.Ok(t, err)
93+
defer func() {
94+
testutil.Ok(t, os.RemoveAll(dir))
95+
}()
11696

117-
var last int64
118-
for i := 0; ; i++ {
119-
_, n, err := w.Segments()
120-
testutil.Ok(t, err)
121-
if n >= 106 {
122-
break
123-
}
124-
// Write some series initially.
125-
if i == 0 {
126-
b := enc.Series([]RefSeries{
127-
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
128-
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
129-
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
130-
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
131-
}, nil)
132-
testutil.Ok(t, w.Log(b))
133-
}
134-
// Write samples until the WAL has enough segments.
135-
// Make them have drifting timestamps within a record to see that they
136-
// get filtered properly.
137-
b := enc.Samples([]RefSample{
138-
{Ref: 0, T: last, V: float64(i)},
139-
{Ref: 1, T: last + 10000, V: float64(i)},
140-
{Ref: 2, T: last + 20000, V: float64(i)},
141-
{Ref: 3, T: last + 30000, V: float64(i)},
142-
}, nil)
143-
testutil.Ok(t, w.Log(b))
144-
145-
last += 100
146-
}
147-
testutil.Ok(t, w.Close())
97+
var enc RecordEncoder
98+
// Create a dummy segment to bump the initial number.
99+
seg, err := wal.CreateSegment(dir, 100)
100+
testutil.Ok(t, err)
101+
testutil.Ok(t, seg.Close())
148102

149-
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
150-
return x%2 == 0
151-
}, last/2)
152-
testutil.Ok(t, err)
153-
testutil.Ok(t, w.Truncate(107))
154-
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
103+
// Manually create checkpoint for 99 and earlier.
104+
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
105+
testutil.Ok(t, err)
155106

156-
// Only the new checkpoint should be left.
157-
files, err := fileutil.ReadDir(dir)
158-
testutil.Ok(t, err)
159-
testutil.Equals(t, 1, len(files))
160-
testutil.Equals(t, "checkpoint.000106", files[0])
107+
// Add some data we expect to be around later.
108+
err = w.Log(enc.Series([]RefSeries{
109+
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
110+
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
111+
}, nil))
112+
testutil.Ok(t, err)
113+
testutil.Ok(t, w.Close())
161114

162-
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
163-
testutil.Ok(t, err)
164-
defer sr.Close()
115+
// Start a WAL and write records to it as usual.
116+
w, err = wal.NewSize(nil, nil, dir, 64*1024, compress)
117+
testutil.Ok(t, err)
165118

166-
var dec RecordDecoder
167-
var series []RefSeries
168-
r := wal.NewReader(sr)
119+
var last int64
120+
for i := 0; ; i++ {
121+
_, n, err := w.Segments()
122+
testutil.Ok(t, err)
123+
if n >= 106 {
124+
break
125+
}
126+
// Write some series initially.
127+
if i == 0 {
128+
b := enc.Series([]RefSeries{
129+
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
130+
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
131+
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
132+
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
133+
}, nil)
134+
testutil.Ok(t, w.Log(b))
135+
}
136+
// Write samples until the WAL has enough segments.
137+
// Make them have drifting timestamps within a record to see that they
138+
// get filtered properly.
139+
b := enc.Samples([]RefSample{
140+
{Ref: 0, T: last, V: float64(i)},
141+
{Ref: 1, T: last + 10000, V: float64(i)},
142+
{Ref: 2, T: last + 20000, V: float64(i)},
143+
{Ref: 3, T: last + 30000, V: float64(i)},
144+
}, nil)
145+
testutil.Ok(t, w.Log(b))
146+
147+
last += 100
148+
}
149+
testutil.Ok(t, w.Close())
169150

170-
for r.Next() {
171-
rec := r.Record()
151+
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
152+
return x%2 == 0
153+
}, last/2)
154+
testutil.Ok(t, err)
155+
testutil.Ok(t, w.Truncate(107))
156+
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
172157

173-
switch dec.Type(rec) {
174-
case RecordSeries:
175-
series, err = dec.Series(rec, series)
158+
// Only the new checkpoint should be left.
159+
files, err := fileutil.ReadDir(dir)
176160
testutil.Ok(t, err)
177-
case RecordSamples:
178-
samples, err := dec.Samples(rec, nil)
161+
testutil.Equals(t, 1, len(files))
162+
testutil.Equals(t, "checkpoint.000106", files[0])
163+
164+
sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
179165
testutil.Ok(t, err)
180-
for _, s := range samples {
181-
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
166+
defer sr.Close()
167+
168+
var dec RecordDecoder
169+
var series []RefSeries
170+
r := wal.NewReader(sr)
171+
172+
for r.Next() {
173+
rec := r.Record()
174+
175+
switch dec.Type(rec) {
176+
case RecordSeries:
177+
series, err = dec.Series(rec, series)
178+
testutil.Ok(t, err)
179+
case RecordSamples:
180+
samples, err := dec.Samples(rec, nil)
181+
testutil.Ok(t, err)
182+
for _, s := range samples {
183+
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
184+
}
185+
}
182186
}
183-
}
187+
testutil.Ok(t, r.Err())
188+
testutil.Equals(t, []RefSeries{
189+
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
190+
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
191+
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
192+
}, series)
193+
})
184194
}
185-
testutil.Ok(t, r.Err())
186-
testutil.Equals(t, []RefSeries{
187-
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
188-
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
189-
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
190-
}, series)
191195
}
192196

193197
func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
@@ -197,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
197201
defer func() {
198202
testutil.Ok(t, os.RemoveAll(dir))
199203
}()
200-
w, err := wal.NewSize(nil, nil, dir, 64*1024)
204+
w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
201205
testutil.Ok(t, err)
202206
testutil.Ok(t, w.Log([]byte{99}))
203207
w.Close()

db.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ var DefaultOptions = &Options{
5151
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
5252
NoLockfile: false,
5353
AllowOverlappingBlocks: false,
54+
WALCompression: false,
5455
}
5556

5657
// Options of the DB storage.
@@ -80,6 +81,9 @@ type Options struct {
8081
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
8182
// This in-turn enables vertical compaction and vertical query merge.
8283
AllowOverlappingBlocks bool
84+
85+
// WALCompression will turn on Snappy compression for records on the WAL.
86+
WALCompression bool
8387
}
8488

8589
// Appender allows appending a batch of data. It must be completed with a
@@ -306,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
306310
if opts.WALSegmentSize > 0 {
307311
segmentSize = opts.WALSegmentSize
308312
}
309-
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
313+
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
310314
if err != nil {
311315
return nil, err
312316
}

db_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
14041404
}()
14051405

14061406
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
1407-
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
1407+
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
14081408
testutil.Ok(t, err)
14091409

14101410
var enc RecordEncoder
@@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
14541454
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
14551455

14561456
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
1457-
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
1457+
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
14581458
testutil.Ok(t, err)
14591459

14601460
var enc RecordEncoder

go.mod

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ require (
44
github.com/cespare/xxhash v1.1.0
55
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
66
github.com/go-kit/kit v0.8.0
7+
github.com/golang/snappy v0.0.1
78
github.com/oklog/ulid v1.3.1
89
github.com/pkg/errors v0.8.0
910
github.com/prometheus/client_golang v1.0.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
3030
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
3131
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
3232
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
33+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
34+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
3335
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
3436
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
3537
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=

0 commit comments

Comments
 (0)