Skip to content

Commit 7e5318d

Browse files
committed
feat(store): raft support propose callback
Signed-off-by: James Yin <[email protected]>
1 parent 7abde9a commit 7e5318d

File tree

12 files changed

+426
-311
lines changed

12 files changed

+426
-311
lines changed

internal/store/block/block.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,10 @@ type Reader interface {
4444
Read(ctx context.Context, seq int64, num int) ([]Entry, error)
4545
}
4646

47+
type AppendCallback = func(seqs []int64, err error)
48+
4749
type Appender interface {
48-
Append(ctx context.Context, entries ...Entry) ([]int64, error)
50+
Append(ctx context.Context, entries []Entry, cb AppendCallback)
4951
}
5052

5153
type Block interface {

internal/store/block/raft/appender.go

Lines changed: 36 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"time"
2525

2626
// third-party libraries.
27-
"go.opentelemetry.io/otel/attribute"
2827
"go.opentelemetry.io/otel/trace"
2928

3029
// first-party libraries.
@@ -67,11 +66,6 @@ type peer struct {
6766

6867
type LeaderChangedListener func(block, leader vanus.ID, term uint64)
6968

70-
type commitWaiter struct {
71-
offset int64
72-
c chan struct{}
73-
}
74-
7569
type Appender interface {
7670
block.Appender
7771

@@ -86,11 +80,6 @@ type appender struct {
8680
actx block.AppendContext
8781
appendMu sync.RWMutex
8882

89-
waiters []commitWaiter
90-
commitIndex uint64
91-
commitOffset int64
92-
waitMu sync.Mutex
93-
9483
leaderID vanus.ID
9584
listener LeaderChangedListener
9685

@@ -116,7 +105,6 @@ func NewAppender(
116105

117106
a := &appender{
118107
raw: raw,
119-
waiters: make([]commitWaiter, 0),
120108
listener: listener,
121109
log: raftLog,
122110
host: host,
@@ -126,7 +114,6 @@ func NewAppender(
126114
tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal),
127115
}
128116
a.actx = a.raw.NewAppendContext(nil)
129-
a.commitOffset = a.actx.WriteOffset()
130117

131118
a.log.SetSnapshotOperator(a)
132119
a.host.Register(a.ID().Uint64(), a)
@@ -145,9 +132,6 @@ func NewAppender(
145132
}
146133
a.node = raft.RestartNode(c)
147134

148-
// Access Commit after raft.RestartNode to ensure raft state is initialized.
149-
a.commitIndex = a.log.HardState().Commit
150-
151135
go a.run(ctx)
152136

153137
return a
@@ -207,13 +191,6 @@ func (a *appender) run(ctx context.Context) {
207191
case rd := <-a.node.Ready():
208192
rCtx, span := a.tracer.Start(ctx, "RaftReady", trace.WithNewRoot())
209193

210-
var partial bool
211-
stateChanged := !raft.IsEmptyHardState(rd.HardState)
212-
if stateChanged {
213-
// Wake up fast before writing logs.
214-
partial = a.wakeup(rCtx, rd.HardState.Commit)
215-
}
216-
217194
if len(rd.Entries) != 0 {
218195
log.Debug(rCtx, "Append entries to raft log.", map[string]interface{}{
219196
"node_id": a.ID(),
@@ -238,11 +215,7 @@ func (a *appender) run(ctx context.Context) {
238215
})
239216
}
240217

241-
if stateChanged {
242-
// Wake up after writing logs.
243-
if partial {
244-
_ = a.wakeup(rCtx, rd.HardState.Commit)
245-
}
218+
if !raft.IsEmptyHardState(rd.HardState) {
246219
log.Debug(rCtx, "Persist raft hard state.", map[string]interface{}{
247220
"node_id": a.ID(),
248221
"hard_state": rd.HardState,
@@ -336,40 +309,6 @@ func (a *appender) applyEntries(ctx context.Context, committedEntries []raftpb.E
336309
return committedEntries[num-1].Index
337310
}
338311

339-
// wakeup wakes up append requests to the smaller of the committed or last index.
340-
func (a *appender) wakeup(ctx context.Context, commit uint64) (partial bool) {
341-
_, span := a.tracer.Start(ctx, "wakeup", trace.WithAttributes(
342-
attribute.Int64("commit", int64(commit))))
343-
defer span.End()
344-
345-
li, _ := a.log.LastIndex()
346-
if commit > li {
347-
commit = li
348-
partial = true
349-
}
350-
351-
if commit <= a.commitIndex {
352-
return
353-
}
354-
a.commitIndex = commit
355-
356-
for off := commit; off > 0; off-- {
357-
pbEntries, err := a.log.Entries(off, off+1, 0)
358-
if err != nil {
359-
return
360-
}
361-
362-
pbEntry := pbEntries[0]
363-
if pbEntry.Type == raftpb.EntryNormal && len(pbEntry.Data) > 0 {
364-
frag := block.NewFragment(pbEntry.Data)
365-
a.doWakeup(ctx, frag.EndOffset())
366-
return
367-
}
368-
}
369-
370-
return partial
371-
}
372-
373312
func (a *appender) becomeLeader(ctx context.Context) {
374313
ctx, span := a.tracer.Start(ctx, "becomeLeader")
375314
defer span.End()
@@ -459,65 +398,65 @@ func (a *appender) reset(ctx context.Context) {
459398
}
460399

461400
// Append implements block.raw.
462-
func (a *appender) Append(ctx context.Context, entries ...block.Entry) ([]int64, error) {
401+
func (a *appender) Append(ctx context.Context, entries []block.Entry, cb block.AppendCallback) {
463402
ctx, span := a.tracer.Start(ctx, "Append")
464403
defer span.End()
465404

466-
seqs, offset, err := a.append(ctx, entries)
467-
if err != nil {
468-
if errors.Is(err, errors.ErrSegmentFull) {
469-
_ = a.waitCommit(ctx, offset)
470-
}
471-
return nil, err
472-
}
473-
474-
// Wait until entries is committed.
475-
err = a.waitCommit(ctx, offset)
476-
if err != nil {
477-
return nil, err
478-
}
479-
480-
return seqs, nil
481-
}
482-
483-
func (a *appender) append(ctx context.Context, entries []block.Entry) ([]int64, int64, error) {
484-
ctx, span := a.tracer.Start(ctx, "append")
485-
defer span.End()
486-
487405
span.AddEvent("Acquiring append lock")
488406
a.appendMu.Lock()
489407
span.AddEvent("Got append lock")
490408

491-
defer a.appendMu.Unlock()
492-
493409
if !a.isLeader() {
494-
return nil, 0, errors.ErrNotLeader
410+
a.appendMu.Unlock()
411+
cb(nil, errors.ErrNotLeader)
412+
return
495413
}
496414

497415
if a.actx.Archived() {
498-
return nil, a.actx.WriteOffset(), errors.ErrSegmentFull
416+
a.appendMu.Unlock()
417+
cb(nil, errors.ErrSegmentFull)
418+
return
499419
}
500420

501421
seqs, frag, enough, err := a.raw.PrepareAppend(ctx, a.actx, entries...)
502422
if err != nil {
503-
return nil, 0, err
423+
a.appendMu.Unlock()
424+
cb(nil, err)
425+
return
504426
}
505-
off := a.actx.WriteOffset()
506427

507428
data, _ := block.MarshalFragment(ctx, frag)
508-
if err = a.node.Propose(ctx, data); err != nil {
509-
return nil, 0, err
510-
}
511429

430+
var pds []raft.ProposeData
512431
if enough {
513-
if frag, err = a.raw.PrepareArchive(ctx, a.actx); err == nil {
514-
data, _ := block.MarshalFragment(ctx, frag)
515-
_ = a.node.Propose(ctx, data)
432+
if frag, err := a.raw.PrepareArchive(ctx, a.actx); err == nil {
433+
archivedData, _ := block.MarshalFragment(ctx, frag)
434+
pds = make([]raft.ProposeData, 2)
516435
// FIXME(james.yin): revert archived if propose failed.
436+
pds[1] = raft.ProposeData{
437+
Data: archivedData,
438+
}
439+
} else {
440+
pds = make([]raft.ProposeData, 1)
517441
}
442+
} else {
443+
pds = make([]raft.ProposeData, 1)
518444
}
519445

520-
return seqs, off, nil
446+
pds[0] = raft.ProposeData{
447+
Data: data,
448+
Callback: func(err error) {
449+
if err != nil {
450+
cb(nil, err)
451+
} else {
452+
cb(seqs, nil)
453+
}
454+
},
455+
}
456+
457+
a.node.Propose(ctx, pds...)
458+
459+
a.appendMu.Unlock()
521460
}
522461

523462
func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
@@ -527,57 +466,6 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
527466
_, _ = a.raw.CommitAppend(ctx, frags...)
528467
}
529468

530-
func (a *appender) waitCommit(ctx context.Context, offset int64) error {
531-
ctx, span := a.tracer.Start(ctx, "waitCommit")
532-
defer span.End()
533-
534-
span.AddEvent("Acquiring wait lock")
535-
a.waitMu.Lock()
536-
span.AddEvent("Got wait lock")
537-
538-
if offset <= a.commitOffset {
539-
a.waitMu.Unlock()
540-
return nil
541-
}
542-
543-
ch := make(chan struct{})
544-
a.waiters = append(a.waiters, commitWaiter{
545-
offset: offset,
546-
c: ch,
547-
})
548-
549-
a.waitMu.Unlock()
550-
551-
// FIXME(james.yin): lost leader
552-
select {
553-
case <-ch:
554-
return nil
555-
case <-ctx.Done():
556-
return ctx.Err()
557-
}
558-
}
559-
560-
func (a *appender) doWakeup(ctx context.Context, commit int64) {
561-
_, span := a.tracer.Start(ctx, "doWakeup")
562-
defer span.End()
563-
564-
span.AddEvent("Acquiring wait lock")
565-
a.waitMu.Lock()
566-
span.AddEvent("Got wait lock")
567-
568-
defer a.waitMu.Unlock()
569-
570-
for len(a.waiters) != 0 {
571-
waiter := a.waiters[0]
572-
if waiter.offset > commit {
573-
break
574-
}
575-
close(waiter.c)
576-
a.waiters = a.waiters[1:]
577-
}
578-
a.commitOffset = commit
579-
}
580-
581469
func (a *appender) Status() ClusterStatus {
582470
leader, term := a.leaderInfo()
583471
return ClusterStatus{

internal/store/segment/mock_replica.go

Lines changed: 4 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/store/segment/mock_server.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/store/segment/replica.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ func (r *replica) Read(ctx context.Context, seq int64, num int) ([]block.Entry,
8080
return r.raw.Read(ctx, seq, num)
8181
}
8282

83-
func (r *replica) Append(ctx context.Context, entries ...block.Entry) ([]int64, error) {
84-
return r.appender.Append(ctx, entries...)
83+
func (r *replica) Append(ctx context.Context, entries []block.Entry, cb block.AppendCallback) {
84+
r.appender.Append(ctx, entries, cb)
8585
}
8686

8787
func (r *replica) Status() *metapb.SegmentHealthInfo {

0 commit comments

Comments
 (0)