Skip to content

Commit f91ebc1

Browse files
authored
feature: replay consensus wal messages (#2828)
* feature: implement WAL recovery mechanism for consensus
1 parent 70cc8aa commit f91ebc1

30 files changed

+672
-141
lines changed

consensus/db/buckets_consensus.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type BucketConsensus byte
1111
// keys like Bolt or MDBX does. We use a global prefix list as a poor
1212
// man's bucket alternative.
1313
const (
14-
WALEntry BucketConsensus = iota // key: WAL_prefix + Height + MsgIndex. Val: Encoded Tendermint consensus message.
14+
WALEntryBucket BucketConsensus = iota // key: WAL_prefix + Height + MsgIndex. Val: Encoded Tendermint consensus message.
1515
)
1616

1717
// Key flattens a prefix and series of byte arrays into a single []byte.

consensus/db/buckets_consensus_enumer.go

Lines changed: 74 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consensus/db/db.go

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ const NumBytesForHeight = 4
1414
// walMsgCount tracks the number of wal entries at the current height
1515
type walMsgCount uint32
1616

17-
type walEntry[V types.Hashable[H], H types.Hash, A types.Addr] struct {
17+
type WalEntry[V types.Hashable[H], H types.Hash, A types.Addr] struct {
1818
Type types.MessageType `cbor:"type"`
1919
Entry types.Message[V, H, A] `cbor:"data"` // cbor serialised Msg or Timeout
2020
}
2121

22-
func (w *walEntry[V, H, A]) UnmarshalCBOR(data []byte) error {
22+
func (w *WalEntry[V, H, A]) UnmarshalCBOR(data []byte) error {
2323
var wrapperType struct {
2424
Type types.MessageType `cbor:"type"`
2525
RawData cbor.RawMessage `cbor:"data"` // Golang can't unmarshal to an interface
@@ -60,15 +60,31 @@ func (w *walEntry[V, H, A]) UnmarshalCBOR(data []byte) error {
6060
}
6161

6262
// TendermintDB defines the methods for interacting with the Tendermint WAL database.
63+
//
64+
// The purpose of the WAL is to record any event that may result in a state change.
65+
// These events fall into the following categories:
66+
// 1. Incoming messages. We do not need to store outgoing messages.
67+
// 2. When we propose a value.
68+
// 3. When a timeout is triggered (not when it is scheduled).
69+
// On restart, replaying the WAL allows the node to recover the state it was in before a crash.
70+
// No new messages should be broadcast during replay.
71+
//
72+
// We commit the WAL to disk when:
73+
// 1. We start a new round
74+
// 2. Right before we broadcast a message
75+
//
76+
// We call DeleteWALEntries when we commit a block and start a new height.
77+
//
78+
//go:generate mockgen -destination=../mocks/mock_db.go -package=mocks github.com/NethermindEth/juno/consensus/db TendermintDB
6379
type TendermintDB[V types.Hashable[H], H types.Hash, A types.Addr] interface {
64-
// CommitBatch writes the accumulated batch operations to the underlying database.
65-
CommitBatch() error
66-
// GetWALMsgs retrieves all WAL messages (consensus messages and timeouts) stored for a given height from the database.
67-
GetWALMsgs(height types.Height) ([]walEntry[V, H, A], error)
80+
// Flush writes the accumulated batch operations to the underlying database.
81+
Flush() error
82+
// GetWALEntries retrieves all WAL messages (consensus messages and timeouts) stored for a given height from the database.
83+
GetWALEntries(height types.Height) ([]WalEntry[V, H, A], error)
6884
// SetWALEntry schedules the storage of a WAL message in the batch.
6985
SetWALEntry(entry types.Message[V, H, A]) error
70-
// DeleteWALMsgs schedules the deletion of all WAL messages for a specific height in the batch.
71-
DeleteWALMsgs(height types.Height) error
86+
// DeleteWALEntries schedules the deletion of all WAL messages for a specific height in the batch.
87+
DeleteWALEntries(height types.Height) error
7288
}
7389

7490
// tendermintDB provides database access for Tendermint consensus state.
@@ -92,8 +108,8 @@ func NewTendermintDB[V types.Hashable[H], H types.Hash, A types.Addr](db db.KeyV
92108
return &tmdb
93109
}
94110

95-
// CommitBatch implements TMDBInterface.
96-
func (s *tendermintDB[V, H, A]) CommitBatch() error {
111+
// Flush implements TendermintDB.
112+
func (s *tendermintDB[V, H, A]) Flush() error {
97113
if err := s.batch.Write(); err != nil {
98114
return err
99115
}
@@ -105,7 +121,7 @@ func (s *tendermintDB[V, H, A]) CommitBatch() error {
105121
// getWALCount scans the DB for the number of WAL messages at a given height.
106122
// It panics if the DB scan fails.
107123
func (s *tendermintDB[V, H, A]) getWALCount(height types.Height) walMsgCount {
108-
prefix := WALEntry.Key(encodeHeight(height))
124+
prefix := WALEntryBucket.Key(encodeHeight(height))
109125
count := walMsgCount(0)
110126
err := s.db.View(func(snap db.Snapshot) error {
111127
defer snap.Close()
@@ -127,16 +143,16 @@ func (s *tendermintDB[V, H, A]) getWALCount(height types.Height) walMsgCount {
127143
return count
128144
}
129145

130-
// DeleteWALMsgs iterates through the expected message keys based on the stored count.
131-
// Note: This operates on the batch. Changes are only persisted after CommitBatch() is called.
132-
func (s *tendermintDB[V, H, A]) DeleteWALMsgs(height types.Height) error {
146+
// DeleteWALEntries schedules a range delete over the WAL entry keys stored for the given height and drops the cached entry count for that height.
147+
// Note: This operates on the batch. Changes are only persisted after Flush() is called.
148+
func (s *tendermintDB[V, H, A]) DeleteWALEntries(height types.Height) error {
133149
heightBytes := encodeHeight(height)
134150
startIterBytes := encodeNumMsgsAtHeight(walMsgCount(1))
135151

136-
startKey := WALEntry.Key(heightBytes, startIterBytes)
137-
endKey := WALEntry.Key(encodeHeight(height + 1))
152+
startKey := WALEntryBucket.Key(heightBytes, startIterBytes)
153+
endKey := WALEntryBucket.Key(encodeHeight(height + 1))
138154
if err := s.batch.DeleteRange(startKey, endKey); err != nil {
139-
return fmt.Errorf("DeleteWALMsgs: failed to add delete range [%x, %x) to batch: %w", startKey, endKey, err)
155+
return fmt.Errorf("DeleteWALEntries: failed to add delete range [%x, %x) to batch: %w", startKey, endKey, err)
140156
}
141157

142158
delete(s.walCount, height)
@@ -145,7 +161,7 @@ func (s *tendermintDB[V, H, A]) DeleteWALMsgs(height types.Height) error {
145161

146162
// SetWALEntry implements TendermintDB.
147163
func (s *tendermintDB[V, H, A]) SetWALEntry(entry types.Message[V, H, A]) error {
148-
wrapper := walEntry[V, H, A]{
164+
wrapper := WalEntry[V, H, A]{
149165
Type: entry.MsgType(),
150166
Entry: entry,
151167
}
@@ -160,7 +176,7 @@ func (s *tendermintDB[V, H, A]) SetWALEntry(entry types.Message[V, H, A]) error
160176
}
161177
nextNumMsgsAtHeight := numMsgsAtHeight + 1
162178

163-
msgKey := WALEntry.Key(encodeHeight(height), encodeNumMsgsAtHeight(nextNumMsgsAtHeight))
179+
msgKey := WALEntryBucket.Key(encodeHeight(height), encodeNumMsgsAtHeight(nextNumMsgsAtHeight))
164180
if err := s.batch.Put(msgKey, wrappedEntry); err != nil {
165181
return fmt.Errorf("writeWALEntryToBatch: failed to set MsgsAtHeight: %w", err)
166182
}
@@ -169,14 +185,14 @@ func (s *tendermintDB[V, H, A]) SetWALEntry(entry types.Message[V, H, A]) error
169185
return nil
170186
}
171187

172-
// GetWALMsgs implements TMDBInterface.
173-
func (s *tendermintDB[V, H, A]) GetWALMsgs(height types.Height) ([]walEntry[V, H, A], error) {
188+
// GetWALEntries implements TendermintDB.
189+
func (s *tendermintDB[V, H, A]) GetWALEntries(height types.Height) ([]WalEntry[V, H, A], error) {
174190
numEntries := s.walCount[height]
175-
walMsgs := make([]walEntry[V, H, A], numEntries)
191+
walMsgs := make([]WalEntry[V, H, A], numEntries)
176192
if numEntries == 0 {
177193
return walMsgs, nil
178194
}
179-
startKey := WALEntry.Key(encodeHeight(height))
195+
startKey := WALEntryBucket.Key(encodeHeight(height))
180196
err := s.db.View(func(snap db.Snapshot) error {
181197
defer snap.Close()
182198
iter, err := snap.NewIterator(startKey, true)

consensus/db/db_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestWALLifecycle(t *testing.T) {
6666
}
6767
timeoutMsg := types.Timeout{Height: testHeight, Round: testRound, Step: testStep}
6868

69-
expectedEntries := []walEntry[value, felt.Felt, felt.Felt]{
69+
expectedEntries := []WalEntry[value, felt.Felt, felt.Felt]{
7070
{Type: types.MessageTypeProposal, Entry: proposal},
7171
{Type: types.MessageTypePrevote, Entry: prevote},
7272
{Type: types.MessageTypePrecommit, Entry: precommit},
@@ -82,26 +82,26 @@ func TestWALLifecycle(t *testing.T) {
8282
})
8383

8484
t.Run("Commit batch and get entries", func(t *testing.T) {
85-
require.NoError(t, tmState.CommitBatch())
86-
retrieved, err := tmState.GetWALMsgs(testHeight)
85+
require.NoError(t, tmState.Flush())
86+
retrieved, err := tmState.GetWALEntries(testHeight)
8787
require.NoError(t, err)
8888
require.ElementsMatch(t, expectedEntries, retrieved)
8989
})
9090

9191
t.Run("Reload the db and get entries", func(t *testing.T) {
9292
tmState, _ = reopenTestTMDB(t, db, dbPath, testHeight)
93-
retrieved, err := tmState.GetWALMsgs(testHeight)
93+
retrieved, err := tmState.GetWALEntries(testHeight)
9494
require.NoError(t, err)
9595
require.Equal(t, expectedEntries, retrieved)
9696
})
9797

9898
t.Run("Delete entries", func(t *testing.T) {
99-
require.NoError(t, tmState.DeleteWALMsgs(testHeight))
99+
require.NoError(t, tmState.DeleteWALEntries(testHeight))
100100
})
101101

102102
t.Run("Commit batch and get entries (after deletion)", func(t *testing.T) {
103-
require.NoError(t, tmState.CommitBatch())
104-
_, err := tmState.GetWALMsgs(testHeight)
103+
require.NoError(t, tmState.Flush())
104+
_, err := tmState.GetWALEntries(testHeight)
105105
require.NoError(t, err)
106106
})
107107
}

consensus/driver/driver.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ func New[V types.Hashable[H], H types.Hash, A types.Addr](
5454
// The Driver executes these actions (namely broadcasting messages
5555
// and triggering scheduled timeouts).
5656
func (d *Driver[V, H, A]) Start() {
57+
d.stateMachine.ReplayWAL()
58+
5759
d.wg.Add(1)
5860
go func() {
5961
defer d.wg.Done()
@@ -92,16 +94,16 @@ func (d *Driver[V, H, A]) Stop() {
9294
}
9395
}
9496

95-
func (d *Driver[V, H, A]) execute(actions []tendermint.Action[V, H, A]) {
97+
func (d *Driver[V, H, A]) execute(actions []types.Action[V, H, A]) {
9698
for _, action := range actions {
9799
switch action := action.(type) {
98-
case *tendermint.BroadcastProposal[V, H, A]:
100+
case *types.BroadcastProposal[V, H, A]:
99101
d.broadcasters.ProposalBroadcaster.Broadcast(types.Proposal[V, H, A](*action))
100-
case *tendermint.BroadcastPrevote[H, A]:
102+
case *types.BroadcastPrevote[H, A]:
101103
d.broadcasters.PrevoteBroadcaster.Broadcast(types.Prevote[H, A](*action))
102-
case *tendermint.BroadcastPrecommit[H, A]:
104+
case *types.BroadcastPrecommit[H, A]:
103105
d.broadcasters.PrecommitBroadcaster.Broadcast(types.Precommit[H, A](*action))
104-
case *tendermint.ScheduleTimeout:
106+
case *types.ScheduleTimeout:
105107
d.scheduledTms[types.Timeout(*action)] = time.AfterFunc(d.getTimeout(action.Step, action.Round), func() {
106108
select {
107109
case <-d.quit:

consensus/driver/driver_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/NethermindEth/juno/consensus/driver"
1010
"github.com/NethermindEth/juno/consensus/mocks"
1111
"github.com/NethermindEth/juno/consensus/p2p"
12-
"github.com/NethermindEth/juno/consensus/tendermint"
1312
"github.com/NethermindEth/juno/consensus/types"
1413
"github.com/NethermindEth/juno/core/felt"
1514
"github.com/NethermindEth/juno/db/memory"
@@ -130,29 +129,29 @@ func getRandTimeout(random *rand.Rand, step types.Step) types.Timeout {
130129
func generateAndRegisterRandomActions(
131130
random *rand.Rand,
132131
expectedBroadcast *expectedBroadcast,
133-
) []tendermint.Action[value, felt.Felt, felt.Felt] {
134-
actions := make([]tendermint.Action[value, felt.Felt, felt.Felt], actionCount)
132+
) []types.Action[value, felt.Felt, felt.Felt] {
133+
actions := make([]types.Action[value, felt.Felt, felt.Felt], actionCount)
135134
for i := range actionCount {
136135
switch random.Int() % 3 {
137136
case 0:
138137
proposal := getRandProposal(random)
139138
expectedBroadcast.proposals = append(expectedBroadcast.proposals, proposal)
140-
actions[i] = utils.HeapPtr(tendermint.BroadcastProposal[value, felt.Felt, felt.Felt](proposal))
139+
actions[i] = utils.HeapPtr(types.BroadcastProposal[value, felt.Felt, felt.Felt](proposal))
141140
case 1:
142141
prevote := getRandPrevote(random)
143142
expectedBroadcast.prevotes = append(expectedBroadcast.prevotes, prevote)
144-
actions[i] = utils.HeapPtr(tendermint.BroadcastPrevote[felt.Felt, felt.Felt](prevote))
143+
actions[i] = utils.HeapPtr(types.BroadcastPrevote[felt.Felt, felt.Felt](prevote))
145144
case 2:
146145
precommit := getRandPrecommit(random)
147146
expectedBroadcast.precommits = append(expectedBroadcast.precommits, precommit)
148-
actions[i] = utils.HeapPtr(tendermint.BroadcastPrecommit[felt.Felt, felt.Felt](precommit))
147+
actions[i] = utils.HeapPtr(types.BroadcastPrecommit[felt.Felt, felt.Felt](precommit))
149148
}
150149
}
151150
return actions
152151
}
153152

154-
func toAction(timeout types.Timeout) tendermint.Action[value, felt.Felt, felt.Felt] {
155-
return utils.HeapPtr(tendermint.ScheduleTimeout(timeout))
153+
func toAction(timeout types.Timeout) types.Action[value, felt.Felt, felt.Felt] {
154+
return utils.HeapPtr(types.ScheduleTimeout(timeout))
156155
}
157156

158157
func increaseBroadcasterWaitGroup[M types.Message[value, felt.Felt, felt.Felt]](
@@ -189,6 +188,7 @@ func TestDriver(t *testing.T) {
189188
broadcasters := mockBroadcasters()
190189

191190
stateMachine := mocks.NewMockStateMachine[value, felt.Felt, felt.Felt](ctrl)
191+
stateMachine.EXPECT().ReplayWAL().AnyTimes().Return() // ignore WAL replay logic here
192192
driver := driver.New(memory.New(), stateMachine, mockListeners(proposalCh, prevoteCh, precommitCh), broadcasters, mockTimeoutFn)
193193

194194
inputTimeoutProposal := getRandTimeout(random, types.StepPropose)

consensus/integtest/integ_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ import (
55
"time"
66

77
"github.com/NethermindEth/juno/consensus/driver"
8+
"github.com/NethermindEth/juno/consensus/mocks"
89
"github.com/NethermindEth/juno/consensus/tendermint"
910
"github.com/NethermindEth/juno/consensus/types"
11+
"github.com/NethermindEth/juno/core/felt"
1012
"github.com/NethermindEth/juno/db/pebble"
13+
"github.com/NethermindEth/juno/utils"
1114
"github.com/stretchr/testify/assert"
1215
"github.com/stretchr/testify/require"
16+
"go.uber.org/mock/gomock"
1317
)
1418

1519
type testConfig struct {
@@ -35,6 +39,18 @@ func getTimeoutFn(cfg testConfig) func(types.Step, types.Round) time.Duration {
3539
}
3640
}
3741

42+
func newDB(t *testing.T) *mocks.MockTendermintDB[value, felt.Felt, felt.Felt] {
43+
t.Helper()
44+
ctrl := gomock.NewController(t)
45+
// Ignore WAL for tests that use this
46+
db := mocks.NewMockTendermintDB[value, felt.Felt, felt.Felt](ctrl)
47+
db.EXPECT().GetWALEntries(gomock.Any()).AnyTimes()
48+
db.EXPECT().SetWALEntry(gomock.Any()).AnyTimes()
49+
db.EXPECT().Flush().AnyTimes()
50+
db.EXPECT().DeleteWALEntries(gomock.Any()).AnyTimes()
51+
return db
52+
}
53+
3854
func runTest(t *testing.T, cfg testConfig) {
3955
t.Helper()
4056
honestNodeCount := cfg.nodeCount - cfg.faultyNodeCount
@@ -53,6 +69,8 @@ func runTest(t *testing.T, cfg testConfig) {
5369
nodeAddr := &allNodes.addr[i]
5470

5571
stateMachine := tendermint.New(
72+
newDB(t),
73+
utils.NewNopZapLogger(),
5674
*nodeAddr,
5775
&application{},
5876
newBlockchain(commits, nodeAddr),

0 commit comments

Comments
 (0)