Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit ee4a2c5

Browse files
committed
Optionally compress the WAL using Snappy
Signed-off-by: Chris Marchbanks <[email protected]>
1 parent d48a5e2 commit ee4a2c5

File tree

14 files changed

+160
-107
lines changed

14 files changed

+160
-107
lines changed

checkpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
135135
if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
136136
return nil, errors.Wrap(err, "create checkpoint dir")
137137
}
138-
cp, err := wal.New(nil, nil, cpdirtmp)
138+
cp, err := wal.New(nil, nil, cpdirtmp, w.Compress())
139139
if err != nil {
140140
return nil, errors.Wrap(err, "open checkpoint")
141141
}

checkpoint_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func TestCheckpoint(t *testing.T) {
9999
testutil.Ok(t, seg.Close())
100100

101101
// Manually create checkpoint for 99 and earlier.
102-
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
102+
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), false)
103103
testutil.Ok(t, err)
104104

105105
// Add some data we expect to be around later.
@@ -111,7 +111,7 @@ func TestCheckpoint(t *testing.T) {
111111
testutil.Ok(t, w.Close())
112112

113113
// Start a WAL and write records to it as usual.
114-
w, err = wal.NewSize(nil, nil, dir, 64*1024)
114+
w, err = wal.NewSize(nil, nil, dir, 64*1024, false)
115115
testutil.Ok(t, err)
116116

117117
var last int64
@@ -197,7 +197,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
197197
defer func() {
198198
testutil.Ok(t, os.RemoveAll(dir))
199199
}()
200-
w, err := wal.NewSize(nil, nil, dir, 64*1024)
200+
w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
201201
testutil.Ok(t, err)
202202
testutil.Ok(t, w.Log([]byte{99}))
203203
w.Close()

db.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ type Options struct {
8080
// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
8181
// This in-turn enables vertical compaction and vertical query merge.
8282
AllowOverlappingBlocks bool
83+
84+
// WALCompression will turn on Snappy compression for records on the WAL.
85+
WALCompression bool
8386
}
8487

8588
// Appender allows appending a batch of data. It must be completed with a
@@ -300,7 +303,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
300303
if opts.WALSegmentSize > 0 {
301304
segmentSize = opts.WALSegmentSize
302305
}
303-
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
306+
wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
304307
if err != nil {
305308
return nil, err
306309
}

db_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
14041404
}()
14051405

14061406
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
1407-
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
1407+
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
14081408
testutil.Ok(t, err)
14091409

14101410
var enc RecordEncoder
@@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
14541454
createBlock(t, dir, genSeries(1, 1, 1000, 6000))
14551455

14561456
testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
1457-
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
1457+
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
14581458
testutil.Ok(t, err)
14591459

14601460
var enc RecordEncoder

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/go-stack/stack v1.8.0 // indirect
1313
github.com/gogo/protobuf v1.1.1 // indirect
1414
github.com/golang/protobuf v1.2.0 // indirect
15+
github.com/golang/snappy v0.0.1
1516
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
1617
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
1718
github.com/oklog/ulid v1.3.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
2222
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
2323
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
2424
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
25+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
26+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
2527
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
2628
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
2729
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=

head_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) {
124124
testutil.Ok(t, os.RemoveAll(dir))
125125
}()
126126

127-
w, err := wal.New(nil, nil, dir)
127+
w, err := wal.New(nil, nil, dir, false)
128128
testutil.Ok(t, err)
129129
defer w.Close()
130130
populateTestWAL(t, w, entries)
@@ -290,7 +290,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
290290
testutil.Ok(t, os.RemoveAll(dir))
291291
}()
292292

293-
w, err := wal.New(nil, nil, dir)
293+
w, err := wal.New(nil, nil, dir, false)
294294
testutil.Ok(t, err)
295295
defer w.Close()
296296
populateTestWAL(t, w, entries)
@@ -348,7 +348,7 @@ Outer:
348348
testutil.Ok(t, os.RemoveAll(dir))
349349
}()
350350

351-
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
351+
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
352352
testutil.Ok(t, err)
353353
defer w.Close()
354354

@@ -370,7 +370,7 @@ Outer:
370370
}
371371

372372
// Compare the samples for both heads - before and after the reload.
373-
reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
373+
reloadedW, err := wal.New(nil, nil, w.Dir(), false) // Use a new wal to ensure deleted samples are gone even after a reload.
374374
testutil.Ok(t, err)
375375
defer reloadedW.Close()
376376
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
@@ -511,7 +511,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
511511
defer func() {
512512
testutil.Ok(t, os.RemoveAll(dir))
513513
}()
514-
wlog, err := wal.NewSize(nil, nil, dir, 32768)
514+
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
515515
testutil.Ok(t, err)
516516

517517
// Enough samples to cause a checkpoint.
@@ -977,7 +977,7 @@ func TestHead_LogRollback(t *testing.T) {
977977
testutil.Ok(t, os.RemoveAll(dir))
978978
}()
979979

980-
w, err := wal.New(nil, nil, dir)
980+
w, err := wal.New(nil, nil, dir, false)
981981
testutil.Ok(t, err)
982982
defer w.Close()
983983
h, err := NewHead(nil, nil, w, 1000)
@@ -1046,7 +1046,7 @@ func TestWalRepair(t *testing.T) {
10461046
testutil.Ok(t, os.RemoveAll(dir))
10471047
}()
10481048

1049-
w, err := wal.New(nil, nil, dir)
1049+
w, err := wal.New(nil, nil, dir, false)
10501050
testutil.Ok(t, err)
10511051
defer w.Close()
10521052

wal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1245,7 +1245,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) {
12451245
if err := os.RemoveAll(tmpdir); err != nil {
12461246
return errors.Wrap(err, "cleanup replacement dir")
12471247
}
1248-
repl, err := wal.New(logger, nil, tmpdir)
1248+
repl, err := wal.New(logger, nil, tmpdir, false)
12491249
if err != nil {
12501250
return errors.Wrap(err, "open new WAL")
12511251
}

wal/live_reader.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/go-kit/kit/log"
2424
"github.com/go-kit/kit/log/level"
25+
"github.com/golang/snappy"
2526
"github.com/pkg/errors"
2627
"github.com/prometheus/client_golang/prometheus"
2728
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -166,10 +167,11 @@ func (r *LiveReader) buildRecord() (bool, error) {
166167
return false, nil
167168
}
168169

169-
rt := recType(r.hdr[0])
170+
rt := recTypeFromHeader(r.hdr[0])
170171
if rt == recFirst || rt == recFull {
171172
r.rec = r.rec[:0]
172173
}
174+
compressed := r.hdr[0]&8 == 8
173175
r.rec = append(r.rec, temp...)
174176

175177
if err := validateRecord(rt, r.index); err != nil {
@@ -178,6 +180,9 @@ func (r *LiveReader) buildRecord() (bool, error) {
178180
}
179181
if rt == recLast || rt == recFull {
180182
r.index = 0
183+
if compressed && len(r.rec) > 0 {
184+
r.rec, _ = snappy.Decode(nil, r.rec)
185+
}
181186
return true, nil
182187
}
183188
// Only increment i for non-zero records since we use it

wal/reader.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"hash/crc32"
2020
"io"
2121

22+
"github.com/golang/snappy"
2223
"github.com/pkg/errors"
2324
)
2425

@@ -45,7 +46,7 @@ func (r *Reader) Next() bool {
4546
// The last WAL segment record shouldn't be torn(should be full or last).
4647
// The last record would be torn after a crash just before
4748
// the last record part could be persisted to disk.
48-
if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
49+
if r.curRecTyp == recFirst || r.curRecTyp == recMiddle {
4950
r.err = errors.New("last record is torn")
5051
}
5152
return false
@@ -68,7 +69,8 @@ func (r *Reader) next() (err error) {
6869
return errors.Wrap(err, "read first header byte")
6970
}
7071
r.total++
71-
r.curRecTyp = recType(hdr[0])
72+
r.curRecTyp = recTypeFromHeader(hdr[0])
73+
compressed := hdr[0]&8 == 8
7274

7375
// Gobble up zero bytes.
7476
if r.curRecTyp == recPageTerm {
@@ -129,6 +131,9 @@ func (r *Reader) next() (err error) {
129131
return err
130132
}
131133
if r.curRecTyp == recLast || r.curRecTyp == recFull {
134+
if compressed && len(r.rec) > 0 {
135+
r.rec, _ = snappy.Decode(nil, r.rec)
136+
}
132137
return nil
133138
}
134139

0 commit comments

Comments
 (0)