Skip to content

Dev/robin/10530 replicate logs races with sealer regression test #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cfglogging.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package veracity
import (
"github.com/datatrails/go-datatrails-common/logger"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
)

// cfgLogging establishes the logger
@@ -12,7 +13,14 @@ func cfgLogging(cmd *CmdCtx, cCtx *cli.Context) error {
if logLevel == "" {
logLevel = "INFO"
}
logger.New(logLevel, logger.WithConsole())
cmd.log = logger.Sugar
if logLevel == "NOOP" {
cmd.log = &logger.WrappedLogger{
SugaredLogger: zap.NewNop().Sugar(),
}
} else {
logger.New(logLevel, logger.WithConsole())
cmd.log = logger.Sugar
}

return nil
}
84 changes: 67 additions & 17 deletions replicatelogs.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"time"

"github.com/datatrails/go-datatrails-common/cbor"
"github.com/datatrails/go-datatrails-common/cose"
"github.com/datatrails/go-datatrails-common/logger"
"github.com/datatrails/go-datatrails-merklelog/massifs"
"github.com/datatrails/go-datatrails-merklelog/massifs/watcher"
@@ -38,17 +39,35 @@ const (
)

var (
ErrChangesFlagIsExclusive = errors.New("use --changes Or --massif and --tenant, not both")
ErrNewReplicaNotEmpty = errors.New("the local directory for a new replica already exists")
ErrSealNotFound = errors.New("seal not found")
ErrSealVerifyFailed = errors.New("the seal signature verification failed")
ErrFailedCheckingConsistencyProof = errors.New("failed to check a consistency proof")
ErrFailedToCreateReplicaDir = errors.New("failed to create a directory needed for local replication")
ErrRequiredOption = errors.New("a required option was not provided")
ErrRemoteLogTruncated = errors.New("the local replica indicates the remote log has been truncated")
ErrRemoteLogInconsistentRootState = errors.New("the local replica root state disagrees with the remote")
ErrChangesFlagIsExclusive = errors.New("use --changes Or --massif and --tenant, not both")
ErrNewReplicaNotEmpty = errors.New("the local directory for a new replica already exists")
ErrSealNotFound = errors.New("seal not found")
ErrSealVerifyFailed = errors.New("the seal signature verification failed")
ErrFailedCheckingConsistencyProof = errors.New("failed to check a consistency proof")
ErrFailedToCreateReplicaDir = errors.New("failed to create a directory needed for local replication")
ErrRequiredOption = errors.New("a required option was not provided")
ErrRemoteLogTruncated = errors.New("the local replica indicates the remote log has been truncated")
ErrRemoteLogInconsistentRootState = errors.New("the local replica root state disagrees with the remote")
ErrInconsistentUseOfPrefetchedSeal = errors.New("prefetching signed root reader used inconsistently")
)

// prefetchingSealReader pre-fetches the seal for the massif to avoid racing with the
// sealer. If the massif is read first, the log can grow and a new seal can
// be applied to the *longer* log. At which point the previously read copy of
// the massif will be "too short" for the seal.
// See Bug#10530
//
// The reader holds exactly one seal: the one fetched by
// NewPrefetchingSealReader. Its GetSignedRoot method returns that stored copy
// and rejects requests for any other tenant or massif index.
type prefetchingSealReader struct {
// msg is the signed root message obtained when the reader was created.
msg *cose.CoseSign1Message
// state is the MMR state returned by the same fetch that produced msg.
state massifs.MMRState
// tenantIdentity is the tenant the seal was fetched for.
tenantIdentity string
// massifIndex is the massif index the seal was fetched for.
massifIndex uint32
}

// changeCollector bundles an optional logger with a watch output string.
type changeCollector struct {
// log may be nil; Logf returns immediately when it is.
log logger.Logger
// NOTE(review): how watchOutput is populated and consumed is not visible
// here - presumably it holds the output of the watch command; confirm.
watchOutput string
}

// NewReplicateLogsCmd updates a local replica of a remote log, verifying the mutual consistency of the two before making any changes.
//
//nolint:gocognit
@@ -266,6 +285,7 @@ type VerifiedReplica struct {
writeOpener massifs.WriteAppendOpener
localReader massifs.ReplicaReader
remoteReader MassifReader
rootReader massifs.SealGetter
cborCodec cbor.CBORCodec
}

@@ -324,7 +344,6 @@ func NewVerifiedReplica(

remoteReader := massifs.NewMassifReader(
logger.Sugar, reader,
massifs.WithSealGetter(&cmd.rootReader),
)

return &VerifiedReplica{
@@ -333,6 +352,7 @@ func NewVerifiedReplica(
writeOpener: NewFileWriteOpener(),
localReader: &localReader,
remoteReader: &remoteReader,
rootReader: &cmd.rootReader,
cborCodec: cmd.cborCodec,
}, nil
}
@@ -441,10 +461,20 @@ func (v *VerifiedReplica) ReplicateVerifiedUpdates(

for i := startMassif; i <= endMassif; i++ {

remoteVerifyOpts := []massifs.ReaderOption{massifs.WithCBORCodec(v.cborCodec)}
// Note: we have to fetch the seal before the massif, otherwise we can lose a race with the builder
// See bug#10530
remoteSealReader, err := NewPrefetchingSealReader(ctx, v.rootReader, tenantIdentity, i)
if err != nil {
return err
}
remoteVerifyOpts := []massifs.ReaderOption{
massifs.WithCBORCodec(v.cborCodec),
massifs.WithSealGetter(remoteSealReader),
}
if local != nil {
var baseState massifs.MMRState
// Promote the trusted base state to a V1 state if it is a V0 state.
baseState, err := trustedBaseState(local)
baseState, err = trustedBaseState(local)
if err != nil {
return err
}
@@ -595,11 +625,6 @@ func peakBaggedRoot(state massifs.MMRState) []byte {
return mmr.HashPeaksRHS(sha256.New(), state.Peaks)
}

type changeCollector struct {
log logger.Logger
watchOutput string
}

func (c *changeCollector) Logf(msg string, args ...any) {
if c.log == nil {
return
@@ -677,3 +702,28 @@ func readTenantMassifChanges(ctx context.Context, cCtx *cli.Context, cmd *CmdCtx
// No explicit config and --all not set, read from stdin
return stdinToDecodedTenantMassifs()
}

// NewPrefetchingSealReader reads the signed root for tenantIdentity and
// massifIndex from sealGetter straight away and returns a reader that keeps
// the fetched message and state together with the identifiers they were
// fetched for.
//
// Any error from sealGetter.GetSignedRoot is returned unchanged, with a nil
// reader.
func NewPrefetchingSealReader(ctx context.Context, sealGetter massifs.SealGetter, tenantIdentity string, massifIndex uint32) (*prefetchingSealReader, error) {
	signedRoot, mmrState, fetchErr := sealGetter.GetSignedRoot(ctx, tenantIdentity, massifIndex)
	if fetchErr != nil {
		return nil, fetchErr
	}

	// Record the identifiers alongside the seal so later lookups can be
	// checked against what was actually fetched.
	return &prefetchingSealReader{
		msg:            signedRoot,
		state:          mmrState,
		tenantIdentity: tenantIdentity,
		massifIndex:    massifIndex,
	}, nil
}

// GetSignedRoot returns the seal message and MMR state stored in the reader.
//
// The requested tenantIdentity and massifIndex must equal the ones the reader
// was created with; otherwise an error wrapping
// ErrInconsistentUseOfPrefetchedSeal is returned along with a nil message and
// a zero MMRState. The tenant is checked before the massif index. ctx and
// opts are not used.
func (r *prefetchingSealReader) GetSignedRoot(ctx context.Context, tenantIdentity string, massifIndex uint32, opts ...massifs.ReaderOption) (*cose.CoseSign1Message, massifs.MMRState, error) {
	switch {
	case r.tenantIdentity != tenantIdentity:
		return nil, massifs.MMRState{}, fmt.Errorf("%w: tenant requested: %s, tenant prefetched: %s", ErrInconsistentUseOfPrefetchedSeal, tenantIdentity, r.tenantIdentity)
	case r.massifIndex != massifIndex:
		return nil, massifs.MMRState{}, fmt.Errorf("%w: massif requested: %d, massif prefetched: %d", ErrInconsistentUseOfPrefetchedSeal, massifIndex, r.massifIndex)
	default:
		return r.msg, r.state, nil
	}
}
105 changes: 105 additions & 0 deletions tests/replicatelogs/replicatelogs_azurite_test.go
Original file line number Diff line number Diff line change
@@ -7,11 +7,13 @@ import (
"crypto/elliptic"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/datatrails/go-datatrails-common/logger"
"github.com/datatrails/go-datatrails-merklelog/massifs"
@@ -43,6 +45,109 @@ func mustHashFile(t *testing.T, filename string) []byte {
return hash
}

// TestRegression10530 covers the case where the seal fetched for the massif is
// ahead of the massif. Essentially, this can only happen where the seal is
// read *after* the massif, and between reading the massif and generating the
// seal that was fetched, more items are added to the massif. Thus the massif
// data fetched does not contain all the items that are covered by the seal.
//
// NOTICE: this test is unavoidably *theoretically* flaky on the PASS side.
// In that it can fail to trigger the race condition and so PASS.
// However, if the test is successful in triggering the race condition, it will
// always FAIL.
// Further, in development against the known broken code, we have never seen
// this test take more than 3 attempts to cause the race as configured.
//
// The implication of this is that there is a small chance this test will not
// immediately catch the regression, but soon after. This means that on a FAIL
// from this test, it is 100% a return of the race condition and should be
// investigated. There is a tiny chance the race may have been re-introduced by
// an earlier change than is associated with the fail.
//
// Each attempt starts a goroutine that adds leaves to the log in batches and
// cancels the attempt's context when it has finished. While that goroutine is
// running, the replicate-logs command is run repeatedly against the same
// replica directory; the attempt ends after the first run that completes once
// the context has been cancelled.
func (s *ReplicateLogsCmdSuite) TestRegression10530() {

	tests := []struct {
		name string
		// attempts mitigates the inherent flakiness of detecting a race bug
		attempts     int
		massifHeight uint8
		leafBatch    int
		batchCount   int
		activeMassif string
	}{
		// note: we only need a single massif to catch the race condition. and
		// having more makes it much harder to configure the replication run.
		// these numbers are tuned to balance run time against the reliability
		// of catching the error. A pass unfortunately takes 10's of seconds
		{"one by one", 3, 14, 1, 250, "0"},
	}
	key := massifs.TestGenerateECKey(s.T(), elliptic.P256())
	tc := massifs.NewLocalMassifReaderTestContext(s.T(), logger.Sugar, "TestRegression10530")
	tenantId0 := tc.G.NewTenantIdentity()
	tc.AzuriteContext.DeleteBlobsByPrefix(massifs.TenantMassifPrefix(tenantId0))

	for _, tt := range tests {

		s.Run(tt.name, func() {

			for attempt := range tt.attempts {

				ctx, cancel := context.WithCancel(context.Background())
				go func(cancel context.CancelFunc, massifHeight uint8, leafBatch, batchcount int) {
					// Cancelling the context is how the writer tells the
					// replication loop below that it has finished.
					defer cancel()

					for range batchcount {
						tc.AddLeavesToLog(
							tenantId0, massifHeight, leafBatch,
							massifs.TestWithSealKey(&key),
						)
					}
				}(cancel, tt.massifHeight, tt.leafBatch, tt.batchCount)

				// Replicate the log
				// note: VERACITY_IKWID is set in main, we need it to enable --envauth so we force it here
				app := veracity.NewApp("tests", true)
				veracity.AddCommands(app, true)

				replicaDir := s.T().TempDir()

				// The run counter lives in the for clause so that it advances
				// on every iteration and the diagnostics below report the
				// actual run number.
				for veracityRuns := 1; ; veracityRuns++ {

					err := app.Run([]string{
						"veracity",
						"--loglevel", "NOOP", // sets the zap noop logger which avoids a race with our logging package.
						"--envauth", // uses the emulator
						"--container", tc.TestConfig.Container,
						"--data-url", s.Env.AzuriteVerifiableDataURL,
						"--tenant", tenantId0,
						"--height", fmt.Sprintf("%d", tt.massifHeight),
						"replicate-logs",
						// "--ancestors", fmt.Sprintf("%d", tt.ancestors),
						"--replicadir", replicaDir,
						"--massif", tt.activeMassif,
					})
					if err != nil {
						// We want to fatal out on the first instance of state size exceeds data
						if errors.Is(err, massifs.ErrStateSizeExceedsData) {
							s.T().Fatalf("seal race detected on run %d, in attempt %d. %v", veracityRuns, attempt, err)
						}
						fmt.Printf("run %d: %v\n", veracityRuns, err)
					}
					time.Sleep(1 * time.Second)

					// Non-blocking check: stop once the writer goroutine has
					// cancelled the context, otherwise replicate again.
					if ctx.Err() != nil {
						break
					}
				}
			}
		})
	}
}

// TestReplicateMassifUpdate ensures that an extension to a previously replicated
// massif is handled correctly
func (s *ReplicateLogsCmdSuite) TestReplicateMassifUpdate() {
Loading