diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 0f83cddd485..75a6140553d 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -15,7 +15,6 @@ package apis import ( - "errors" "fmt" "net/http" "reflect" @@ -28,6 +27,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/tikv/pd/pkg/errs" rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" @@ -175,9 +175,15 @@ func (s *Service) getResourceGroup(c *gin.Context) { return } keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue) - group := s.manager.GetResourceGroup(keyspaceID, c.Param("name"), withStats) + groupName := c.Param("name") + group, err := s.manager.GetResourceGroup(keyspaceID, groupName, withStats) + if err != nil { + c.String(http.StatusNotFound, err.Error()) + return + } if group == nil { - c.String(http.StatusNotFound, errors.New("resource group not found").Error()) + c.String(http.StatusNotFound, errs.ErrResourceGroupNotExists.FastGenByArgs(groupName).Error()) + return } c.IndentedJSON(http.StatusOK, group) } @@ -200,7 +206,11 @@ func (s *Service) getResourceGroupList(c *gin.Context) { return } keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue) - groups := s.manager.GetResourceGroupList(keyspaceID, withStats) + groups, err := s.manager.GetResourceGroupList(keyspaceID, withStats) + if err != nil { + c.String(http.StatusNotFound, err.Error()) + return + } c.IndentedJSON(http.StatusOK, groups) } @@ -223,6 +233,7 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue) if err := s.manager.DeleteResourceGroup(keyspaceID, c.Param("name")); err != nil { c.String(http.StatusNotFound, err.Error()) + return } c.String(http.StatusOK, "Success!") } diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index eea708f882f..318007dd985 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -93,9 +93,12 @@ func (s *Service) GetResourceGroup(_ context.Context, req *rmpb.GetResourceGroup if err := s.checkServing(); err != nil { return nil, err } - rg := s.manager.GetResourceGroup(ExtractKeyspaceID(req.GetKeyspaceId()), req.ResourceGroupName, req.WithRuStats) + rg, err := s.manager.GetResourceGroup(ExtractKeyspaceID(req.GetKeyspaceId()), req.ResourceGroupName, req.WithRuStats) + if err != nil { + return nil, err + } if rg == nil { - return nil, errors.New("resource group not found") + return nil, errs.ErrResourceGroupNotExists.FastGenByArgs(req.ResourceGroupName) } return &rmpb.GetResourceGroupResponse{ Group: rg.IntoProtoResourceGroup(), @@ -107,7 +110,10 @@ func (s *Service) ListResourceGroups(_ context.Context, req *rmpb.ListResourceGr if err := s.checkServing(); err != nil { return nil, err } - groups := s.manager.GetResourceGroupList(ExtractKeyspaceID(req.GetKeyspaceId()), req.WithRuStats) + groups, err := s.manager.GetResourceGroupList(ExtractKeyspaceID(req.GetKeyspaceId()), req.WithRuStats) + if err != nil { + return nil, err + } resp := &rmpb.ListResourceGroupsResponse{ Groups: make([]*rmpb.ResourceGroup, 0, len(groups)), } @@ -192,9 +198,9 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu continue } // Get the resource group from manager to acquire token buckets. - rg := s.manager.GetMutableResourceGroup(keyspaceID, resourceGroupName) + rg, err := s.manager.GetMutableResourceGroup(keyspaceID, resourceGroupName) if rg == nil { - log.Warn("resource group not found", logFields...) + log.Warn("resource group not found", append(logFields, zap.Error(err))...) continue } // Send the consumption to update the metrics. diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index a53b3d0a03a..daf8d0c81be 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -157,6 +157,21 @@ func (m *Manager) getKeyspaceResourceGroupManager(keyspaceID uint32) *keyspaceRe return m.krgms[keyspaceID] } +func (m *Manager) accessKeyspaceResourceGroupManager(keyspaceID uint32, groupName string) (*keyspaceResourceGroupManager, error) { + var krgm *keyspaceResourceGroupManager + if groupName == DefaultResourceGroupName { + // For the default resource group, if the keyspace manager doesn't exist yet + // and the group name is the default resource group name, we try to get or create it. + krgm = m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true) + } else { + krgm = m.getKeyspaceResourceGroupManager(keyspaceID) + } + if krgm == nil { + return nil, errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID) + } + return krgm, nil +} + // Init initializes the resource group manager. func (m *Manager) Init(ctx context.Context) error { v, err := m.storage.LoadControllerConfig() @@ -296,15 +311,16 @@ func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error { // ModifyResourceGroup modifies an existing resource group. func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error { keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId()) - krgm := m.getKeyspaceResourceGroupManager(keyspaceID) - if krgm == nil { - return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID) + krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, grouppb.Name) + if err != nil { + return err } return krgm.modifyResourceGroup(grouppb) } // DeleteResourceGroup deletes a resource group. func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error { + // "default" group can't be deleted, so there is not need to call accessKeyspaceResourceGroupManager krgm := m.getKeyspaceResourceGroupManager(keyspaceID) if krgm == nil { return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID) @@ -313,38 +329,38 @@ func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error { } // GetResourceGroup returns a copy of a resource group. -func (m *Manager) GetResourceGroup(keyspaceID uint32, name string, withStats bool) *ResourceGroup { - krgm := m.getKeyspaceResourceGroupManager(keyspaceID) - if krgm == nil { - return nil +func (m *Manager) GetResourceGroup(keyspaceID uint32, name string, withStats bool) (*ResourceGroup, error) { + krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, name) + if err != nil { + return nil, err } - return krgm.getResourceGroup(name, withStats) + return krgm.getResourceGroup(name, withStats), nil } // GetMutableResourceGroup returns a mutable resource group. -func (m *Manager) GetMutableResourceGroup(keyspaceID uint32, name string) *ResourceGroup { - krgm := m.getKeyspaceResourceGroupManager(keyspaceID) - if krgm == nil { - return nil +func (m *Manager) GetMutableResourceGroup(keyspaceID uint32, name string) (*ResourceGroup, error) { + krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, name) + if err != nil { + return nil, err } - return krgm.getMutableResourceGroup(name) + return krgm.getMutableResourceGroup(name), nil } // GetResourceGroupList returns copies of resource group list. -func (m *Manager) GetResourceGroupList(keyspaceID uint32, withStats bool) []*ResourceGroup { - krgm := m.getKeyspaceResourceGroupManager(keyspaceID) - if krgm == nil { - return nil +func (m *Manager) GetResourceGroupList(keyspaceID uint32, withStats bool) ([]*ResourceGroup, error) { + krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, DefaultResourceGroupName) + if err != nil { + return nil, err } - return krgm.getResourceGroupList(withStats, true) + return krgm.getResourceGroupList(withStats, true), nil } -func (m *Manager) getRUTracker(keyspaceID uint32, name string) *ruTracker { - krgm := m.getKeyspaceResourceGroupManager(keyspaceID) - if krgm == nil { - return nil +func (m *Manager) getRUTracker(keyspaceID uint32, name string) (*ruTracker, error) { + krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, DefaultResourceGroupName) + if err != nil { + return nil, err } - return krgm.getOrCreateRUTracker(name) + return krgm.getOrCreateRUTracker(name), nil } func (m *Manager) persistLoop(ctx context.Context) { @@ -496,11 +512,21 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { sinceLastRecord := m.metrics.recordConsumption(consumptionInfo, keyspaceName, m.controllerConfig, now) resourceGroupName := consumptionInfo.resourceGroupName // TODO: maybe we need to distinguish background ru. - if rg := m.GetMutableResourceGroup(keyspaceID, resourceGroupName); rg != nil { + if rg, err := m.GetMutableResourceGroup(keyspaceID, resourceGroupName); rg != nil { rg.UpdateRUConsumption(consumptionInfo.Consumption) + } else { + log.Error("failed to get mutable resource group", + zap.Uint32("keyspace-id", keyspaceID), + zap.String("resource-group-name", resourceGroupName), + zap.Error(err)) } - if rt := m.getRUTracker(keyspaceID, resourceGroupName); rt != nil { + if rt, err := m.getRUTracker(keyspaceID, resourceGroupName); rt != nil { rt.sample(now, consumptionInfo.RRU+consumptionInfo.WRU, sinceLastRecord) + } else { + log.Error("failed to get RU tracker", + zap.Uint32("keyspace-id", keyspaceID), + zap.String("resource-group-name", resourceGroupName), + zap.Error(err)) } case <-cleanUpTicker.C: // Clean up the metrics that have not been updated for a long time. diff --git a/pkg/mcs/resourcemanager/server/manager_test.go b/pkg/mcs/resourcemanager/server/manager_test.go index 4bb73b1617b..09a3494bdcf 100644 --- a/pkg/mcs/resourcemanager/server/manager_test.go +++ b/pkg/mcs/resourcemanager/server/manager_test.go @@ -93,7 +93,8 @@ func TestInitManager(t *testing.T) { re.NoError(err) re.Len(m.getKeyspaceResourceGroupManagers(), 2) // Get the default resource group. - rg := m.GetResourceGroup(1, DefaultResourceGroupName, true) + rg, err := m.GetResourceGroup(1, DefaultResourceGroupName, true) + re.NoError(err) re.NotNil(rg) // Verify the default resource group settings are updated. This is to ensure the default resource group // can be loaded from the storage correctly rather than created as a new one. @@ -149,7 +150,8 @@ func checkBackgroundMetricsFlush(ctx context.Context, re *require.Assertions, ma keyspaceID := ExtractKeyspaceID(req.GetKeyspaceId()) // Verify consumption was added to the resource group. testutil.Eventually(re, func() bool { - updatedGroup := manager.GetResourceGroup(keyspaceID, req.GetResourceGroupName(), true) + updatedGroup, err := manager.GetResourceGroup(keyspaceID, req.GetResourceGroupName(), true) + re.NoError(err) re.NotNil(updatedGroup) return updatedGroup.RUConsumption.RRU == req.ConsumptionSinceLastRequest.RRU && updatedGroup.RUConsumption.WRU == req.ConsumptionSinceLastRequest.WRU @@ -215,7 +217,8 @@ func checkAddAndModifyResourceGroup(re *require.Assertions, manager *Manager, ke keyspaceID := ExtractKeyspaceID(keyspaceIDValue) testutil.Eventually(re, func() bool { - rg := manager.GetResourceGroup(keyspaceID, group.Name, true) + rg, err := manager.GetResourceGroup(keyspaceID, group.Name, true) + re.NoError(err) re.NotNil(rg) return rg.Priority == group.Priority && rg.RUSettings.RU.Settings.BurstLimit == group.RUSettings.RU.Settings.BurstLimit diff --git a/tests/integrations/mcs/resourcemanager/api_test.go b/tests/integrations/mcs/resourcemanager/api_test.go index fc5720d22bb..6fdafb9526e 100644 --- a/tests/integrations/mcs/resourcemanager/api_test.go +++ b/tests/integrations/mcs/resourcemanager/api_test.go @@ -201,6 +201,59 @@ func (suite *resourceManagerAPITestSuite) TestResourceGroupAPI() { } } +func (suite *resourceManagerAPITestSuite) TestResourceGroupAPIInit() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) + }() + testFuncs := []func(keyspaceID uint32, keyspaceName string){ + func(_ uint32, keyspaceName string) { + group := suite.mustGetResourceGroup(re, "group", keyspaceName) + re.Nil(group) + group = suite.mustGetResourceGroup(re, "default", keyspaceName) + re.NotNil(group) + }, + func(_ uint32, keyspaceName string) { + groups := suite.mustGetResourceGroupList(re, keyspaceName) + re.Len(groups, 1) + re.Equal(server.DefaultResourceGroupName, groups[0].Name) + }, + func(keyspaceID uint32, _ string) { + groupToUpdate := &rmpb.ResourceGroup{ + Name: server.DefaultResourceGroupName, + Mode: rmpb.GroupMode_RUMode, + Priority: 5, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 200, + BurstLimit: 200, + }, + }, + }, + KeyspaceId: &rmpb.KeyspaceIDValue{ + Value: keyspaceID, + }, + } + suite.mustUpdateResourceGroup(re, groupToUpdate) + }, + } + for i, testFunc := range testFuncs { + // Create keyspace + keyspaceName := fmt.Sprint("test_keyspace_", i) + leaderServer := suite.cluster.GetLeaderServer() + meta, err := leaderServer.GetKeyspaceManager().CreateKeyspace( + &keyspace.CreateKeyspaceRequest{ + Name: keyspaceName, + }, + ) + re.NoError(err) + // Run test + testFunc(meta.GetId(), keyspaceName) + } +} + func (suite *resourceManagerAPITestSuite) mustAddResourceGroup(re *require.Assertions, group *rmpb.ResourceGroup) { bodyBytes := suite.mustSendRequest(re, http.MethodPost, "/config/group", group) re.Equal("Success!", string(bodyBytes)) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 8c9338a8da5..12c848c6f13 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -973,7 +973,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { re.NoError(err) re.Contains(dresp, "Success!") _, err = cli.GetResourceGroup(suite.ctx, g.Name) - re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", g.Name)) + re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = [PD:resourcemanager:ErrGroupNotExists]the %v resource group does not exist", g.Name, g.Name)) } // to test the deletion of persistence @@ -1620,7 +1620,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() // Get and List resource group without keyspace id rg, err := cli.GetResourceGroup(suite.ctx, group.Name) - re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", group.Name)) + re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = [PD:resourcemanager:ErrGroupNotExists]the %v resource group does not exist", group.Name, group.Name)) re.Nil(rg) rgs, err := cli.ListResourceGroups(suite.ctx) re.NoError(err) @@ -1695,7 +1695,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() re.NoError(err) re.Contains(resp, "Success!") rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) - re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", group.Name)) + re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = [PD:resourcemanager:ErrGroupNotExists]the %v resource group does not exist", group.Name, group.Name)) re.Nil(rg) }