Skip to content

Commit

Permalink
Use map for accumulating errors
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Aug 2, 2023
1 parent 467f55f commit 2f7289b
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions pkg/agent/core/ngt/handler/grpc/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/vdaas/vald/internal/net/grpc/errdetails"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/slices"
)

func (s *server) Exists(ctx context.Context, uid *payload.Object_ID) (res *payload.Object_ID, err error) {
Expand Down Expand Up @@ -211,8 +210,9 @@ func (s *server) StreamListObject(_ *payload.Object_List_Request, stream vald.Ob

var (
mu sync.Mutex
emu sync.Mutex
emu sync.RWMutex
errs = make([]error, 0, s.ngt.Len())
emap = make(map[string]struct{})
)
s.ngt.ListObjectFunc(ctx, func(uuid string, _ uint32, _ int64) bool {
vec, ts, err := s.ngt.GetObject(uuid)
Expand Down Expand Up @@ -241,27 +241,34 @@ func (s *server) StreamListObject(_ *payload.Object_List_Request, stream vald.Ob
mu.Unlock()

if err != nil {
emu.Lock()
errs = append(errs, err)
emu.Unlock()
emu.RLock()
_, ok := emap[err.Error()]
emu.RUnlock()
if !ok {
emu.Lock()
errs = append(errs, err)
emap[err.Error()] = struct{}{}
emu.Unlock()
}
}

// always return true to continue streaming and let the context cancel the Range when stream closes.
return true
})

if len(errs) != 0 {
slices.RemoveDuplicates(errs, func(left, right error) bool {
return left.Error() < right.Error()
})
err = errors.Join(errs...)
st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamListObject final gRPC error response")
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, msg)
// Register all the gRPC codes to the span. Doing this because ParseError cannot parse joined error.
for _, e := range errs {
st, msg, err := status.ParseError(e, codes.Internal, "failed to parse StreamListObject final gRPC error response")
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)
span.SetStatus(trace.StatusError, msg)
}

Check warning on line 267 in pkg/agent/core/ngt/handler/grpc/object.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/object.go#L264-L267

Added lines #L264 - L267 were not covered by tests
}
return err

// now join all the errors to return
return errors.Join(errs...)
}
return nil
}

0 comments on commit 2f7289b

Please sign in to comment.