Skip to content
This repository was archived by the owner on Aug 13, 2019. It is now read-only.

Commit 876a391

Browse files
author
Fabian Reinartz
committed
Address comments
Signed-off-by: Fabian Reinartz <[email protected]>
1 parent 70df46a commit 876a391

File tree

4 files changed

+37
-35
lines changed

4 files changed

+37
-35
lines changed

checkpoint.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ type CheckpointStats struct {
3535
DroppedSeries int
3636
DroppedSamples int
3737
DroppedTombstones int
38-
TotalSeries int
39-
TotalSamples int
40-
TotalTombstones int
38+
TotalSeries int // Processed series including dropped ones.
39+
TotalSamples int // Processed samples inlcuding dropped ones.
40+
TotalTombstones int // Processed tombstones including droppes ones.
4141
}
4242

4343
// LastCheckpoint returns the directory name of the most recent checkpoint.
@@ -129,16 +129,16 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
129129
sr = last
130130
}
131131

132-
segs, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
132+
segsr, err := wal.NewSegmentsRangeReader(w.Dir(), m, n)
133133
if err != nil {
134134
return nil, errors.Wrap(err, "create segment reader")
135135
}
136-
defer segs.Close()
136+
defer segsr.Close()
137137

138138
if sr != nil {
139-
sr = io.MultiReader(sr, segs)
139+
sr = io.MultiReader(sr, segsr)
140140
} else {
141-
sr = segs
141+
sr = segsr
142142
}
143143
}
144144

@@ -169,7 +169,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
169169

170170
// We don't reset the buffer since we batch up multiple records
171171
// before writing them to the checkpoint.
172-
// Remember where the record for this iteration starts.
172+
// Remember where the record for this iteration starts.
173173
start := len(buf)
174174
rec := r.Record()
175175

docs/format/wal.md

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ e.g. `000000`, `000001`, `000002`, etc., and are limited to 128MB by default.
55
A segment is written to in pages of 32KB. Only the last page of the most recent segment
66
may be partial. A WAL record is an opaque byte slice that gets split up into sub-records
77
should it exceed the remaining space of the current page. Records are never split across
8-
segment boundaries.
9-
The encoding of pages is largely borrowed from [LevelDB's/RocksDB's wirte ahead log.][1]
8+
segment boundaries. If a single record exceeds the default segment size, a segment with
9+
a larger size will be created.
10+
The encoding of pages is largely borrowed from [LevelDB's/RocksDB's write ahead log.][1]
1011

1112
Notable deviations are that the record fragment is encoded as:
1213

14+
```
1315
┌───────────┬──────────┬────────────┬──────────────┐
1416
│ type <1b> │ len <2b> │ CRC32 <4b> │ data <bytes> │
1517
└───────────┴──────────┴────────────┴──────────────┘
18+
```
1619

1720
## Record encoding
1821

@@ -22,6 +25,7 @@ The records written to the write ahead log are encoded as follows:
2225

2326
Series records encode the labels that identifier a series and its unique ID.
2427

28+
```
2529
┌────────────────────────────────────────────┐
2630
│ type = 1 <1b> │
2731
├────────────────────────────────────────────┤
@@ -36,12 +40,14 @@ Series records encode the labels that identifier a series and its unique ID.
3640
│ └───────────────────────┴────────────────┘ │
3741
│ . . . │
3842
└────────────────────────────────────────────┘
43+
```
3944

4045
### Sample records
4146

4247
Sample records encode samples as a list of triples `(series_id, timestamp, value)`.
4348
Series reference and timestamp are encoded as deltas w.r.t the first sample.
4449

50+
```
4551
┌──────────────────────────────────────────────────────────────────┐
4652
│ type = 2 <1b> │
4753
├──────────────────────────────────────────────────────────────────┤
@@ -53,13 +59,14 @@ Series reference and timestamp are encoded as deltas w.r.t the first sample.
5359
│ └────────────────────┴───────────────────────────┴─────────────┘ │
5460
│ . . . │
5561
└──────────────────────────────────────────────────────────────────┘
62+
```
5663

5764
### Tombstone records
5865

5966
Tombstone records encode tombstones as a list of triples `(series_id, min_time, max_time)`
6067
and specify an interval for which samples of a series got deleted.
6168

62-
69+
```
6370
┌─────────────────────────────────────────────────────┐
6471
│ type = 3 <1b> │
6572
├─────────────────────────────────────────────────────┤
@@ -68,5 +75,6 @@ and specify an interval for which samples of a series got deleted.
6875
│ └─────────┴───────────────────┴───────────────────┘ │
6976
│ . . . │
7077
└─────────────────────────────────────────────────────┘
78+
```
7179

72-
[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format]
80+
[1][https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log-File-Format]

head.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func (h *Head) Truncate(mint int64) error {
438438
return nil // no segments yet.
439439
}
440440
// The lower third of segments should contain mostly obsolete samples.
441-
// If we have too few segments, it's not worth checkpointing yet.
441+
// If we have less than three segments, it's not worth checkpointing yet.
442442
n = m + (n-m)/3
443443
if n <= m {
444444
return nil

wal/wal.go

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"math"
2424
"os"
2525
"path/filepath"
26+
"sort"
2627
"strconv"
2728
"sync"
2829
"time"
@@ -35,9 +36,7 @@ import (
3536
)
3637

3738
const (
38-
version = 1
3939
defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
40-
maxRecordSize = 1 * 1024 * 1024 // 1MB
4140
pageSize = 32 * 1024 // 32KB
4241
recordHeaderSize = 7
4342
)
@@ -94,7 +93,6 @@ func (e *CorruptionErr) Error() string {
9493

9594
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
9695
func OpenWriteSegment(dir string, k int) (*Segment, error) {
97-
// Only .active segments are allowed to be opened for write.
9896
f, err := os.OpenFile(SegmentName(dir, k), os.O_WRONLY|os.O_APPEND, 0666)
9997
if err != nil {
10098
return nil, err
@@ -127,7 +125,7 @@ func CreateSegment(dir string, k int) (*Segment, error) {
127125
return &Segment{File: f, i: k, dir: dir}, nil
128126
}
129127

130-
// OpenReadSegment opens the segment k in dir for reading.
128+
// OpenReadSegment opens the segment with the given filename.
131129
func OpenReadSegment(fn string) (*Segment, error) {
132130
k, err := strconv.Atoi(filepath.Base(fn))
133131
if err != nil {
@@ -142,7 +140,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
142140

143141
// WAL is a write ahead log that stores records in segment files.
144142
// It must be read from start to end once before logging new data.
145-
// If an errore occurs during read, the repair procedure must be called
143+
// If an erroe occurs during read, the repair procedure must be called
146144
// before it's safe to do further writes.
147145
//
148146
// Segments are written to in pages of 32KB, with records possibly split
@@ -244,23 +242,19 @@ Loop:
244242
case f := <-w.actorc:
245243
f()
246244
case donec := <-w.stopc:
245+
close(w.actorc)
247246
defer close(donec)
248247
break Loop
249248
}
250249
}
251250
// Drain and process any remaining functions.
252-
for {
253-
select {
254-
case f := <-w.actorc:
255-
f()
256-
default:
257-
return
258-
}
251+
for f := range w.actorc {
252+
f()
259253
}
260254
}
261255

262256
// Repair attempts to repair the WAL based on the error.
263-
// It discards all data behind the corruption
257+
// It discards all data after the corruption.
264258
func (w *WAL) Repair(err error) error {
265259
// We could probably have a mode that only discards torn records right around
266260
// the corruption to preserve as data much as possible.
@@ -333,7 +327,7 @@ func (w *WAL) Repair(err error) error {
333327

334328
// SegmentName builds a segment name for the directory.
335329
func SegmentName(dir string, i int) string {
336-
return filepath.Join(dir, fmt.Sprintf("%06d", i))
330+
return filepath.Join(dir, fmt.Sprintf("%08d", i))
337331
}
338332

339333
// nextSegment creates the next segment and closes the previous one.
@@ -384,6 +378,7 @@ func (w *WAL) flushPage(clear bool) error {
384378
}
385379
p.flushed += n
386380

381+
// We flushed an entire page, prepare a new one.
387382
if clear {
388383
for i := range p.buf {
389384
p.buf[i] = 0
@@ -485,7 +480,7 @@ func (w *WAL) log(rec []byte, final bool) error {
485480
binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
486481
binary.BigEndian.PutUint32(buf[3:], crc)
487482

488-
copy(buf[7:], part)
483+
copy(buf[recordHeaderSize:], part)
489484
p.alloc += len(part) + recordHeaderSize
490485

491486
// If we wrote a full record, we can fit more records of the batch
@@ -587,6 +582,9 @@ func listSegments(dir string) (refs []segmentRef, err error) {
587582
refs = append(refs, segmentRef{s: fn, n: k})
588583
last = k
589584
}
585+
sort.Slice(refs, func(i, j int) bool {
586+
return refs[i].n < refs[j].n
587+
})
590588
return refs, nil
591589
}
592590

@@ -667,10 +665,6 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
667665
// Only unset more so we don't invalidate the current segment and
668666
// offset before the next read.
669667
r.more = false
670-
// If no more segments are left, it's the end for the reader.
671-
if len(r.segs) == 0 {
672-
return n, io.EOF
673-
}
674668
return n, nil
675669
}
676670

@@ -689,7 +683,7 @@ func NewReader(r io.Reader) *Reader {
689683
}
690684

691685
// Next advances the reader to the next records and returns true if it exists.
692-
// It must not be called once after it returned false.
686+
// It must not be called again after it returned false.
693687
func (r *Reader) Next() bool {
694688
err := r.next()
695689
if errors.Cause(err) == io.EOF {
@@ -702,8 +696,8 @@ func (r *Reader) Next() bool {
702696
func (r *Reader) next() (err error) {
703697
// We have to use r.buf since allocating byte arrays here fails escape
704698
// analysis and ends up on the heap, even though it seemingly should not.
705-
hdr := r.buf[:7]
706-
buf := r.buf[7:]
699+
hdr := r.buf[:recordHeaderSize]
700+
buf := r.buf[recordHeaderSize:]
707701

708702
r.rec = r.rec[:0]
709703

0 commit comments

Comments
 (0)