Skip to content

Commit

Permalink
enhance: make configure load param feature be compatible with old sdk (
Browse files Browse the repository at this point in the history
…milvus-io#35520)

issue: milvus-io#31570 milvus-io#35521

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Aug 20, 2024
1 parent 6e29d71 commit 22ced01
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 23 deletions.
14 changes: 7 additions & 7 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (suite *JobSuite) TestLoadCollection() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)

// Load with 3 replica on 3 rg
req = &querypb.LoadCollectionRequest{
Expand All @@ -384,7 +384,7 @@ func (suite *JobSuite) TestLoadCollection() {
)
suite.scheduler.Add(job)
err = job.Wait()
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
}

func (suite *JobSuite) TestLoadCollectionWithReplicas() {
Expand Down Expand Up @@ -414,7 +414,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
}
}

Expand Down Expand Up @@ -596,7 +596,7 @@ func (suite *JobSuite) TestLoadPartition() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.Contains(err.Error(), meta.ErrNodeNotEnough.Error())
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)

// test load 3 replica in 3 rg, should pass rg check
req = &querypb.LoadPartitionsRequest{
Expand All @@ -619,7 +619,7 @@ func (suite *JobSuite) TestLoadPartition() {
)
suite.scheduler.Add(job)
err = job.Wait()
suite.Contains(err.Error(), meta.ErrNodeNotEnough.Error())
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
}

func (suite *JobSuite) TestDynamicLoad() {
Expand Down Expand Up @@ -766,7 +766,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorContains(err, meta.ErrNodeNotEnough.Error())
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
}
}

Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
)
suite.scheduler.Add(job)
err := job.Wait()
suite.ErrorIs(err, meta.ErrNodeNotEnough)
suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough)
}
}

Expand Down
8 changes: 6 additions & 2 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
return merr.Status(err), nil
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// to be compatible with old sdk, which set replica=1 if replica is not specified
// so only both replica and resource groups didn't set in request, it will turn to use the configured load info
if req.GetReplicaNumber() <= 0 && len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use pre-defined load config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
Expand Down Expand Up @@ -333,7 +335,9 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
return merr.Status(err), nil
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// to be compatible with old sdk, which set replica=1 if replica is not specified
// so only both replica and resource groups didn't set in request, it will turn to use the configured load info
if req.GetReplicaNumber() <= 0 && len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
Expand Down
19 changes: 8 additions & 11 deletions internal/querycoordv2/utils/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package utils

import (
"strings"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
Expand All @@ -27,13 +29,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var (
ErrGetNodesFromRG = errors.New("failed to get node from rg")
ErrNoReplicaFound = errors.New("no replica found during assign nodes")
ErrReplicasInconsistent = errors.New("all replicas should belong to same collection during assign nodes")
ErrUseWrongNumRG = errors.New("resource group num can only be 0, 1 or same as replica number")
)

func GetPartitions(collectionMgr *meta.CollectionManager, collectionID int64) ([]int64, error) {
collection := collectionMgr.GetCollection(collectionID)
if collection != nil {
Expand Down Expand Up @@ -117,7 +112,8 @@ func RecoverAllCollection(m *meta.Meta) {

func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int32) (map[string]int, error) {
if len(resourceGroups) != 0 && len(resourceGroups) != 1 && len(resourceGroups) != int(replicaNumber) {
return nil, ErrUseWrongNumRG
return nil, errors.Errorf(
"replica=[%d] resource group=[%s], resource group num can only be 0, 1 or same as replica number", replicaNumber, strings.Join(resourceGroups, ","))
}

replicaNumInRG := make(map[string]int)
Expand All @@ -140,15 +136,16 @@ func checkResourceGroup(m *meta.Meta, resourceGroups []string, replicaNumber int
// 3. replica1 spawn finished, but cannot find related resource group.
for rgName, num := range replicaNumInRG {
if !m.ContainResourceGroup(rgName) {
return nil, ErrGetNodesFromRG
return nil, merr.WrapErrResourceGroupNotFound(rgName)
}
nodes, err := m.ResourceManager.GetNodes(rgName)
if err != nil {
return nil, err
}
if num > len(nodes) {
log.Warn("node not enough", zap.Error(meta.ErrNodeNotEnough), zap.Int("replicaNum", num), zap.Int("nodeNum", len(nodes)), zap.String("rgName", rgName))
return nil, meta.ErrNodeNotEnough
err := merr.WrapErrResourceGroupNodeNotEnough(rgName, len(nodes), num)
log.Warn("failed to check resource group", zap.Error(err))
return nil, err
}
}
return replicaNumInRG, nil
Expand Down
6 changes: 3 additions & 3 deletions tests/python_client/testcases/test_resourcegroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ def test_load_collection_with_multi_replicas_multi_rgs(self, replicas):

# load with different replicas
error = {ct.err_code: 999,
ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'}
collection_w.load(replica_number=replicas,
_resource_groups=[rgA_name, rgB_name],
check_task=CheckTasks.err_res, check_items=error)
Expand Down Expand Up @@ -877,7 +877,7 @@ def test_load_partition_with_multi_replicas_multi_rgs(self, replicas):

# load with different replicas
error = {ct.err_code: 999,
ct.err_msg: 'failed to load partitions, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'}
partition_w.load(replica_number=replicas,
_resource_groups=[rgA_name, rgB_name],
check_task=CheckTasks.err_res, check_items=error)
Expand Down Expand Up @@ -1210,7 +1210,7 @@ def test_load_with_replicas_and_rgs_num(self, with_growing):
# load 3 replicas in rgA and rgB
replica_number = 3
error = {ct.err_code: 999,
ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'}
ct.err_msg: 'resource group num can only be 0, 1 or same as replica number'}
collection_w.load(replica_number=replica_number,
_resource_groups=[rgA_name, rgB_name],
check_task=CheckTasks.err_res,
Expand Down

0 comments on commit 22ced01

Please sign in to comment.