From 41f84aeda225843e88b8d3259b849513d0ffb1ae Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 19 Sep 2024 18:40:08 +0900 Subject: [PATCH 1/4] fix: add option to disable dns resolve Signed-off-by: hlts2 --- apis/grpc/v1/payload/payload.pb.go | 1 + .../v1/rpc/errdetails/error_details.pb.go | 1 + internal/net/grpc/client.go | 54 ++++++++++++------- pkg/gateway/mirror/service/gateway.go | 13 ++++- pkg/gateway/mirror/service/mirror.go | 1 + 5 files changed, 49 insertions(+), 21 deletions(-) diff --git a/apis/grpc/v1/payload/payload.pb.go b/apis/grpc/v1/payload/payload.pb.go index bf541cb5cd..6ea1603db4 100644 --- a/apis/grpc/v1/payload/payload.pb.go +++ b/apis/grpc/v1/payload/payload.pb.go @@ -6960,6 +6960,7 @@ var ( (*anypb.Any)(nil), // 105: google.protobuf.Any } ) + var file_v1_payload_payload_proto_depIdxs = []int32{ 22, // 0: payload.v1.Search.Request.config:type_name -> payload.v1.Search.Config 16, // 1: payload.v1.Search.MultiRequest.requests:type_name -> payload.v1.Search.Request diff --git a/apis/grpc/v1/rpc/errdetails/error_details.pb.go b/apis/grpc/v1/rpc/errdetails/error_details.pb.go index 1ca92f9554..ff9e1c42cf 100644 --- a/apis/grpc/v1/rpc/errdetails/error_details.pb.go +++ b/apis/grpc/v1/rpc/errdetails/error_details.pb.go @@ -1108,6 +1108,7 @@ var ( (*durationpb.Duration)(nil), // 15: google.protobuf.Duration } ) + var file_v1_rpc_errdetails_error_details_proto_depIdxs = []int32{ 10, // 0: rpc.v1.ErrorInfo.metadata:type_name -> rpc.v1.ErrorInfo.MetadataEntry 15, // 1: rpc.v1.RetryInfo.retry_delay:type_name -> google.protobuf.Duration diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go index b6a4f3eceb..de6b1ab285 100644 --- a/internal/net/grpc/client.go +++ b/internal/net/grpc/client.go @@ -87,30 +87,32 @@ type Client interface { GetDialOption() []DialOption GetCallOption() []CallOption GetBackoff() backoff.Backoff + SetDisableResolveDNSAddr(addr string, disabled bool) ConnectedAddrs() []string Close(ctx context.Context) error } type gRPCClient struct { - addrs map[string]struct{} - poolSize uint64 - clientCount uint64 - conns sync.Map[string, pool.Conn] - hcDur time.Duration - prDur time.Duration - dialer net.Dialer - enablePoolRebalance bool - resolveDNS bool - dopts []DialOption - copts []CallOption - roccd string // reconnection old connection closing duration - eg errgroup.Group - bo backoff.Backoff - cb circuitbreaker.CircuitBreaker - gbo gbackoff.Config // grpc's original backoff configuration - mcd time.Duration // minimum connection timeout duration - group singleflight.Group[pool.Conn] - crl sync.Map[string, bool] // connection request list + addrs map[string]struct{} + poolSize uint64 + clientCount uint64 + conns sync.Map[string, pool.Conn] + hcDur time.Duration + prDur time.Duration + dialer net.Dialer + enablePoolRebalance bool + disableResolveDNSAddrs sync.Map[string, bool] + resolveDNS bool + dopts []DialOption + copts []CallOption + roccd string // reconnection old connection closing duration + eg errgroup.Group + bo backoff.Backoff + cb circuitbreaker.CircuitBreaker + gbo gbackoff.Config // grpc's original backoff configuration + mcd time.Duration // minimum connection timeout duration + group singleflight.Group[pool.Conn] + crl sync.Map[string, bool] // connection request list ech <-chan error monitorRunning atomic.Bool @@ -946,6 +948,12 @@ func (g *gRPCClient) GetBackoff() backoff.Backoff { return g.bo } +func (g *gRPCClient) SetDisableResolveDNSAddr(addr string, disabled bool) { + // NOTE: When connecting to multiple locations, it was necessary to switch dynamically, so implementation was added. + // There is no setting for disable on the helm chart side, so I used this implementation. + g.disableResolveDNSAddrs.Store(addr, disabled) +} + func (g *gRPCClient) Connect( ctx context.Context, addr string, dopts ...DialOption, ) (conn pool.Conn, err error) { @@ -975,7 +983,13 @@ func (g *gRPCClient) Connect( pool.WithAddr(addr), pool.WithSize(g.poolSize), pool.WithDialOptions(append(g.dopts, dopts...)...), - pool.WithResolveDNS(g.resolveDNS), + pool.WithResolveDNS(func() bool { + disabled, ok := g.disableResolveDNSAddrs.Load(addr) + if ok && disabled { + return false + } + return g.resolveDNS + }()), } if g.bo != nil { opts = append(opts, pool.WithBackoff(g.bo)) diff --git a/pkg/gateway/mirror/service/gateway.go b/pkg/gateway/mirror/service/gateway.go index 674f7b93aa..2e4ed97974 100644 --- a/pkg/gateway/mirror/service/gateway.go +++ b/pkg/gateway/mirror/service/gateway.go @@ -16,6 +16,7 @@ package service import ( "context" "reflect" + "strings" "github.com/vdaas/vald/internal/client/v1/client/mirror" "github.com/vdaas/vald/internal/errors" @@ -143,11 +144,21 @@ func (g *gateway) Do( if target == "" { return nil, errors.ErrTargetNotFound } - return g.client.GRPCClient().Do(g.ForwardedContext(ctx, g.podName), target, + fctx := g.ForwardedContext(ctx, g.podName) + res, err = g.client.GRPCClient().Do(fctx, target, func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (any, error) { return f(ictx, target, NewMirrorClient(conn), copts...) }, ) + if err != nil { + return g.client.GRPCClient().RoundRobin(fctx, func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (any, error) { + if strings.EqualFold(conn.Target(), target) { + return nil, errors.ErrTargetNotFound + } + return f(ictx, conn.Target(), NewMirrorClient(conn), copts...) + }) + } + return res, nil } // DoMulti performs a gRPC operation on multiple targets using the provided function. diff --git a/pkg/gateway/mirror/service/mirror.go b/pkg/gateway/mirror/service/mirror.go index 5a2e68c920..ce4309b972 100644 --- a/pkg/gateway/mirror/service/mirror.go +++ b/pkg/gateway/mirror/service/mirror.go @@ -319,6 +319,7 @@ func (m *mirr) Connect(ctx context.Context, targets ...*payload.Mirror_Target) e if !m.isSelfMirrorAddr(addr) && !m.isGatewayAddr(addr) { _, ok := m.addrs.Load(addr) if !ok || !m.IsConnected(ctx, addr) { + m.gateway.GRPCClient().SetDisableResolveDNSAddr(addr, true) _, err := m.gateway.GRPCClient().Connect(ctx, addr) if err != nil { m.addrs.Delete(addr) From 202cd0806afc2caec127e978336d13d2c0c780cd Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 19 Sep 2024 19:31:37 +0900 Subject: [PATCH 2/4] fix: add mock function Signed-off-by: hlts2 --- internal/test/mock/grpc/grpc_client_mock.go | 14 ++++++++++---- pkg/gateway/mirror/service/mirror_test.go | 6 ++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/internal/test/mock/grpc/grpc_client_mock.go b/internal/test/mock/grpc/grpc_client_mock.go index e29fc50c5d..a8c7d43f4d 100644 --- a/internal/test/mock/grpc/grpc_client_mock.go +++ b/internal/test/mock/grpc/grpc_client_mock.go @@ -30,10 +30,11 @@ type GRPCClientMock struct { addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error) error - ConnectFunc func(ctx context.Context, addr string, dopts ...grpc.DialOption) (pool.Conn, error) - DisconnectFunc func(ctx context.Context, addr string) error - IsConnectedFunc func(ctx context.Context, addr string) bool - ConnectedAddrsFunc func() []string + ConnectFunc func(ctx context.Context, addr string, dopts ...grpc.DialOption) (pool.Conn, error) + DisconnectFunc func(ctx context.Context, addr string) error + IsConnectedFunc func(ctx context.Context, addr string) bool + ConnectedAddrsFunc func() []string + SetDisableResolveDNSAddrFunc func(addr string, disabled bool) } // OrderedRangeConcurrent calls the OrderedRangeConcurrentFunc object. @@ -70,3 +71,8 @@ func (gc *GRPCClientMock) Disconnect(ctx context.Context, addr string) error { func (gc *GRPCClientMock) IsConnected(ctx context.Context, addr string) bool { return gc.IsConnectedFunc(ctx, addr) } + +// SetDisableResolveDNSAddr calls the SetDisableResolveDNSAddr object. +func (gc *GRPCClientMock) SetDisableResolveDNSAddr(addr string, disabled bool) { + gc.SetDisableResolveDNSAddrFunc(addr, disabled) +} diff --git a/pkg/gateway/mirror/service/mirror_test.go b/pkg/gateway/mirror/service/mirror_test.go index edf41c8191..b96211d887 100644 --- a/pkg/gateway/mirror/service/mirror_test.go +++ b/pkg/gateway/mirror/service/mirror_test.go @@ -87,6 +87,7 @@ func Test_mirr_Connect(t *testing.T) { ConnectFunc: func(_ context.Context, _ string, _ ...grpc.DialOption) (conn pool.Conn, err error) { return conn, err }, + SetDisableResolveDNSAddrFunc: func(addr string, disabled bool) {}, } }, }, @@ -118,6 +119,7 @@ func Test_mirr_Connect(t *testing.T) { ConnectFunc: func(_ context.Context, _ string, _ ...grpc.DialOption) (pool.Conn, error) { return nil, errors.New("missing port in address") }, + SetDisableResolveDNSAddrFunc: func(addr string, disabled bool) {}, } }, }, @@ -221,6 +223,7 @@ func Test_mirr_Disconnect(t *testing.T) { DisconnectFunc: func(_ context.Context, _ string) error { return nil }, + SetDisableResolveDNSAddrFunc: func(addr string, disabled bool) {}, } }, }, @@ -252,6 +255,7 @@ func Test_mirr_Disconnect(t *testing.T) { DisconnectFunc: func(_ context.Context, _ string) error { return errors.New("missing port in address") }, + SetDisableResolveDNSAddrFunc: func(addr string, disabled bool) {}, } }, }, @@ -373,6 +377,7 @@ func Test_mirr_MirrorTargets(t *testing.T) { IsConnectedFunc: func(_ context.Context, addr string) bool { return connected[addr] }, + SetDisableResolveDNSAddrFunc: func(addr string, disabled bool) {}, } }, }, @@ -498,6 +503,7 @@ func Test_mirr_connectedOtherMirrorAddrs(t *testing.T) { IsConnectedFunc: func(_ context.Context, addr string) bool { return connected[addr] }, + SetDisableResolveDNSAddrFunc: func(addr string, disabled bool) {}, } }, }, From c71cd5b69de8b481c4c1435a8fda3ecb08f6bb8d Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 20 Sep 2024 13:49:17 +0900 Subject: [PATCH 3/4] fix: unimplemented error Signed-off-by: hlts2 --- internal/test/mock/grpc_testify_mock.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/test/mock/grpc_testify_mock.go b/internal/test/mock/grpc_testify_mock.go index c853f27d4e..cca58973f4 100644 --- a/internal/test/mock/grpc_testify_mock.go +++ b/internal/test/mock/grpc_testify_mock.go @@ -213,3 +213,5 @@ func (c *ClientInternal) Close(ctx context.Context) error { args := c.Called(ctx) return args.Error(0) } + +func (c *ClientInternal) SetDisableResolveDNSAddr(addr string, distributed bool) {} From cee80eea6ca9699f6840e8672a1f3f5e29c4410d Mon Sep 17 00:00:00 2001 From: hlts2 Date: Fri, 20 Sep 2024 13:52:51 +0900 Subject: [PATCH 4/4] fix: change ForwardedContext method to private Signed-off-by: hlts2 --- pkg/gateway/mirror/service/gateway.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/gateway/mirror/service/gateway.go b/pkg/gateway/mirror/service/gateway.go index 2e4ed97974..51b4a7ad2a 100644 --- a/pkg/gateway/mirror/service/gateway.go +++ b/pkg/gateway/mirror/service/gateway.go @@ -33,7 +33,6 @@ const ( // Gateway represents an interface for interacting with gRPC clients. type Gateway interface { - ForwardedContext(ctx context.Context, podName string) context.Context FromForwardedContext(ctx context.Context) string BroadCast(ctx context.Context, f func(ctx context.Context, target string, vc MirrorClient, copts ...grpc.CallOption) error) error @@ -74,9 +73,9 @@ func (g *gateway) GRPCClient() grpc.Client { return g.client.GRPCClient() } -// ForwardedContext takes a context and a podName, returning a new context +// forwardedContext takes a context and a podName, returning a new context // with additional information related to forwarding. -func (*gateway) ForwardedContext(ctx context.Context, podName string) context.Context { +func (*gateway) forwardedContext(ctx context.Context, podName string) context.Context { return grpc.NewOutgoingContext(ctx, grpc.MD{ forwardedContextKey: []string{ podName, @@ -114,7 +113,7 @@ func (g *gateway) BroadCast( span.End() } }() - return g.client.GRPCClient().RangeConcurrent(g.ForwardedContext(ctx, g.podName), -1, func(ictx context.Context, + return g.client.GRPCClient().RangeConcurrent(g.forwardedContext(ctx, g.podName), -1, func(ictx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption, ) (err error) { select { @@ -144,7 +143,7 @@ func (g *gateway) Do( if target == "" { return nil, errors.ErrTargetNotFound } - fctx := g.ForwardedContext(ctx, g.podName) + fctx := g.forwardedContext(ctx, g.podName) res, err = g.client.GRPCClient().Do(fctx, target, func(ictx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (any, error) { return f(ictx, target, NewMirrorClient(conn), copts...) @@ -179,7 +178,7 @@ func (g *gateway) DoMulti( if len(targets) == 0 { return errors.ErrTargetNotFound } - return g.client.GRPCClient().OrderedRangeConcurrent(g.ForwardedContext(ctx, g.podName), targets, -1, + return g.client.GRPCClient().OrderedRangeConcurrent(g.forwardedContext(ctx, g.podName), targets, -1, func(ictx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) (err error) { select { case <-ictx.Done():