@@ -23,6 +23,7 @@ import (
2323 "math"
2424 "os"
2525 "path/filepath"
26+ "sort"
2627 "strconv"
2728 "sync"
2829 "time"
@@ -35,9 +36,7 @@ import (
3536)
3637
3738const (
38- version = 1
3939 defaultSegmentSize = 128 * 1024 * 1024 // 128 MB
40- maxRecordSize = 1 * 1024 * 1024 // 1MB
4140 pageSize = 32 * 1024 // 32KB
4241 recordHeaderSize = 7
4342)
@@ -94,7 +93,6 @@ func (e *CorruptionErr) Error() string {
9493
9594// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
9695func OpenWriteSegment (dir string , k int ) (* Segment , error ) {
97- // Only .active segments are allowed to be opened for write.
9896 f , err := os .OpenFile (SegmentName (dir , k ), os .O_WRONLY | os .O_APPEND , 0666 )
9997 if err != nil {
10098 return nil , err
@@ -127,7 +125,7 @@ func CreateSegment(dir string, k int) (*Segment, error) {
127125 return & Segment {File : f , i : k , dir : dir }, nil
128126}
129127
130- // OpenReadSegment opens the segment k in dir for reading .
128+ // OpenReadSegment opens the segment with the given filename .
131129func OpenReadSegment (fn string ) (* Segment , error ) {
132130 k , err := strconv .Atoi (filepath .Base (fn ))
133131 if err != nil {
@@ -142,7 +140,7 @@ func OpenReadSegment(fn string) (*Segment, error) {
142140
143141// WAL is a write ahead log that stores records in segment files.
144142// It must be read from start to end once before logging new data.
145- // If an errore occurs during read, the repair procedure must be called
143+ // If an error occurs during read, the repair procedure must be called
146144// before it's safe to do further writes.
147145//
148146// Segments are written to in pages of 32KB, with records possibly split
@@ -244,23 +242,19 @@ Loop:
244242 case f := <- w .actorc :
245243 f ()
246244 case donec := <- w .stopc :
245+ close (w .actorc )
247246 defer close (donec )
248247 break Loop
249248 }
250249 }
251250 // Drain and process any remaining functions.
252- for {
253- select {
254- case f := <- w .actorc :
255- f ()
256- default :
257- return
258- }
251+ for f := range w .actorc {
252+ f ()
259253 }
260254}
261255
262256// Repair attempts to repair the WAL based on the error.
263- // It discards all data behind the corruption
257+ // It discards all data after the corruption.
264258func (w * WAL ) Repair (err error ) error {
265259 // We could probably have a mode that only discards torn records right around
266260 // the corruption to preserve as much data as possible.
@@ -333,7 +327,7 @@ func (w *WAL) Repair(err error) error {
333327
334328// SegmentName builds a segment name for the directory.
335329func SegmentName (dir string , i int ) string {
336- return filepath .Join (dir , fmt .Sprintf ("%06d " , i ))
330+ return filepath .Join (dir , fmt .Sprintf ("%08d " , i ))
337331}
338332
339333// nextSegment creates the next segment and closes the previous one.
@@ -384,6 +378,7 @@ func (w *WAL) flushPage(clear bool) error {
384378 }
385379 p .flushed += n
386380
381+ // We flushed an entire page, prepare a new one.
387382 if clear {
388383 for i := range p .buf {
389384 p .buf [i ] = 0
@@ -485,7 +480,7 @@ func (w *WAL) log(rec []byte, final bool) error {
485480 binary .BigEndian .PutUint16 (buf [1 :], uint16 (len (part )))
486481 binary .BigEndian .PutUint32 (buf [3 :], crc )
487482
488- copy (buf [7 :], part )
483+ copy (buf [recordHeaderSize :], part )
489484 p .alloc += len (part ) + recordHeaderSize
490485
491486 // If we wrote a full record, we can fit more records of the batch
@@ -587,6 +582,9 @@ func listSegments(dir string) (refs []segmentRef, err error) {
587582 refs = append (refs , segmentRef {s : fn , n : k })
588583 last = k
589584 }
585+ sort .Slice (refs , func (i , j int ) bool {
586+ return refs [i ].n < refs [j ].n
587+ })
590588 return refs , nil
591589}
592590
@@ -667,10 +665,6 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
667665 // Only unset more so we don't invalidate the current segment and
668666 // offset before the next read.
669667 r .more = false
670- // If no more segments are left, it's the end for the reader.
671- if len (r .segs ) == 0 {
672- return n , io .EOF
673- }
674668 return n , nil
675669}
676670
@@ -689,7 +683,7 @@ func NewReader(r io.Reader) *Reader {
689683}
690684
691685// Next advances the reader to the next record and returns true if it exists.
692- // It must not be called once after it returned false.
686+ // It must not be called again after it returned false.
693687func (r * Reader ) Next () bool {
694688 err := r .next ()
695689 if errors .Cause (err ) == io .EOF {
@@ -702,8 +696,8 @@ func (r *Reader) Next() bool {
702696func (r * Reader ) next () (err error ) {
703697 // We have to use r.buf since allocating byte arrays here fails escape
704698 // analysis and ends up on the heap, even though it seemingly should not.
705- hdr := r .buf [:7 ]
706- buf := r .buf [7 :]
699+ hdr := r .buf [:recordHeaderSize ]
700+ buf := r .buf [recordHeaderSize :]
707701
708702 r .rec = r .rec [:0 ]
709703
0 commit comments