Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vanus-labs/vanus/observability/log"
"github.com/vanus-labs/vanus/observability/tracing"
"github.com/vanus-labs/vanus/pkg/cluster"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials/insecure"

// this project.
Expand Down Expand Up @@ -109,6 +110,7 @@ func Connect(endpoints []string) Client {
}
return &client{
Endpoints: endpoints,
tracer: tracing.NewTracer("client.client", trace.SpanKindClient),
}
}

Expand Down
7 changes: 1 addition & 6 deletions client/internal/vanus/eventbus/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package eventbus
import (
// standard libraries.
"context"

// third-party libraries.
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -46,9 +47,6 @@ type NameService struct {
}

func (ns *NameService) LookupWritableLogs(ctx context.Context, eventbusID uint64) ([]*record.Eventlog, error) {
ctx, span := ns.tracer.Start(ctx, "LookupWritableLogs")
defer span.End()

req := &wrapperspb.UInt64Value{
Value: eventbusID,
}
Expand All @@ -62,9 +60,6 @@ func (ns *NameService) LookupWritableLogs(ctx context.Context, eventbusID uint64
}

func (ns *NameService) LookupReadableLogs(ctx context.Context, eventbusID uint64) ([]*record.Eventlog, error) {
ctx, span := ns.tracer.Start(ctx, "LookupReadableLogs")
defer span.End()

req := &wrapperspb.UInt64Value{
Value: eventbusID,
}
Expand Down
6 changes: 0 additions & 6 deletions client/internal/vanus/eventlog/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ type NameService struct {
}

func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64) (*record.Segment, error) {
ctx, span := ns.tracer.Start(ctx, "LookupWritableSegment")
defer span.End()

req := &ctrlpb.GetAppendableSegmentRequest{
EventlogId: logID,
Limited: 1,
Expand All @@ -74,9 +71,6 @@ func (ns *NameService) LookupWritableSegment(ctx context.Context, logID uint64)
}

func (ns *NameService) LookupReadableSegments(ctx context.Context, logID uint64) ([]*record.Segment, error) {
ctx, span := ns.tracer.Start(ctx, "LookupReadableSegments")
defer span.End()

req := &ctrlpb.ListSegmentRequest{
EventlogId: logID,
StartOffset: 0,
Expand Down
4 changes: 1 addition & 3 deletions client/internal/vanus/net/rpc/bare/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,11 @@ func (c *client) Get(ctx context.Context) (interface{}, error) {
if c.closed.Load() {
return nil, errors.ErrClosed
}
_ctx, span := c.tracer.Start(ctx, "Get")
defer span.End()

if client := c.cachedClient(); client != nil {
return client, nil
}
return c.refreshClient(_ctx, false)
return c.refreshClient(ctx, false)
}

func (c *client) cachedClient() interface{} {
Expand Down
13 changes: 2 additions & 11 deletions client/internal/vanus/store/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ func (s *BlockStore) Close() {
func (s *BlockStore) Read(
ctx context.Context, block uint64, offset int64, size int16, pollingTimeout uint32,
) (*cloudevents.CloudEventBatch, error) {
ctx, span := s.tracer.Start(ctx, "Read")
defer span.End()

req := &segpb.ReadFromBlockRequest{
BlockId: block,
Offset: offset,
Expand All @@ -90,9 +87,6 @@ func (s *BlockStore) Read(
}

func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) {
ctx, span := s.tracer.Start(ctx, "LookupOffset")
defer span.End()

req := &segpb.LookupOffsetInBlockRequest{
BlockId: blockID,
Stime: t.UnixMilli(),
Expand All @@ -111,20 +105,17 @@ func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Ti
}

func (s *BlockStore) Append(ctx context.Context, block uint64, events *cloudevents.CloudEventBatch) ([]int64, error) {
_ctx, span := s.tracer.Start(ctx, "Append")
defer span.End()

req := &segpb.AppendToBlockRequest{
BlockId: block,
Events: events,
}

client, err := s.client.Get(_ctx)
client, err := s.client.Get(ctx)
if err != nil {
return nil, err
}

res, err := client.(segpb.SegmentServerClient).AppendToBlock(_ctx, req)
res, err := client.(segpb.SegmentServerClient).AppendToBlock(ctx, req)
if err != nil {
return nil, err
}
Expand Down
36 changes: 6 additions & 30 deletions client/pkg/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,10 @@ func NewEventbus(cfg *eb.Config) *eventbus {
break
}

ctx, span := bus.tracer.Start(context.Background(), "updateWritableLogsTask")
if bus.writableWatcher != nil {
bus.updateWritableLogs(ctx, re)
bus.updateWritableLogs(context.Background(), re)
}

bus.writableWatcher.Wakeup()
span.End()
}
}()
bus.writableWatcher.Start()
Expand All @@ -88,13 +85,10 @@ func NewEventbus(cfg *eb.Config) *eventbus {
break
}

ctx, span := bus.tracer.Start(context.Background(), "updateReadableLogsTask")
if bus.readableWatcher != nil {
bus.updateReadableLogs(ctx, re)
bus.updateReadableLogs(context.Background(), re)
}

bus.readableWatcher.Wakeup()
span.End()
}
}()
bus.readableWatcher.Start()
Expand Down Expand Up @@ -272,9 +266,6 @@ func (b *eventbus) isNeedUpdateWritableLogs(err error) bool {
}

func (b *eventbus) updateWritableLogs(ctx context.Context, re *WritableLogsResult) {
_, span := b.tracer.Start(ctx, "updateWritableLogs")
defer span.End()

if !b.isNeedUpdateWritableLogs(re.Err) {
return
}
Expand Down Expand Up @@ -333,10 +324,7 @@ func (b *eventbus) getWritableLog(ctx context.Context, logID uint64) eventlog.Ev
}

func (b *eventbus) refreshWritableLogs(ctx context.Context) {
_ctx, span := b.tracer.Start(ctx, "refreshWritableLogs")
defer span.End()

_ = b.writableWatcher.Refresh(_ctx)
_ = b.writableWatcher.Refresh(ctx)
}

func (b *eventbus) getReadableState() error {
Expand Down Expand Up @@ -364,9 +352,6 @@ func (b *eventbus) isNeedUpdateReadableLogs(err error) bool {
}

func (b *eventbus) updateReadableLogs(ctx context.Context, re *ReadableLogsResult) {
_, span := b.tracer.Start(ctx, "updateReadableLogs")
defer span.End()

if !b.isNeedUpdateReadableLogs(re.Err) {
return
}
Expand Down Expand Up @@ -425,10 +410,7 @@ func (b *eventbus) getReadableLog(ctx context.Context, logID uint64) eventlog.Ev
}

func (b *eventbus) refreshReadableLogs(ctx context.Context) {
_ctx, span := b.tracer.Start(ctx, "refreshReadableLogs")
defer span.End()

_ = b.readableWatcher.Refresh(_ctx)
_ = b.readableWatcher.Refresh(ctx)
}

type busWriter struct {
Expand Down Expand Up @@ -479,15 +461,12 @@ func (w *busWriter) Bus() api.Eventbus {
}

func (w *busWriter) pickWritableLog(ctx context.Context, opts *api.WriteOptions) (eventlog.LogWriter, error) {
_ctx, span := w.tracer.Start(ctx, "pickWritableLog")
defer span.End()

l, err := opts.Policy.NextLog(ctx)
if err != nil {
return nil, err
}

lw := w.ebus.getWritableLog(_ctx, l.ID())
lw := w.ebus.getWritableLog(ctx, l.ID())
if lw == nil {
return nil, stderrors.New("can not pick writable log")
}
Expand Down Expand Up @@ -554,14 +533,11 @@ func (r *busReader) Bus() api.Eventbus {
}

func (r *busReader) pickReadableLog(ctx context.Context, opts *api.ReadOptions) (eventlog.LogReader, error) {
_ctx, span := r.tracer.Start(ctx, "pickReadableLog")
defer span.End()

l, err := opts.Policy.NextLog(ctx)
if err != nil {
return nil, err
}
lr := r.ebus.getReadableLog(_ctx, l.ID())
lr := r.ebus.getReadableLog(ctx, l.ID())
if lr == nil {
return nil, stderrors.New("can not pick readable log")
}
Expand Down
11 changes: 3 additions & 8 deletions client/pkg/eventlog/eventlog_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,10 @@ func NewEventlog(cfg *el.Config) Eventlog {
break
}

ctx, span := l.tracer.Start(context.Background(), "updateReadableSegmentsTask")
if r != nil {
l.updateWritableSegment(ctx, r)
l.updateWritableSegment(context.Background(), r)
}

l.writableWatcher.Wakeup()
span.End()
}
}()
l.writableWatcher.Start()
Expand All @@ -84,13 +81,11 @@ func NewEventlog(cfg *el.Config) Eventlog {
Msg("eventlog quits readable watcher")
break
}
ctx, span := l.tracer.Start(context.Background(), "updateReadableSegmentsTask")

if rs != nil {
l.updateReadableSegments(ctx, rs)
l.updateReadableSegments(context.Background(), rs)
}

l.readableWatcher.Wakeup()
span.End()
}
}()
l.readableWatcher.Start()
Expand Down
11 changes: 1 addition & 10 deletions client/pkg/eventlog/log_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ func (s *segment) Update(ctx context.Context, r *record.Segment, towrite bool) e
return nil
}

_, span := s.tracer.Start(ctx, "Update")
defer span.End()

switchBlock := func() bool {
if towrite {
if s.prefer.id != r.LeaderBlockID {
Expand All @@ -155,14 +152,11 @@ func (s *segment) Update(ctx context.Context, r *record.Segment, towrite bool) e
}

func (s *segment) Append(ctx context.Context, event *cloudevents.CloudEventBatch) ([]int64, error) {
_ctx, span := s.tracer.Start(ctx, "Append")
defer span.End()

b := s.preferSegmentBlock()
if b == nil {
return nil, errors.ErrNotLeader
}
offs, err := b.Append(_ctx, event)
offs, err := b.Append(ctx, event)
if err != nil {
return nil, err
}
Expand All @@ -176,9 +170,6 @@ func (s *segment) Read(ctx context.Context, from int64, size int16, pollingTimeo
if from < s.startOffset {
return nil, errors.ErrOffsetUnderflow
}
ctx, span := s.tracer.Start(ctx, "Read")
defer span.End()

if eo := s.endOffset.Load(); eo >= 0 {
if from > eo {
return nil, errors.ErrOffsetOverflow
Expand Down
Loading