Skip to content

Commit

Permalink
enhance: Move proxy client manager to util package (milvus-io#28955)
Browse files Browse the repository at this point in the history
issue:  milvus-io#28898

This PR move the `ProxyClientManager` to util package, in case of
reusing it's implementation in querycoord

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 20, 2023
1 parent 1b5ac82 commit e41fd6f
Show file tree
Hide file tree
Showing 17 changed files with 1,396 additions and 458 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,10 @@ generate-mockery-utils: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=internal/tso --output=internal/tso/mocks --filename=allocator.go --with-expecter --structname=Allocator --outpkg=mocktso
$(INSTALL_PATH)/mockery --name=SessionInterface --dir=$(PWD)/internal/util/sessionutil --output=$(PWD)/internal/util/sessionutil --filename=mock_session.go --with-expecter --structname=MockSession --inpackage
$(INSTALL_PATH)/mockery --name=GrpcClient --dir=$(PWD)/internal/util/grpcclient --output=$(PWD)/internal/mocks --filename=mock_grpc_client.go --with-expecter --structname=MockGrpcClient

# proxy_client_manager.go
$(INSTALL_PATH)/mockery --name=ProxyClientManagerInterface --dir=$(PWD)/internal/util/proxyutil --output=$(PWD)/internal/util/proxyutil --filename=mock_proxy_client_manager.go --with-expecter --structname=MockProxyClientManager --inpackage
$(INSTALL_PATH)/mockery --name=ProxyWatcherInterface --dir=$(PWD)/internal/util/proxyutil --output=$(PWD)/internal/util/proxyutil --filename=mock_proxy_watcher.go --with-expecter --structname=MockProxyWatcher --inpackage

generate-mockery-kv: getdeps
$(INSTALL_PATH)/mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=txn_kv.go --with-expecter
$(INSTALL_PATH)/mockery --name=MetaKv --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=meta_kv.go --with-expecter
Expand Down
3 changes: 2 additions & 1 deletion internal/rootcoord/drop_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -79,7 +80,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
collectionNames: append(aliases, collMeta.Name),
collectionID: collMeta.CollectionID,
ts: ts,
opts: []expireCacheOpt{expireCacheWithDropFlag()},
opts: []proxyutil.ExpireCacheOpt{proxyutil.ExpireCacheWithDropFlag()},
})
redoTask.AddSyncStep(&changeCollectionStateStep{
baseStep: baseStep{core: t.core},
Expand Down
30 changes: 2 additions & 28 deletions internal/rootcoord/expire_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,14 @@ package rootcoord
import (
"context"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type expireCacheConfig struct {
withDropFlag bool
}

func (c expireCacheConfig) apply(req *proxypb.InvalidateCollMetaCacheRequest) {
if !c.withDropFlag {
return
}
if req.GetBase() == nil {
req.Base = commonpbutil.NewMsgBase()
}
req.Base.MsgType = commonpb.MsgType_DropCollection
}

func defaultExpireCacheConfig() expireCacheConfig {
return expireCacheConfig{withDropFlag: false}
}

type expireCacheOpt func(c *expireCacheConfig)

func expireCacheWithDropFlag() expireCacheOpt {
return func(c *expireCacheConfig) {
c.withDropFlag = true
}
}

// ExpireMetaCache will call invalidate collection meta cache
func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []string, collectionID UniqueID, ts typeutil.Timestamp, opts ...expireCacheOpt) error {
func (c *Core) ExpireMetaCache(ctx context.Context, dbName string, collNames []string, collectionID UniqueID, ts typeutil.Timestamp, opts ...proxyutil.ExpireCacheOpt) error {
// if collectionID is specified, invalidate all the collection meta cache with the specified collectionID and return
if collectionID != InvalidCollectionID {
req := proxypb.InvalidateCollMetaCacheRequest{
Expand Down
9 changes: 5 additions & 4 deletions internal/rootcoord/expire_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
)

func Test_expireCacheConfig_apply(t *testing.T) {
c := defaultExpireCacheConfig()
c := proxyutil.DefaultExpireCacheConfig()
req := &proxypb.InvalidateCollMetaCacheRequest{}
c.apply(req)
c.Apply(req)
assert.Nil(t, req.GetBase())
opt := expireCacheWithDropFlag()
opt := proxyutil.ExpireCacheWithDropFlag()
opt(&c)
c.apply(req)
c.Apply(req)
assert.Equal(t, commonpb.MsgType_DropCollection, req.GetBase().GetMsgType())
}
15 changes: 7 additions & 8 deletions internal/rootcoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
Expand Down Expand Up @@ -385,9 +386,7 @@ func newTestCore(opts ...Opt) *Core {

func withValidProxyManager() Opt {
return func(c *Core) {
c.proxyClientManager = &proxyClientManager{
proxyClient: typeutil.NewConcurrentMap[int64, types.ProxyClient](),
}
c.proxyClientManager = proxyutil.NewProxyClientManager(proxyutil.DefaultProxyCreator)
p := newMockProxy()
p.InvalidateCollectionMetaCacheFunc = func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return merr.Success(), nil
Expand All @@ -398,15 +397,14 @@ func withValidProxyManager() Opt {
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil
}
c.proxyClientManager.proxyClient.Insert(TestProxyID, p)
clients := c.proxyClientManager.GetProxyClients()
clients.Insert(TestProxyID, p)
}
}

func withInvalidProxyManager() Opt {
return func(c *Core) {
c.proxyClientManager = &proxyClientManager{
proxyClient: typeutil.NewConcurrentMap[int64, types.ProxyClient](),
}
c.proxyClientManager = proxyutil.NewProxyClientManager(proxyutil.DefaultProxyCreator)
p := newMockProxy()
p.InvalidateCollectionMetaCacheFunc = func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return merr.Success(), errors.New("error mock InvalidateCollectionMetaCache")
Expand All @@ -417,7 +415,8 @@ func withInvalidProxyManager() Opt {
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil
}
c.proxyClientManager.proxyClient.Insert(TestProxyID, p)
clients := c.proxyClientManager.GetProxyClients()
clients.Insert(TestProxyID, p)
}
}

Expand Down
Loading

0 comments on commit e41fd6f

Please sign in to comment.