Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: fixup raftLog initialization #151

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package raft

import (
"fmt"
"log"

pb "go.etcd.io/raft/v3/raftpb"
)
Expand Down Expand Up @@ -72,14 +71,6 @@ func newLog(storage Storage, logger Logger) *raftLog {
// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEncodingSize) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
maxApplyingEntsSize: maxApplyingEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
Expand All @@ -88,15 +79,22 @@ func newLogWithSize(storage Storage, logger Logger, maxApplyingEntsSize entryEnc
if err != nil {
panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.offsetInProgress = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applying = firstIndex - 1
log.applied = firstIndex - 1
return &raftLog{
storage: storage,
unstable: unstable{
offset: lastIndex + 1,
offsetInProgress: lastIndex + 1,
logger: logger,
},
maxApplyingEntsSize: maxApplyingEntsSize,

// Initialize our committed and applied pointers to the time of the last compaction.
committed: firstIndex - 1,
applying: firstIndex - 1,
applied: firstIndex - 1,

return log
logger: logger,
}
}

func (l *raftLog) String() string {
Expand Down
47 changes: 14 additions & 33 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,13 +1002,10 @@ func TestCandidateConcede(t *testing.T) {
if g := a.Term; g != 1 {
t.Errorf("term = %d, want %d", g, 1)
}
wantLog := ltoa(&raftLog{
storage: &MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
},
unstable: unstable{offset: 3},
committed: 2,
})

wantLog := ltoa(newLog(&MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
}, nil))
for i, p := range tt.peers {
if sm, ok := p.(*raft); ok {
l := ltoa(sm.raftLog)
Expand Down Expand Up @@ -1054,11 +1051,7 @@ func TestOldMessages(t *testing.T) {

ents := index(0).terms(0, 1, 2, 3, 3)
ents[4].Data = []byte("somedata")
ilog := &raftLog{
storage: &MemoryStorage{ents: ents},
unstable: unstable{offset: 5},
committed: 4,
}
ilog := newLog(&MemoryStorage{ents: ents}, nil)
base := ltoa(ilog)
for i, p := range tt.peers {
if sm, ok := p.(*raft); ok {
Expand Down Expand Up @@ -1109,12 +1102,9 @@ func TestProposal(t *testing.T) {

wantLog := newLog(NewMemoryStorage(), raftLogger)
if tt.success {
wantLog = &raftLog{
storage: &MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
},
unstable: unstable{offset: 3},
}
wantLog = newLog(&MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
}, nil)
}
base := ltoa(wantLog)
for i, p := range tt.peers {
Expand Down Expand Up @@ -1147,12 +1137,9 @@ func TestProposalByProxy(t *testing.T) {
// propose via follower
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})

wantLog := &raftLog{
storage: &MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
},
unstable: unstable{offset: 3},
committed: 2}
wantLog := newLog(&MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
}, nil)
base := ltoa(wantLog)
for i, p := range tt.peers {
if sm, ok := p.(*raft); ok {
Expand Down Expand Up @@ -1566,10 +1553,7 @@ func testRecvMsgVote(t *testing.T, msgType pb.MessageType) {
sm.step = stepLeader
}
sm.Vote = tt.voteFor
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: index(0).terms(0, 2, 2)},
unstable: unstable{offset: 3},
}
sm.raftLog = newLog(&MemoryStorage{ents: index(0).terms(0, 2, 2)}, nil)

// raft.Term is greater than or equal to raft.raftLog.lastTerm. In this
// test we're only testing MsgVote responses when the campaigning node
Expand Down Expand Up @@ -2606,10 +2590,7 @@ func TestLeaderAppResp(t *testing.T) {
// sm term is 1 after it becomes the leader.
// thus the last log term must be 1 to be committed.
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: index(0).terms(0, 1, 1)},
unstable: unstable{offset: 3},
}
sm.raftLog = newLog(&MemoryStorage{ents: index(0).terms(0, 1, 1)}, nil)
sm.becomeCandidate()
sm.becomeLeader()
sm.readMessages()
Expand Down Expand Up @@ -2721,7 +2702,7 @@ func TestRecvMsgBeat(t *testing.T) {

for i, tt := range tests {
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: index(0).terms(0, 1, 1)}}
sm.raftLog = newLog(&MemoryStorage{ents: index(0).terms(0, 1, 1)}, nil)
sm.Term = 1
sm.state = tt.state
switch tt.state {
Expand Down
Loading