From 2f7289b0c199d9f928b8058cafb82eb35945d926 Mon Sep 17 00:00:00 2001 From: ykadowak Date: Wed, 2 Aug 2023 06:44:31 +0000 Subject: [PATCH] Use map for accumulating errors --- pkg/agent/core/ngt/handler/grpc/object.go | 37 ++++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/pkg/agent/core/ngt/handler/grpc/object.go b/pkg/agent/core/ngt/handler/grpc/object.go index 86c4a9223e..b7057dffc3 100644 --- a/pkg/agent/core/ngt/handler/grpc/object.go +++ b/pkg/agent/core/ngt/handler/grpc/object.go @@ -28,7 +28,6 @@ import ( "github.com/vdaas/vald/internal/net/grpc/errdetails" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" - "github.com/vdaas/vald/internal/slices" ) func (s *server) Exists(ctx context.Context, uid *payload.Object_ID) (res *payload.Object_ID, err error) { @@ -211,8 +210,9 @@ func (s *server) StreamListObject(_ *payload.Object_List_Request, stream vald.Ob var ( mu sync.Mutex - emu sync.Mutex + emu sync.RWMutex errs = make([]error, 0, s.ngt.Len()) + emap = make(map[string]struct{}) ) s.ngt.ListObjectFunc(ctx, func(uuid string, _ uint32, _ int64) bool { vec, ts, err := s.ngt.GetObject(uuid) @@ -241,9 +241,15 @@ func (s *server) StreamListObject(_ *payload.Object_List_Request, stream vald.Ob mu.Unlock() if err != nil { - emu.Lock() - errs = append(errs, err) - emu.Unlock() + emu.RLock() + _, ok := emap[err.Error()] + emu.RUnlock() + if !ok { + emu.Lock() + errs = append(errs, err) + emap[err.Error()] = struct{}{} + emu.Unlock() + } } // always return true to continue streaming and let the context cancel the Range when stream closes. @@ -251,17 +257,18 @@ func (s *server) StreamListObject(_ *payload.Object_List_Request, stream vald.Ob }) if len(errs) != 0 { - slices.RemoveDuplicates(errs, func(left, right error) bool { - return left.Error() < right.Error() - }) - err = errors.Join(errs...) - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamListObject final gRPC error response") - if span != nil { - span.RecordError(err) - span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) - span.SetStatus(trace.StatusError, msg) + // Register all the gRPC codes to the span. Doing this because ParseError cannot parse joined error. + for _, e := range errs { + st, msg, err := status.ParseError(e, codes.Internal, "failed to parse StreamListObject final gRPC error response") + if span != nil { + span.RecordError(err) + span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) + span.SetStatus(trace.StatusError, msg) + } } - return err + + // now join all the errors to return + return errors.Join(errs...) } return nil }