Skip to content

Commit 5ec3dbc

Browse files
committed
Allow WAL entries to conflict if they are above commited index, select entry with the highest term
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent 41e5b86 commit 5ec3dbc

File tree

2 files changed

+59
-11
lines changed

2 files changed

+59
-11
lines changed

tests/robustness/report/wal.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"io"
21+
"math"
2122
"os"
2223
"path/filepath"
2324
"reflect"
@@ -62,14 +63,17 @@ func PersistedRequests(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest,
6263
return nil, errors.New("no data dirs")
6364
}
6465
entriesPersistedInWAL := make([][]raftpb.Entry, len(dataDirs))
66+
var minCommitIndex uint64 = math.MaxUint64
6567
for i, dir := range dataDirs {
66-
_, entries, err := ReadWAL(lg, dir)
68+
state, entries, err := ReadWAL(lg, dir)
6769
if err != nil {
6870
lg.Error("Failed to read WAL", zap.Error(err), zap.String("data-dir", dir))
71+
continue
6972
}
73+
minCommitIndex = min(minCommitIndex, state.Commit)
7074
entriesPersistedInWAL[i] = entries
7175
}
72-
entries, err := mergeMembersEntries(entriesPersistedInWAL)
76+
entries, err := mergeMembersEntries(minCommitIndex, entriesPersistedInWAL)
7377
if err != nil {
7478
return nil, err
7579
}
@@ -89,7 +93,7 @@ func PersistedRequests(lg *zap.Logger, dataDirs []string) ([]model.EtcdRequest,
8993
return persistedRequests, nil
9094
}
9195

92-
func mergeMembersEntries(memberEntries [][]raftpb.Entry) ([]raftpb.Entry, error) {
96+
func mergeMembersEntries(minCommitIndex uint64, memberEntries [][]raftpb.Entry) ([]raftpb.Entry, error) {
9397
for _, entries := range memberEntries {
9498
var lastIndex uint64
9599
for _, e := range entries {
@@ -162,10 +166,20 @@ func mergeMembersEntries(memberEntries [][]raftpb.Entry) ([]raftpb.Entry, error)
162166
if vote != topVotes {
163167
continue
164168
}
165-
if entryWithMostVotes != nil && !reflect.DeepEqual(*entryWithMostVotes, memberEntries[i][memberIndices[i]]) {
166-
return nil, fmt.Errorf("mismatching entries on raft index %d", raftIndex)
169+
entry := memberEntries[i][memberIndices[i]]
170+
if entryWithMostVotes == nil {
171+
entryWithMostVotes = &entry
172+
continue
173+
}
174+
if entryWithMostVotes.Term != entry.Term && entry.Index > minCommitIndex {
175+
if entryWithMostVotes.Term < entry.Term {
176+
entryWithMostVotes = &entry
177+
}
178+
continue
179+
}
180+
if !reflect.DeepEqual(*entryWithMostVotes, entry) {
181+
return nil, fmt.Errorf("mismatching entries on raft index %d, mostVotes: %+v, other: %+v", raftIndex, *entryWithMostVotes, entry)
167182
}
168-
entryWithMostVotes = &memberEntries[i][memberIndices[i]]
169183
}
170184
mergedHistory = append(mergedHistory, *entryWithMostVotes)
171185
}

tests/robustness/report/wal_test.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import (
2424

2525
func TestMergeMemberEntries(t *testing.T) {
2626
tcs := []struct {
27-
name string
28-
memberEntries [][]raftpb.Entry
29-
expectErr string
30-
expectEntries []raftpb.Entry
27+
name string
28+
minCommitIndex uint64
29+
memberEntries [][]raftpb.Entry
30+
expectErr string
31+
expectEntries []raftpb.Entry
3132
}{
3233
{
3334
name: "Error when empty data dir",
@@ -314,10 +315,43 @@ func TestMergeMemberEntries(t *testing.T) {
314315
},
315316
expectErr: "mismatching entries on raft index 1",
316317
},
318+
{
319+
name: "Error if entries mismatch on index before minCommitIndex",
320+
minCommitIndex: 2,
321+
memberEntries: [][]raftpb.Entry{
322+
{
323+
raftpb.Entry{Index: 1, Term: 1, Data: []byte("a")},
324+
raftpb.Entry{Index: 2, Term: 1, Data: []byte("b")},
325+
},
326+
{
327+
raftpb.Entry{Index: 1, Term: 1, Data: []byte("a")},
328+
raftpb.Entry{Index: 2, Term: 2, Data: []byte("c")},
329+
},
330+
},
331+
expectErr: "mismatching entries on raft index 2",
332+
},
333+
{
334+
name: "Select entry with higher term if they conflict on uncommitted index",
335+
minCommitIndex: 1,
336+
memberEntries: [][]raftpb.Entry{
337+
{
338+
raftpb.Entry{Index: 1, Term: 1, Data: []byte("a")},
339+
raftpb.Entry{Index: 2, Term: 1, Data: []byte("b")},
340+
},
341+
{
342+
raftpb.Entry{Index: 1, Term: 1, Data: []byte("a")},
343+
raftpb.Entry{Index: 2, Term: 2, Data: []byte("x")},
344+
},
345+
},
346+
expectEntries: []raftpb.Entry{
347+
{Index: 1, Term: 1, Data: []byte("a")},
348+
{Index: 2, Term: 2, Data: []byte("x")},
349+
},
350+
},
317351
}
318352
for _, tc := range tcs {
319353
t.Run(tc.name, func(t *testing.T) {
320-
entries, err := mergeMembersEntries(tc.memberEntries)
354+
entries, err := mergeMembersEntries(tc.minCommitIndex, tc.memberEntries)
321355
if tc.expectErr == "" {
322356
require.NoError(t, err)
323357
} else {

0 commit comments

Comments
 (0)