@@ -107,13 +107,18 @@ package record
107
107
108
108
import (
109
109
"encoding/binary"
110
+ "encoding/hex"
111
+ "fmt"
110
112
"io"
111
113
"math"
114
+ "strings"
112
115
113
116
"github.com/cockroachdb/errors"
114
117
"github.com/cockroachdb/pebble/internal/base"
118
+ "github.com/cockroachdb/pebble/internal/binfmt"
115
119
"github.com/cockroachdb/pebble/internal/bitflip"
116
120
"github.com/cockroachdb/pebble/internal/crc"
121
+ "github.com/cockroachdb/pebble/internal/treeprinter"
117
122
)
118
123
119
124
// These constants are part of the wire format and should not be changed.
@@ -252,12 +257,33 @@ type Reader struct {
252
257
// encountered during WAL replay was the logical EOF or confirmed corruption.
253
258
invalidOffset uint64
254
259
255
- // loggerForTesting is a logging helper used by the Reader to accumulate log messages.
256
- loggerForTesting loggerForTesting
260
+ // logger is a logging helper used by the Reader to accumulate log messages.
261
+ logger * loggerForTesting
262
+
263
+ // visualLogger is a logging helper used by the Reader to accumulate visual logs.
264
+ visualLogger * visualLoggerForTesting
265
+ }
266
+
267
// loggerForTesting collects the Reader's diagnostic messages in memory so that
// they can be retrieved afterwards as one string.
type loggerForTesting struct {
	// verbose, when set, makes the read-ahead scan also log a hex dump of the
	// buffer next to each diagnostic message.
	verbose bool
	// builder holds everything that has been logged so far.
	builder strings.Builder
}
271
+
272
+ func (l * loggerForTesting ) logf (format string , args ... interface {}) {
273
+ fmt .Fprintf (& l .builder , format , args ... )
257
274
}
258
275
259
- type loggerForTesting interface {
260
- logf (format string , args ... interface {})
276
+ func (l * loggerForTesting ) getLog () string {
277
+ return l .builder .String ()
278
+ }
279
+
280
+ type visualLoggerForTesting struct {
281
+ verbose bool
282
+ f * binfmt.Formatter
283
+ tp * treeprinter.Node
284
+ blockRoot treeprinter.Node
285
+ blockNode treeprinter.Node
286
+ chunkNode treeprinter.Node
261
287
}
262
288
263
289
// NewReader returns a new reader. If the file contains records encoded using
@@ -425,6 +451,20 @@ func (r *Reader) Next() (io.Reader, error) {
425
451
return singleReader {r , r .seq }, nil
426
452
}
427
453
454
+ func (r * Reader ) InvestigateChunks (verbose bool ) (string , string ) {
455
+ tree := treeprinter .New ()
456
+ r .visualLogger = & visualLoggerForTesting {
457
+ f : binfmt .New (r .buf [:]).LineWidth (20 ),
458
+ tp : & tree ,
459
+ verbose : verbose ,
460
+ }
461
+ r .logger = & loggerForTesting {
462
+ verbose : verbose ,
463
+ }
464
+ _ = r .readAheadForCorruption ()
465
+ return r .visualLogger .tp .String (), r .logger .getLog ()
466
+ }
467
+
428
468
// readAheadForCorruption scans ahead in the log to detect corruption.
429
469
// It loads in blocks and reads chunks until it either detects corruption
430
470
// due to an offset (encoded in a chunk header) exceeding the invalid offset,
@@ -440,19 +480,44 @@ func (r *Reader) Next() (io.Reader, error) {
440
480
// if there is confirmation of a corruption, otherwise ErrUnexpectedEOF is
441
481
// returned after reading all the blocks without corruption confirmation.
442
482
func (r * Reader ) readAheadForCorruption () error {
443
- if r .loggerForTesting != nil {
444
- r .loggerForTesting .logf ("Starting read ahead for corruption. Block corrupted %d.\n " , r .blockNum )
483
+ if r .logger != nil {
484
+ r .logger .logf ("Starting read ahead for corruption. Block corrupted %d.\n " , r .blockNum )
485
+ }
486
+ if r .visualLogger != nil {
487
+ r .visualLogger .blockRoot = r .visualLogger .tp .Child ("Block" )
488
+ }
489
+ getBufferDump := func (buf []byte , i int , j int ) string {
490
+ return fmt .Sprintf ("Buffer Dump: %s\n " , hex .EncodeToString (buf [i :j ]))
491
+ }
492
+
493
+ logMsgAndDump := func (logMsg , bufferDump string ) {
494
+ if r .logger != nil {
495
+ r .logger .logf ("\t %s" , logMsg )
496
+ if r .logger .verbose {
497
+ r .logger .logf ("\t %s" , bufferDump )
498
+ }
499
+ }
500
+ if r .visualLogger != nil {
501
+ r .visualLogger .chunkNode .Child (logMsg )
502
+ if r .visualLogger .verbose {
503
+ r .visualLogger .chunkNode .Child (bufferDump )
504
+ }
505
+ }
506
+ }
507
+
508
+ if r .visualLogger != nil {
509
+ defer r .visualLogger .f .SetAnchorOffset ()
510
+ defer r .visualLogger .f .ToTreePrinter (r .visualLogger .blockRoot )
445
511
}
446
512
447
513
for {
448
514
// Load the next block into r.buf.
449
515
n , err := io .ReadFull (r .r , r .buf [:])
450
516
r .begin , r .end , r .n = 0 , 0 , n
451
517
r .blockNum ++
452
- if r .loggerForTesting != nil {
453
- r .loggerForTesting .logf ("Read block %d with %d bytes\n " , r .blockNum , n )
518
+ if r .logger != nil {
519
+ r .logger .logf ("Read block %d with %d bytes\n " , r .blockNum , n )
454
520
}
455
-
456
521
if errors .Is (err , io .EOF ) {
457
522
// io.ErrUnexpectedEOF is returned instead of
458
523
// io.EOF because io library functions clear
@@ -464,8 +529,8 @@ func (r *Reader) readAheadForCorruption() error {
464
529
// invalid chunk should have been valid, the chunk represents
465
530
// an abrupt, unclean termination of the logical log. This
466
531
// abrupt end of file represented by io.ErrUnexpectedEOF.
467
- if r .loggerForTesting != nil {
468
- r .loggerForTesting .logf ("\t Encountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n " )
532
+ if r .logger != nil {
533
+ r .logger .logf ("\t Encountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n " )
469
534
}
470
535
return io .ErrUnexpectedEOF
471
536
}
@@ -475,93 +540,141 @@ func (r *Reader) readAheadForCorruption() error {
475
540
// However, if the error is not ErrUnexpectedEOF, then this
476
541
// error should be surfaced.
477
542
if err != nil && err != io .ErrUnexpectedEOF {
478
- if r .loggerForTesting != nil {
479
- r .loggerForTesting .logf ("\t Error reading block %d: %v" , r .blockNum , err )
543
+ if r .logger != nil {
544
+ r .logger .logf ("\t Error reading block %d: %v" , r .blockNum , err )
480
545
}
481
546
return err
482
547
}
483
548
549
+ chunkCount := 0
484
550
for r .end + legacyHeaderSize <= r .n {
485
551
checksum := binary .LittleEndian .Uint32 (r .buf [r .end + 0 : r .end + 4 ])
486
552
length := binary .LittleEndian .Uint16 (r .buf [r .end + 4 : r .end + 6 ])
487
553
chunkEncoding := r .buf [r .end + 6 ]
554
+ bufferDump := getBufferDump (r .buf [:], r .end , r .n )
555
+ chunkCount ++
488
556
489
- if r .loggerForTesting != nil {
490
- r .loggerForTesting .logf ("\t Block %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n " , r .blockNum , r .end , checksum , length , chunkEncoding )
557
+ if r .logger != nil {
558
+ r .logger .logf ("\t Block %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n " , r .blockNum , r .end , checksum , length , chunkEncoding )
559
+ }
560
+ if r .visualLogger != nil {
561
+ if chunkCount == 1 {
562
+ r .visualLogger .blockNode = r .visualLogger .blockRoot .Childf ("Block #%d" , r .blockNum )
563
+ }
564
+ r .visualLogger .chunkNode = r .visualLogger .blockNode .Childf ("Chunk #%d at offset %d" , chunkCount , r .end )
565
+ r .visualLogger .chunkNode .Childf ("Checksum: %d" , checksum )
566
+ r .visualLogger .chunkNode .Childf ("Encoded Length: %d" , length )
491
567
}
492
568
493
569
if int (chunkEncoding ) >= len (headerFormatMappings ) {
494
- if r .loggerForTesting != nil {
495
- r .loggerForTesting .logf ("\t Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n " , chunkEncoding , r .blockNum )
496
- }
570
+ logMsg := fmt .Sprintf ("Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n " , chunkEncoding , r .blockNum )
571
+ logMsgAndDump (logMsg , bufferDump )
497
572
break
498
573
}
499
574
500
575
headerFormat := headerFormatMappings [chunkEncoding ]
501
576
chunkPosition , wireFormat , headerSize := headerFormat .chunkPosition , headerFormat .wireFormat , headerFormat .headerSize
577
+ if r .visualLogger != nil {
578
+ encodingStr := chunkEncodingStr (chunkEncoding )
579
+ r .visualLogger .chunkNode .Childf ("Chunk encoding: %s(%d) (chunkPosition: %d, wireFormat: %d)" , encodingStr , chunkEncoding , chunkPosition , wireFormat )
580
+ }
581
+
502
582
if checksum == 0 && length == 0 && chunkPosition == invalidChunkPosition {
503
- if r .loggerForTesting != nil {
504
- r .loggerForTesting .logf ("\t Found invalid chunk marker at block %d offset %d; aborting this block scan\n " , r .blockNum , r .end )
505
- }
583
+ logMsg := fmt .Sprintf ("Found invalid chunk marker at block %d offset %d; aborting this block scan\n " , r .blockNum , r .end )
584
+ logMsgAndDump (logMsg , bufferDump )
506
585
break
507
586
}
508
587
if wireFormat == invalidWireFormat {
509
- if r .loggerForTesting != nil {
510
- r .loggerForTesting .logf ("\t Invalid wire format detected in block %d at offset %d\n " , r .blockNum , r .end )
511
- }
588
+ logMsg := fmt .Sprintf ("Invalid wire format detected in block %d at offset %d\n " , r .blockNum , r .end )
589
+ logMsgAndDump (logMsg , bufferDump )
512
590
break
513
591
}
514
592
if wireFormat == recyclableWireFormat || wireFormat == walSyncWireFormat {
515
593
if r .end + headerSize > r .n {
516
- if r .loggerForTesting != nil {
517
- r .loggerForTesting .logf ("\t Incomplete header in block %d at offset %d; breaking out\n " , r .blockNum , r .end )
518
- }
594
+ logMsg := fmt .Sprintf ("Incomplete header in block %d at offset %d; breaking out\n " , r .blockNum , r .end )
595
+ logMsgAndDump (logMsg , bufferDump )
519
596
break
520
597
}
521
598
logNum := binary .LittleEndian .Uint32 (r .buf [r .end + 7 : r .end + 11 ])
599
+ if r .visualLogger != nil {
600
+ r .visualLogger .chunkNode .Childf ("Log Num: %d" , logNum )
601
+ }
522
602
if logNum != r .logNum {
523
- if r .loggerForTesting != nil {
524
- r .loggerForTesting .logf ("\t Mismatch log number in block %d at offset %d (expected %d, got %d)\n " , r .blockNum , r .end , r .logNum , logNum )
525
- }
603
+ logMsg := fmt .Sprintf ("Mismatch log number in block %d at offset %d (expected %d, got %d)\n " , r .blockNum , r .end , r .logNum , logNum )
604
+ logMsgAndDump (logMsg , bufferDump )
526
605
break
527
606
}
528
607
}
529
608
530
609
r .begin = r .end + headerSize
531
610
r .end = r .begin + int (length )
611
+ bufferDump = getBufferDump (r .buf [:], r .begin , min (r .end , r .n ))
532
612
if r .end > r .n {
533
613
// The chunk straddles a 32KB boundary (or the end of file).
534
- if r .loggerForTesting != nil {
535
- r .loggerForTesting .logf ("\t Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n " , r .blockNum , r .begin , r .end , r .n )
536
- }
614
+ logMsg := fmt .Sprintf ("Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n " , r .blockNum , r .begin , r .end , r .n )
615
+ logMsgAndDump (logMsg , bufferDump )
537
616
break
538
617
}
539
618
if checksum != crc .New (r .buf [r .begin - headerSize + 6 :r .end ]).Value () {
540
- if r .loggerForTesting != nil {
541
- r .loggerForTesting .logf ("\t Checksum mismatch in block %d at offset %d; potential corruption\n " , r .blockNum , r .end )
542
- }
619
+ logMsg := fmt .Sprintf ("Checksum mismatch in block %d at offset %d; potential corruption\n " , r .blockNum , r .end )
620
+ logMsgAndDump (logMsg , bufferDump )
543
621
break
544
622
}
545
623
546
624
// Decode offset in header when chunk has the WAL Sync wire format.
547
625
if wireFormat == walSyncWireFormat {
548
626
syncedOffset := binary .LittleEndian .Uint64 (r .buf [r .begin - headerSize + 11 : r .begin - headerSize + 19 ])
549
- if r .loggerForTesting != nil {
550
- r .loggerForTesting .logf ("\t Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n " , r .blockNum , syncedOffset , r .invalidOffset )
627
+ if r .visualLogger != nil {
628
+ r .visualLogger .chunkNode .Childf ("Synced Offset: %d" , syncedOffset )
629
+ }
630
+ if r .logger != nil {
631
+ r .logger .logf ("Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n " , r .blockNum , syncedOffset , r .invalidOffset )
551
632
}
552
633
// If the encountered chunk offset promises durability beyond the invalid offset,
553
634
// the invalid offset must have been corruption.
554
635
if syncedOffset > r .invalidOffset {
555
- if r .loggerForTesting != nil {
556
- r .loggerForTesting .logf ("\t Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n " , syncedOffset , r .invalidOffset )
557
- }
636
+ logMsg := fmt .Sprintf ("Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n " , syncedOffset , r .invalidOffset )
637
+ logMsgAndDump (logMsg , bufferDump )
558
638
return r .err
559
639
}
560
640
}
561
641
}
562
642
}
563
643
}
564
644
645
+ func chunkEncodingStr (encoding byte ) string {
646
+ switch encoding {
647
+ case invalidChunkEncoding :
648
+ return "invalidInvalidChunk"
649
+ case fullChunkEncoding :
650
+ return "legacyFullChunk"
651
+ case firstChunkEncoding :
652
+ return "legacyFirstChunk"
653
+ case middleChunkEncoding :
654
+ return "legacyMiddleChunk"
655
+ case lastChunkEncoding :
656
+ return "legacyLastChunk"
657
+ case recyclableFullChunkEncoding :
658
+ return "recyclableFullChunk"
659
+ case recyclableFirstChunkEncoding :
660
+ return "recyclableFirstChunk"
661
+ case recyclableMiddleChunkEncoding :
662
+ return "recyclableMiddleChunk"
663
+ case recyclableLastChunkEncoding :
664
+ return "recyclableLastChunk"
665
+ case walSyncFullChunkEncoding :
666
+ return "walSyncFullChunk"
667
+ case walSyncFirstChunkEncoding :
668
+ return "walSyncFirstChunk"
669
+ case walSyncMiddleChunkEncoding :
670
+ return "walSyncMiddleChunk"
671
+ case walSyncLastChunkEncoding :
672
+ return "walSyncLastChunk"
673
+ default :
674
+ return "unknown encoding"
675
+ }
676
+ }
677
+
565
678
// Offset returns the current offset within the file. If called immediately
566
679
// before a call to Next(), Offset() will return the record offset.
567
680
func (r * Reader ) Offset () int64 {
0 commit comments