Skip to content

record, tool: log chunk details #4533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 154 additions & 41 deletions record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,18 @@ package record

import (
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"math"
"strings"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/cockroachdb/pebble/internal/bitflip"
"github.com/cockroachdb/pebble/internal/crc"
"github.com/cockroachdb/pebble/internal/treeprinter"
)

// These constants are part of the wire format and should not be changed.
Expand Down Expand Up @@ -252,12 +257,33 @@ type Reader struct {
// encountered during WAL replay was the logical EOF or confirmed corruption.
invalidOffset uint64

// loggerForTesting is a logging helper used by the Reader to accumulate log messages.
loggerForTesting loggerForTesting
// logger is a logging helper used by the Reader to accumulate log messages.
logger *loggerForTesting

// visualLogger is a logging helper used by the Reader to accumulate visual logs.
visualLogger *visualLoggerForTesting
}

// loggerForTesting accumulates formatted log messages in memory so that they
// can be retrieved later as a single string.
type loggerForTesting struct {
	// verbose is read by callers to decide whether extra detail (such as a
	// buffer dump) should be logged in addition to the regular message.
	verbose bool
	// builder holds every message appended through logf, in call order.
	builder strings.Builder
}

// logf formats args according to format, with fmt.Sprintf semantics, and
// appends the result to the in-memory log. No separator or trailing newline
// is added; callers include one in format when they want it.
func (l *loggerForTesting) logf(format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	l.builder.WriteString(msg)
}

type loggerForTesting interface {
logf(format string, args ...interface{})
// getLog returns the contents of the logger's builder as a single string.
func (l *loggerForTesting) getLog() string {
return l.builder.String()
}

// visualLoggerForTesting holds the state used to build a tree-shaped view of
// the blocks and chunks examined while reading ahead for corruption.
type visualLoggerForTesting struct {
// verbose controls whether a buffer dump is attached to the chunk node in
// addition to the log message.
verbose bool
// f is the binfmt formatter created over the Reader's buffer.
f *binfmt.Formatter
// tp is the root of the tree; its String() output is the visual log.
tp *treeprinter.Node
// blockRoot is the "Block" child of tp.
blockRoot treeprinter.Node
// blockNode is the "Block #N" child of blockRoot, created when the first
// chunk of a block is processed.
blockNode treeprinter.Node
// chunkNode is the "Chunk #N at offset M" child of blockNode for the chunk
// currently being processed; per-chunk details are attached to it.
chunkNode treeprinter.Node
}

// NewReader returns a new reader. If the file contains records encoded using
Expand Down Expand Up @@ -425,6 +451,20 @@ func (r *Reader) Next() (io.Reader, error) {
return singleReader{r, r.seq}, nil
}

// InvestigateChunks installs a fresh text logger and a fresh visual (tree)
// logger on the Reader, runs readAheadForCorruption once, and returns what the
// two loggers collected: first the rendered tree, then the plain-text log.
//
// When verbose is true, both loggers also record a hex dump of the relevant
// buffer range next to each diagnostic message.
//
// The error returned by readAheadForCorruption is discarded; only the
// collected diagnostics are reported. The loggers stay installed on the Reader
// after this method returns.
func (r *Reader) InvestigateChunks(verbose bool) (string, string) {
	root := treeprinter.New()
	r.visualLogger = &visualLoggerForTesting{
		verbose: verbose,
		tp:      &root,
		f:       binfmt.New(r.buf[:]).LineWidth(20),
	}
	r.logger = &loggerForTesting{verbose: verbose}

	// Only the diagnostics gathered during the scan are of interest here, so
	// the scan's own error result is dropped.
	_ = r.readAheadForCorruption()

	visual := r.visualLogger.tp.String()
	text := r.logger.getLog()
	return visual, text
}

// readAheadForCorruption scans ahead in the log to detect corruption.
// It loads in blocks and reads chunks until it either detects corruption
// due to an offset (encoded in a chunk header) exceeding the invalid offset,
Expand All @@ -440,19 +480,44 @@ func (r *Reader) Next() (io.Reader, error) {
// if there is confirmation of a corruption, otherwise ErrUnexpectedEOF is
// returned after reading all the blocks without corruption confirmation.
func (r *Reader) readAheadForCorruption() error {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("Starting read ahead for corruption. Block corrupted %d.\n", r.blockNum)
if r.logger != nil {
r.logger.logf("Starting read ahead for corruption. Block corrupted %d.\n", r.blockNum)
}
if r.visualLogger != nil {
r.visualLogger.blockRoot = r.visualLogger.tp.Child("Block")
}
getBufferDump := func(buf []byte, i int, j int) string {
return fmt.Sprintf("Buffer Dump: %s\n", hex.EncodeToString(buf[i:j]))
}

logMsgAndDump := func(logMsg, bufferDump string) {
if r.logger != nil {
r.logger.logf("\t%s", logMsg)
if r.logger.verbose {
r.logger.logf("\t%s", bufferDump)
}
}
if r.visualLogger != nil {
r.visualLogger.chunkNode.Child(logMsg)
if r.visualLogger.verbose {
r.visualLogger.chunkNode.Child(bufferDump)
}
}
}

if r.visualLogger != nil {
defer r.visualLogger.f.SetAnchorOffset()
defer r.visualLogger.f.ToTreePrinter(r.visualLogger.blockRoot)
}

for {
// Load the next block into r.buf.
n, err := io.ReadFull(r.r, r.buf[:])
r.begin, r.end, r.n = 0, 0, n
r.blockNum++
if r.loggerForTesting != nil {
r.loggerForTesting.logf("Read block %d with %d bytes\n", r.blockNum, n)
if r.logger != nil {
r.logger.logf("Read block %d with %d bytes\n", r.blockNum, n)
}

if errors.Is(err, io.EOF) {
// io.ErrUnexpectedEOF is returned instead of
// io.EOF because io library functions clear
Expand All @@ -464,8 +529,8 @@ func (r *Reader) readAheadForCorruption() error {
// invalid chunk should have been valid, the chunk represents
// an abrupt, unclean termination of the logical log. This
// abrupt end of file is represented by io.ErrUnexpectedEOF.
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tEncountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n")
if r.logger != nil {
r.logger.logf("\tEncountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n")
}
return io.ErrUnexpectedEOF
}
Expand All @@ -475,93 +540,141 @@ func (r *Reader) readAheadForCorruption() error {
// However, if the error is not ErrUnexpectedEOF, then this
// error should be surfaced.
if err != nil && err != io.ErrUnexpectedEOF {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tError reading block %d: %v", r.blockNum, err)
if r.logger != nil {
r.logger.logf("\tError reading block %d: %v", r.blockNum, err)
}
return err
}

chunkCount := 0
for r.end+legacyHeaderSize <= r.n {
checksum := binary.LittleEndian.Uint32(r.buf[r.end+0 : r.end+4])
length := binary.LittleEndian.Uint16(r.buf[r.end+4 : r.end+6])
chunkEncoding := r.buf[r.end+6]
bufferDump := getBufferDump(r.buf[:], r.end, r.n)
chunkCount++

if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tBlock %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n", r.blockNum, r.end, checksum, length, chunkEncoding)
if r.logger != nil {
r.logger.logf("\tBlock %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n", r.blockNum, r.end, checksum, length, chunkEncoding)
}
if r.visualLogger != nil {
if chunkCount == 1 {
r.visualLogger.blockNode = r.visualLogger.blockRoot.Childf("Block #%d", r.blockNum)
}
r.visualLogger.chunkNode = r.visualLogger.blockNode.Childf("Chunk #%d at offset %d", chunkCount, r.end)
r.visualLogger.chunkNode.Childf("Checksum: %d", checksum)
r.visualLogger.chunkNode.Childf("Encoded Length: %d", length)
}

if int(chunkEncoding) >= len(headerFormatMappings) {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tInvalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n", chunkEncoding, r.blockNum)
}
logMsg := fmt.Sprintf("Invalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n", chunkEncoding, r.blockNum)
logMsgAndDump(logMsg, bufferDump)
break
}

headerFormat := headerFormatMappings[chunkEncoding]
chunkPosition, wireFormat, headerSize := headerFormat.chunkPosition, headerFormat.wireFormat, headerFormat.headerSize
if r.visualLogger != nil {
encodingStr := chunkEncodingStr(chunkEncoding)
r.visualLogger.chunkNode.Childf("Chunk encoding: %s(%d) (chunkPosition: %d, wireFormat: %d)", encodingStr, chunkEncoding, chunkPosition, wireFormat)
}

if checksum == 0 && length == 0 && chunkPosition == invalidChunkPosition {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tFound invalid chunk marker at block %d offset %d; aborting this block scan\n", r.blockNum, r.end)
}
logMsg := fmt.Sprintf("Found invalid chunk marker at block %d offset %d; aborting this block scan\n", r.blockNum, r.end)
logMsgAndDump(logMsg, bufferDump)
break
}
if wireFormat == invalidWireFormat {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tInvalid wire format detected in block %d at offset %d\n", r.blockNum, r.end)
}
logMsg := fmt.Sprintf("Invalid wire format detected in block %d at offset %d\n", r.blockNum, r.end)
logMsgAndDump(logMsg, bufferDump)
break
}
if wireFormat == recyclableWireFormat || wireFormat == walSyncWireFormat {
if r.end+headerSize > r.n {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tIncomplete header in block %d at offset %d; breaking out\n", r.blockNum, r.end)
}
logMsg := fmt.Sprintf("Incomplete header in block %d at offset %d; breaking out\n", r.blockNum, r.end)
logMsgAndDump(logMsg, bufferDump)
break
}
logNum := binary.LittleEndian.Uint32(r.buf[r.end+7 : r.end+11])
if r.visualLogger != nil {
r.visualLogger.chunkNode.Childf("Log Num: %d", logNum)
}
if logNum != r.logNum {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tMismatch log number in block %d at offset %d (expected %d, got %d)\n", r.blockNum, r.end, r.logNum, logNum)
}
logMsg := fmt.Sprintf("Mismatch log number in block %d at offset %d (expected %d, got %d)\n", r.blockNum, r.end, r.logNum, logNum)
logMsgAndDump(logMsg, bufferDump)
break
}
}

r.begin = r.end + headerSize
r.end = r.begin + int(length)
bufferDump = getBufferDump(r.buf[:], r.begin, min(r.end, r.n))
if r.end > r.n {
// The chunk straddles a 32KB boundary (or the end of file).
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tChunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n", r.blockNum, r.begin, r.end, r.n)
}
logMsg := fmt.Sprintf("Chunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n", r.blockNum, r.begin, r.end, r.n)
logMsgAndDump(logMsg, bufferDump)
break
}
if checksum != crc.New(r.buf[r.begin-headerSize+6:r.end]).Value() {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tChecksum mismatch in block %d at offset %d; potential corruption\n", r.blockNum, r.end)
}
logMsg := fmt.Sprintf("Checksum mismatch in block %d at offset %d; potential corruption\n", r.blockNum, r.end)
logMsgAndDump(logMsg, bufferDump)
break
}

// Decode offset in header when chunk has the WAL Sync wire format.
if wireFormat == walSyncWireFormat {
syncedOffset := binary.LittleEndian.Uint64(r.buf[r.begin-headerSize+11 : r.begin-headerSize+19])
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tBlock %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n", r.blockNum, syncedOffset, r.invalidOffset)
if r.visualLogger != nil {
r.visualLogger.chunkNode.Childf("Synced Offset: %d", syncedOffset)
}
if r.logger != nil {
r.logger.logf("Block %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n", r.blockNum, syncedOffset, r.invalidOffset)
}
// If the encountered chunk offset promises durability beyond the invalid offset,
// the invalid offset must have been corruption.
if syncedOffset > r.invalidOffset {
if r.loggerForTesting != nil {
r.loggerForTesting.logf("\tCorruption confirmed: syncedOffset %d exceeds invalidOffset %d\n", syncedOffset, r.invalidOffset)
}
logMsg := fmt.Sprintf("Corruption confirmed: syncedOffset %d exceeds invalidOffset %d\n", syncedOffset, r.invalidOffset)
logMsgAndDump(logMsg, bufferDump)
return r.err
}
}
}
}
}

// chunkEncodingStr maps a chunk encoding byte to a human-readable name for use
// in diagnostic output. Any encoding without a dedicated case is reported as
// "unknown encoding".
func chunkEncodingStr(encoding byte) string {
	// Start from the fallback and overwrite it when the encoding is recognized.
	name := "unknown encoding"
	switch encoding {
	case invalidChunkEncoding:
		name = "invalidInvalidChunk"

	// Legacy wire format.
	case fullChunkEncoding:
		name = "legacyFullChunk"
	case firstChunkEncoding:
		name = "legacyFirstChunk"
	case middleChunkEncoding:
		name = "legacyMiddleChunk"
	case lastChunkEncoding:
		name = "legacyLastChunk"

	// Recyclable wire format.
	case recyclableFullChunkEncoding:
		name = "recyclableFullChunk"
	case recyclableFirstChunkEncoding:
		name = "recyclableFirstChunk"
	case recyclableMiddleChunkEncoding:
		name = "recyclableMiddleChunk"
	case recyclableLastChunkEncoding:
		name = "recyclableLastChunk"

	// WAL sync wire format.
	case walSyncFullChunkEncoding:
		name = "walSyncFullChunk"
	case walSyncFirstChunkEncoding:
		name = "walSyncFirstChunk"
	case walSyncMiddleChunkEncoding:
		name = "walSyncMiddleChunk"
	case walSyncLastChunkEncoding:
		name = "walSyncLastChunk"
	}
	return name
}

// Offset returns the current offset within the file. If called immediately
// before a call to Next(), Offset() will return the record offset.
func (r *Reader) Offset() int64 {
Expand Down
Loading