Skip to content

mcs: init default resource group name when access new keyspace #9393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apis

import (
"errors"
"fmt"
"net/http"
"reflect"
Expand All @@ -28,6 +27,7 @@

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"

"github.com/tikv/pd/pkg/errs"
rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
Expand Down Expand Up @@ -175,9 +175,15 @@
return
}
keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue)
group := s.manager.GetResourceGroup(keyspaceID, c.Param("name"), withStats)
groupName := c.Param("name")
group, err := s.manager.GetResourceGroup(keyspaceID, groupName, withStats)
if err != nil {
c.String(http.StatusNotFound, err.Error())
return
}
if group == nil {
c.String(http.StatusNotFound, errors.New("resource group not found").Error())
c.String(http.StatusNotFound, errs.ErrResourceGroupNotExists.FastGenByArgs(groupName).Error())
return
}
c.IndentedJSON(http.StatusOK, group)
}
Expand All @@ -200,7 +206,11 @@
return
}
keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue)
groups := s.manager.GetResourceGroupList(keyspaceID, withStats)
groups, err := s.manager.GetResourceGroupList(keyspaceID, withStats)
if err != nil {
c.String(http.StatusNotFound, err.Error())
return
}

Check warning on line 213 in pkg/mcs/resourcemanager/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/apis/v1/api.go#L211-L213

Added lines #L211 - L213 were not covered by tests
c.IndentedJSON(http.StatusOK, groups)
}

Expand All @@ -223,6 +233,7 @@
keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue)
if err := s.manager.DeleteResourceGroup(keyspaceID, c.Param("name")); err != nil {
c.String(http.StatusNotFound, err.Error())
return
}
c.String(http.StatusOK, "Success!")
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@
if err := s.checkServing(); err != nil {
return nil, err
}
rg := s.manager.GetResourceGroup(ExtractKeyspaceID(req.GetKeyspaceId()), req.ResourceGroupName, req.WithRuStats)
rg, err := s.manager.GetResourceGroup(ExtractKeyspaceID(req.GetKeyspaceId()), req.ResourceGroupName, req.WithRuStats)
if err != nil {
return nil, err
}

Check warning on line 99 in pkg/mcs/resourcemanager/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/grpc_service.go#L98-L99

Added lines #L98 - L99 were not covered by tests
if rg == nil {
return nil, errors.New("resource group not found")
return nil, errs.ErrResourceGroupNotExists.FastGenByArgs(req.ResourceGroupName)
}
return &rmpb.GetResourceGroupResponse{
Group: rg.IntoProtoResourceGroup(),
Expand All @@ -107,7 +110,10 @@
if err := s.checkServing(); err != nil {
return nil, err
}
groups := s.manager.GetResourceGroupList(ExtractKeyspaceID(req.GetKeyspaceId()), req.WithRuStats)
groups, err := s.manager.GetResourceGroupList(ExtractKeyspaceID(req.GetKeyspaceId()), req.WithRuStats)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a status code?

return nil, err
}

Check warning on line 116 in pkg/mcs/resourcemanager/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/grpc_service.go#L115-L116

Added lines #L115 - L116 were not covered by tests
resp := &rmpb.ListResourceGroupsResponse{
Groups: make([]*rmpb.ResourceGroup, 0, len(groups)),
}
Expand Down Expand Up @@ -192,9 +198,9 @@
continue
}
// Get the resource group from manager to acquire token buckets.
rg := s.manager.GetMutableResourceGroup(keyspaceID, resourceGroupName)
rg, err := s.manager.GetMutableResourceGroup(keyspaceID, resourceGroupName)
if rg == nil {
log.Warn("resource group not found", logFields...)
log.Warn("resource group not found", append(logFields, zap.Error(err))...)
continue
}
// Send the consumption to update the metrics.
Expand Down
76 changes: 51 additions & 25 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@
return m.krgms[keyspaceID]
}

func (m *Manager) accessKeyspaceResourceGroupManager(keyspaceID uint32, groupName string) (*keyspaceResourceGroupManager, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add it to GetKeyspaceServiceLimiter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need, we can return "Not found" error if the keyspace is not inited.

var krgm *keyspaceResourceGroupManager
if groupName == DefaultResourceGroupName {
// For the default resource group, if the keyspace manager doesn't exist yet
// and the group name is the default resource group name, we try to get or create it.
krgm = m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true)
} else {
krgm = m.getKeyspaceResourceGroupManager(keyspaceID)
}
if krgm == nil {
return nil, errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID)
}
return krgm, nil
}

// Init initializes the resource group manager.
func (m *Manager) Init(ctx context.Context) error {
v, err := m.storage.LoadControllerConfig()
Expand Down Expand Up @@ -296,15 +311,16 @@
// ModifyResourceGroup modifies an existing resource group.
func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {
keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId())
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID)
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, grouppb.Name)
if err != nil {
return err

Check warning on line 316 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L316

Added line #L316 was not covered by tests
}
return krgm.modifyResourceGroup(grouppb)
}

// DeleteResourceGroup deletes a resource group.
func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error {
// "default" group can't be deleted, so there is not need to call accessKeyspaceResourceGroupManager
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID)
Expand All @@ -313,38 +329,38 @@
}

// GetResourceGroup returns a copy of a resource group.
func (m *Manager) GetResourceGroup(keyspaceID uint32, name string, withStats bool) *ResourceGroup {
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return nil
func (m *Manager) GetResourceGroup(keyspaceID uint32, name string, withStats bool) (*ResourceGroup, error) {
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, name)
if err != nil {
return nil, err
}
return krgm.getResourceGroup(name, withStats)
return krgm.getResourceGroup(name, withStats), nil
}

// GetMutableResourceGroup returns a mutable resource group.
func (m *Manager) GetMutableResourceGroup(keyspaceID uint32, name string) *ResourceGroup {
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return nil
func (m *Manager) GetMutableResourceGroup(keyspaceID uint32, name string) (*ResourceGroup, error) {
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, name)
if err != nil {
return nil, err

Check warning on line 344 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L344

Added line #L344 was not covered by tests
}
return krgm.getMutableResourceGroup(name)
return krgm.getMutableResourceGroup(name), nil
}

// GetResourceGroupList returns copies of resource group list.
func (m *Manager) GetResourceGroupList(keyspaceID uint32, withStats bool) []*ResourceGroup {
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return nil
func (m *Manager) GetResourceGroupList(keyspaceID uint32, withStats bool) ([]*ResourceGroup, error) {
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, DefaultResourceGroupName)
if err != nil {
return nil, err

Check warning on line 353 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L353

Added line #L353 was not covered by tests
}
return krgm.getResourceGroupList(withStats, true)
return krgm.getResourceGroupList(withStats, true), nil
}

func (m *Manager) getRUTracker(keyspaceID uint32, name string) *ruTracker {
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return nil
func (m *Manager) getRUTracker(keyspaceID uint32, name string) (*ruTracker, error) {
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, DefaultResourceGroupName)
if err != nil {
return nil, err

Check warning on line 361 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L361

Added line #L361 was not covered by tests
}
return krgm.getOrCreateRUTracker(name)
return krgm.getOrCreateRUTracker(name), nil
}

func (m *Manager) persistLoop(ctx context.Context) {
Expand Down Expand Up @@ -496,11 +512,21 @@
sinceLastRecord := m.metrics.recordConsumption(consumptionInfo, keyspaceName, m.controllerConfig, now)
resourceGroupName := consumptionInfo.resourceGroupName
// TODO: maybe we need to distinguish background ru.
if rg := m.GetMutableResourceGroup(keyspaceID, resourceGroupName); rg != nil {
if rg, err := m.GetMutableResourceGroup(keyspaceID, resourceGroupName); rg != nil {
rg.UpdateRUConsumption(consumptionInfo.Consumption)
} else {
log.Error("failed to get mutable resource group",
zap.Uint32("keyspace-id", keyspaceID),
zap.String("resource-group-name", resourceGroupName),
zap.Error(err))

Check warning on line 521 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L518-L521

Added lines #L518 - L521 were not covered by tests
}
if rt := m.getRUTracker(keyspaceID, resourceGroupName); rt != nil {
if rt, err := m.getRUTracker(keyspaceID, resourceGroupName); rt != nil {
rt.sample(now, consumptionInfo.RRU+consumptionInfo.WRU, sinceLastRecord)
} else {
log.Error("failed to get RU tracker",
zap.Uint32("keyspace-id", keyspaceID),
zap.String("resource-group-name", resourceGroupName),
zap.Error(err))

Check warning on line 529 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L526-L529

Added lines #L526 - L529 were not covered by tests
}
case <-cleanUpTicker.C:
// Clean up the metrics that have not been updated for a long time.
Expand Down
9 changes: 6 additions & 3 deletions pkg/mcs/resourcemanager/server/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func TestInitManager(t *testing.T) {
re.NoError(err)
re.Len(m.getKeyspaceResourceGroupManagers(), 2)
// Get the default resource group.
rg := m.GetResourceGroup(1, DefaultResourceGroupName, true)
rg, err := m.GetResourceGroup(1, DefaultResourceGroupName, true)
re.NoError(err)
re.NotNil(rg)
// Verify the default resource group settings are updated. This is to ensure the default resource group
// can be loaded from the storage correctly rather than created as a new one.
Expand Down Expand Up @@ -149,7 +150,8 @@ func checkBackgroundMetricsFlush(ctx context.Context, re *require.Assertions, ma
keyspaceID := ExtractKeyspaceID(req.GetKeyspaceId())
// Verify consumption was added to the resource group.
testutil.Eventually(re, func() bool {
updatedGroup := manager.GetResourceGroup(keyspaceID, req.GetResourceGroupName(), true)
updatedGroup, err := manager.GetResourceGroup(keyspaceID, req.GetResourceGroupName(), true)
re.NoError(err)
re.NotNil(updatedGroup)
return updatedGroup.RUConsumption.RRU == req.ConsumptionSinceLastRequest.RRU &&
updatedGroup.RUConsumption.WRU == req.ConsumptionSinceLastRequest.WRU
Expand Down Expand Up @@ -215,7 +217,8 @@ func checkAddAndModifyResourceGroup(re *require.Assertions, manager *Manager, ke

keyspaceID := ExtractKeyspaceID(keyspaceIDValue)
testutil.Eventually(re, func() bool {
rg := manager.GetResourceGroup(keyspaceID, group.Name, true)
rg, err := manager.GetResourceGroup(keyspaceID, group.Name, true)
re.NoError(err)
re.NotNil(rg)
return rg.Priority == group.Priority &&
rg.RUSettings.RU.Settings.BurstLimit == group.RUSettings.RU.Settings.BurstLimit
Expand Down
53 changes: 53 additions & 0 deletions tests/integrations/mcs/resourcemanager/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,59 @@ func (suite *resourceManagerAPITestSuite) TestResourceGroupAPI() {
}
}

func (suite *resourceManagerAPITestSuite) TestResourceGroupAPIInit() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
}()
testFuncs := []func(keyspaceID uint32, keyspaceName string){
func(_ uint32, keyspaceName string) {
group := suite.mustGetResourceGroup(re, "group", keyspaceName)
re.Nil(group)
group = suite.mustGetResourceGroup(re, "default", keyspaceName)
re.NotNil(group)
},
func(_ uint32, keyspaceName string) {
groups := suite.mustGetResourceGroupList(re, keyspaceName)
re.Len(groups, 1)
re.Equal(server.DefaultResourceGroupName, groups[0].Name)
},
func(keyspaceID uint32, _ string) {
groupToUpdate := &rmpb.ResourceGroup{
Name: server.DefaultResourceGroupName,
Mode: rmpb.GroupMode_RUMode,
Priority: 5,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 200,
BurstLimit: 200,
},
},
},
KeyspaceId: &rmpb.KeyspaceIDValue{
Value: keyspaceID,
},
}
suite.mustUpdateResourceGroup(re, groupToUpdate)
},
}
for i, testFunc := range testFuncs {
// Create keyspace
keyspaceName := fmt.Sprint("test_keyspace_", i)
leaderServer := suite.cluster.GetLeaderServer()
meta, err := leaderServer.GetKeyspaceManager().CreateKeyspace(
&keyspace.CreateKeyspaceRequest{
Name: keyspaceName,
},
)
re.NoError(err)
// Run test
testFunc(meta.GetId(), keyspaceName)
}
}

func (suite *resourceManagerAPITestSuite) mustAddResourceGroup(re *require.Assertions, group *rmpb.ResourceGroup) {
bodyBytes := suite.mustSendRequest(re, http.MethodPost, "/config/group", group)
re.Equal("Success!", string(bodyBytes))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
re.NoError(err)
re.Contains(dresp, "Success!")
_, err = cli.GetResourceGroup(suite.ctx, g.Name)
re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", g.Name))
re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = [PD:resourcemanager:ErrGroupNotExists]the %v resource group does not exist", g.Name, g.Name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has the error message here changed? Can it be kept the same as before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace errors.New("resource group not found") with errs.ErrResourceGroupNotExists.FastGenByArgs(req.ResourceGroupName)

}

// to test the deletion of persistence
Expand Down Expand Up @@ -1620,7 +1620,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace()

// Get and List resource group without keyspace id
rg, err := cli.GetResourceGroup(suite.ctx, group.Name)
re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", group.Name))
re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = [PD:resourcemanager:ErrGroupNotExists]the %v resource group does not exist", group.Name, group.Name))
re.Nil(rg)
rgs, err := cli.ListResourceGroups(suite.ctx)
re.NoError(err)
Expand Down Expand Up @@ -1695,7 +1695,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace()
re.NoError(err)
re.Contains(resp, "Success!")
rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats)
re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", group.Name))
re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = [PD:resourcemanager:ErrGroupNotExists]the %v resource group does not exist", group.Name, group.Name))
re.Nil(rg)
}

Expand Down