From 22ced010cdb04cb989f0eb1e4534acff468b5153 Mon Sep 17 00:00:00 2001 From: wei liu Date: Tue, 20 Aug 2024 10:30:55 +0800 Subject: [PATCH] enhance: make configure load param feature be compatible with old sdk (#35520) issue: #31570 #35521 --------- Signed-off-by: Wei Liu --- internal/querycoordv2/job/job_test.go | 14 +++++++------- internal/querycoordv2/services.go | 8 ++++++-- internal/querycoordv2/utils/meta.go | 19 ++++++++----------- .../testcases/test_resourcegroup.py | 6 +++--- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index ad07022f6fb5e..d509d0ac51c04 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -362,7 +362,7 @@ func (suite *JobSuite) TestLoadCollection() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorContains(err, meta.ErrNodeNotEnough.Error()) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) // Load with 3 replica on 3 rg req = &querypb.LoadCollectionRequest{ @@ -384,7 +384,7 @@ func (suite *JobSuite) TestLoadCollection() { ) suite.scheduler.Add(job) err = job.Wait() - suite.ErrorContains(err, meta.ErrNodeNotEnough.Error()) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } func (suite *JobSuite) TestLoadCollectionWithReplicas() { @@ -414,7 +414,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorContains(err, meta.ErrNodeNotEnough.Error()) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } } @@ -596,7 +596,7 @@ func (suite *JobSuite) TestLoadPartition() { ) suite.scheduler.Add(job) err := job.Wait() - suite.Contains(err.Error(), meta.ErrNodeNotEnough.Error()) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) // test load 3 replica in 3 rg, should pass rg check req = &querypb.LoadPartitionsRequest{ @@ -619,7 +619,7 @@ func (suite *JobSuite) TestLoadPartition() { ) suite.scheduler.Add(job) err = job.Wait() - suite.Contains(err.Error(), meta.ErrNodeNotEnough.Error()) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } func (suite *JobSuite) TestDynamicLoad() { @@ -766,7 +766,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorContains(err, meta.ErrNodeNotEnough.Error()) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } } @@ -1107,7 +1107,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorIs(err, meta.ErrNodeNotEnough) + suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 6b3f4c43d1539..ea132a3a910e2 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -216,7 +216,9 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection return merr.Status(err), nil } - if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 { + // to be compatible with old sdk, which set replica=1 if replica is not specified + // so only both replica and resource groups didn't set in request, it will turn to use the configured load info + if req.GetReplicaNumber() <= 0 && len(req.GetResourceGroups()) == 0 { // when replica number or resource groups is not set, use pre-defined load config rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID()) if err != nil { @@ -333,7 +335,9 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions return merr.Status(err), nil } - if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 { + // to be compatible with old sdk, which set replica=1 if replica is not specified + // so only both replica and resource groups didn't set in request, it will turn to use the configured load info + if req.GetReplicaNumber() <= 0 && len(req.GetResourceGroups()) == 0 { // when replica number or resource groups is not set, use database level config rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID()) if err != nil { diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index b6ac15839e0b2..4fbb5a663a396 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -17,6 +17,8 @@ package utils import ( + "strings" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -27,13 +29,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var ( - ErrGetNodesFromRG = errors.New("failed to get node from rg") - ErrNoReplicaFound = errors.New("no replica found during assign nodes") - ErrReplicasInconsistent = errors.New("all replicas should belong to same collection during assign nodes") - ErrUseWrongNumRG = errors.New("resource group num can only be 0, 1 or same as replica number") -) - func GetPartitions(collectionMgr *meta.CollectionManager, collectionID int64) ([]int64, error) { collection := collectionMgr.GetCollection(collectionID) if collection != nil { @@ -117,7 +112,8 @@ func RecoverAllCollection(m *meta.Meta) { func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) { if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) { - return nil, ErrUseWrongNumRG + return nil, errors.Errorf( + "replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ",")) } replicaNumInRG := make(map[string]int) @@ -140,15 +136,16 @@ func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int // 3. replica1 spawn finished, but cannot find related resource group. for rgName, num := range replicaNumInRG { if !m.ContainResourceGroup(rgName) { - return nil, ErrGetNodesFromRG + return nil, merr.WrapErrResourceGroupNotFound(rgName) } nodes, err := m.ResourceManager.GetNodes(rgName) if err != nil { return nil, err } if num > len(nodes) { - log.Warn("node not enough", zap.Error(meta.ErrNodeNotEnough), zap.Int("replicaNum", num), zap.Int("nodeNum", len(nodes)), zap.String("rgName", rgName)) - return nil, meta.ErrNodeNotEnough + err := merr.WrapErrResourceGroupNodeNotEnough(rgName, len(nodes), num) + log.Warn("failed to check resource group", zap.Error(err)) + return nil, err } } return replicaNumInRG, nil diff --git a/tests/python_client/testcases/test_resourcegroup.py b/tests/python_client/testcases/test_resourcegroup.py index d1e481cceae39..1cf9d5b892ad8 100644 --- a/tests/python_client/testcases/test_resourcegroup.py +++ b/tests/python_client/testcases/test_resourcegroup.py @@ -710,7 +710,7 @@ def test_load_collection_with_multi_replicas_multi_rgs(self, replicas): # load with different replicas error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'} + ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'} collection_w.load(replica_number=replicas, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res, check_items=error) @@ -877,7 +877,7 @@ def test_load_partition_with_multi_replicas_multi_rgs(self, replicas): # load with different replicas error = {ct.err_code: 999, - ct.err_msg: 'failed to load partitions, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'} + ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'} partition_w.load(replica_number=replicas, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res, check_items=error) @@ -1210,7 +1210,7 @@ def test_load_with_replicas_and_rgs_num(self, with_growing): # load 3 replicas in rgA and rgB replica_number = 3 error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'} + ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'} collection_w.load(replica_number=replica_number, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res,