Skip to content

Commit 53722e2

Browse files
committed
feat: Optimize the routing logic for segment full
1 parent 8236094 commit 53722e2

File tree

27 files changed

+1537
-728
lines changed

27 files changed

+1537
-728
lines changed

Diff for: client/internal/vanus/eventlog/name_service.go

+24-20
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@ type NameService struct {
4747
tracer *tracing.Tracer
4848
}
4949

50-
func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64) (*record.Segment, error) {
50+
func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64) ([]*record.Segment, error) {
5151
ctx, span := ns.tracer.Start(ctx, "LookupWritableSegment")
5252
defer span.End()
5353

5454
req := &ctrlpb.GetAppendableSegmentRequest{
5555
EventLogId: logID,
56-
Limited: 1,
56+
Limited: 2,
5757
}
5858

5959
resp, err := ns.client.GetAppendableSegment(ctx, req)
@@ -65,26 +65,28 @@ func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64)
6565
if len(segments) == 0 {
6666
return nil, errors.ErrNotWritable
6767
}
68-
return segments[0], nil
68+
return segments, nil
6969
}
7070

7171
func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64) ([]*record.Segment, error) {
7272
ctx, span := ns.tracer.Start(ctx, "LookupReadableSegments")
7373
defer span.End()
7474

75-
req := &ctrlpb.ListSegmentRequest{
76-
EventLogId: logID,
77-
StartOffset: 0,
78-
EndOffset: math.MaxInt64,
79-
Limited: math.MaxInt32,
75+
req := &ctrlpb.GetReadableSegmentRequest{
76+
EventLogId: logID,
77+
Limited: math.MaxInt32,
8078
}
8179

82-
resp, err := ns.client.ListSegment(ctx, req)
80+
resp, err := ns.client.GetReadableSegment(ctx, req)
8381
if err != nil {
8482
return nil, err
8583
}
8684

8785
segments := toSegments(resp.GetSegments())
86+
if len(segments) == 0 {
87+
return nil, errors.ErrNotReadable
88+
}
89+
8890
return segments, nil
8991
}
9092

@@ -97,9 +99,9 @@ func toSegments(pbs []*metapb.Segment) []*record.Segment {
9799
segment := toSegment(pb)
98100
segments = append(segments, segment)
99101
// only return first working segment
100-
if segment.Writable {
101-
break
102-
}
102+
// if segment.Writable {
103+
// break
104+
// }
103105
}
104106
return segments
105107
}
@@ -113,13 +115,15 @@ func toSegment(segment *metapb.Segment) *record.Segment {
113115
}
114116
}
115117
return &record.Segment{
116-
ID: segment.GetId(),
117-
StartOffset: segment.GetStartOffsetInLog(),
118-
EndOffset: segment.GetEndOffsetInLog(),
119-
FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs),
120-
LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs),
121-
Writable: segment.State == "working", // TODO: writable
122-
Blocks: blocks,
123-
LeaderBlockID: segment.GetLeaderBlockId(),
118+
ID: segment.GetId(),
119+
PreviousSegmentId: segment.GetPreviousSegmentId(),
120+
NextSegmentId: segment.GetNextSegmentId(),
121+
StartOffset: segment.GetStartOffsetInLog(),
122+
EndOffset: segment.GetEndOffsetInLog(),
123+
FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs),
124+
LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs),
125+
Writable: segment.State == "working", // TODO: writable
126+
Blocks: blocks,
127+
LeaderBlockID: segment.GetLeaderBlockId(),
124128
}
125129
}

Diff for: client/pkg/api/client.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ type Eventbus interface {
3131
}
3232

3333
type BusWriter interface {
34-
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (eid string, err error)
35-
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (eid string, err error)
34+
AppendOne(ctx context.Context, event *ce.Event, opts ...WriteOption) (string, error)
35+
AppendMany(ctx context.Context, events []*ce.Event, opts ...WriteOption) (string, error)
3636
}
3737

3838
type BusReader interface {

Diff for: client/pkg/eventbus/eventbus.go

+21-29
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ package eventbus
1717
import (
1818
// standard libraries.
1919
"context"
20-
"encoding/base64"
21-
"encoding/binary"
2220
stderrors "errors"
2321
"io"
2422
"strings"
@@ -37,7 +35,7 @@ import (
3735
"github.com/linkall-labs/vanus/client/pkg/errors"
3836
"github.com/linkall-labs/vanus/client/pkg/eventlog"
3937
"github.com/linkall-labs/vanus/client/pkg/policy"
40-
vlog "github.com/linkall-labs/vanus/observability/log"
38+
"github.com/linkall-labs/vanus/observability/log"
4139

4240
eb "github.com/linkall-labs/vanus/client/internal/vanus/eventbus"
4341
el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog"
@@ -66,7 +64,7 @@ func NewEventbus(cfg *eb.Config) *eventbus {
6664
for {
6765
re, ok := <-ch
6866
if !ok {
69-
vlog.Debug(context.Background(), "eventbus quits writable watcher", map[string]interface{}{
67+
log.Debug(context.Background(), "eventbus quits writable watcher", map[string]interface{}{
7068
"eventbus": bus.cfg.Name,
7169
})
7270
break
@@ -88,7 +86,7 @@ func NewEventbus(cfg *eb.Config) *eventbus {
8886
for {
8987
re, ok := <-ch
9088
if !ok {
91-
vlog.Debug(context.Background(), "eventbus quits readable watcher", map[string]interface{}{
89+
log.Debug(context.Background(), "eventbus quits readable watcher", map[string]interface{}{
9290
"eventbus": bus.cfg.Name,
9391
})
9492
break
@@ -187,16 +185,16 @@ func (b *eventbus) GetLog(ctx context.Context, logID uint64, opts ...api.LogOpti
187185
if len(b.readableLogs) == 0 {
188186
b.refreshReadableLogs(ctx)
189187
}
190-
if log, ok := b.readableLogs[logID]; ok {
191-
return log, nil
188+
if l, ok := b.readableLogs[logID]; ok {
189+
return l, nil
192190
}
193191
return nil, errors.ErrNotFound
194192
} else if op.Policy.AccessMode() == api.ReadWrite {
195193
if len(b.writableLogs) == 0 {
196194
b.refreshWritableLogs(ctx)
197195
}
198-
if log, ok := b.writableLogs[logID]; ok {
199-
return log, nil
196+
if l, ok := b.writableLogs[logID]; ok {
197+
return l, nil
200198
}
201199
return nil, errors.ErrNotFound
202200
} else {
@@ -312,8 +310,8 @@ func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResul
312310
Endpoints: b.cfg.Endpoints,
313311
ID: logID,
314312
}
315-
log := eventlog.NewEventLog(cfg)
316-
lws[logID] = log
313+
l := eventlog.NewEventLog(cfg)
314+
lws[logID] = l
317315
return true
318316
})
319317
b.setWritableLogs(s, lws)
@@ -407,8 +405,8 @@ func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResul
407405
Endpoints: b.cfg.Endpoints,
408406
ID: logID,
409407
}
410-
log := eventlog.NewEventLog(cfg)
411-
lws[logID] = log
408+
l := eventlog.NewEventLog(cfg)
409+
lws[logID] = l
412410
return true
413411
})
414412
b.setReadableLogs(s, lws)
@@ -451,7 +449,7 @@ type busWriter struct {
451449

452450
var _ api.BusWriter = (*busWriter)(nil)
453451

454-
func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.WriteOption) (eid string, err error) {
452+
func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.WriteOption) (string, error) {
455453
_ctx, span := w.tracer.Start(ctx, "AppendOne")
456454
defer span.End()
457455

@@ -470,21 +468,15 @@ func (w *busWriter) AppendOne(ctx context.Context, event *ce.Event, opts ...api.
470468
}
471469

472470
// 2. append the event to the eventlog
473-
off, err := lw.Append(_ctx, event)
471+
eid, err := lw.Append(_ctx, event)
474472
if err != nil {
475473
return "", err
476474
}
477475

478-
// 3. generate event ID
479-
var buf [16]byte
480-
binary.BigEndian.PutUint64(buf[0:8], lw.Log().ID())
481-
binary.BigEndian.PutUint64(buf[8:16], uint64(off))
482-
encoded := base64.StdEncoding.EncodeToString(buf[:])
483-
484-
return encoded, nil
476+
return eid, nil
485477
}
486478

487-
func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (eid string, err error) {
479+
func (w *busWriter) AppendMany(ctx context.Context, events []*ce.Event, opts ...api.WriteOption) (string, error) {
488480
// TODO(jiangkai): implement this method, by jiangkai, 2022.10.24
489481
return "", nil
490482
}
@@ -497,17 +489,17 @@ func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions)
497489
_ctx, span := w.tracer.Start(ctx, "pickWritableLog")
498490
defer span.End()
499491

500-
log, err := opts.Policy.NextLog(ctx)
492+
l, err := opts.Policy.NextLog(ctx)
501493
if err != nil {
502494
return nil, err
503495
}
504496

505-
l := w.ebus.getWritableLog(_ctx, log.ID())
506-
if l == nil {
497+
lw := w.ebus.getWritableLog(_ctx, l.ID())
498+
if lw == nil {
507499
return nil, stderrors.New("can not pick writable log")
508500
}
509501

510-
return l.Writer(), nil
502+
return lw.Writer(), nil
511503
}
512504

513505
type busReader struct {
@@ -558,11 +550,11 @@ func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions)
558550
_ctx, span := r.tracer.Start(ctx, "pickReadableLog")
559551
defer span.End()
560552

561-
log, err := opts.Policy.NextLog(ctx)
553+
l, err := opts.Policy.NextLog(ctx)
562554
if err != nil {
563555
return nil, err
564556
}
565-
lr := r.ebus.getReadableLog(_ctx, log.ID())
557+
lr := r.ebus.getReadableLog(_ctx, l.ID())
566558
if lr == nil {
567559
return nil, stderrors.New("can not pick readable log")
568560
}

Diff for: client/pkg/eventlog/eventlog.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type LogWriter interface {
4949

5050
Close(ctx context.Context)
5151

52-
Append(ctx context.Context, event *ce.Event) (off int64, err error)
52+
Append(ctx context.Context, event *ce.Event) (eid string, err error)
5353
}
5454

5555
type LogReader interface {

0 commit comments

Comments
 (0)