diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index fb6d2af1dd2..822bd68e115 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -38,6 +38,7 @@ import (
     "github.com/tikv/pd/pkg/schedule"
     sc "github.com/tikv/pd/pkg/schedule/config"
     "github.com/tikv/pd/pkg/schedule/hbstream"
+    "github.com/tikv/pd/pkg/schedule/keyrange"
     "github.com/tikv/pd/pkg/schedule/labeler"
     "github.com/tikv/pd/pkg/schedule/operator"
     "github.com/tikv/pd/pkg/schedule/placement"
@@ -62,6 +63,7 @@ type Cluster struct {
     *core.BasicCluster
     persistConfig   *config.PersistConfig
     ruleManager     *placement.RuleManager
+    keyRangeManager *keyrange.Manager
     labelerManager  *labeler.RegionLabeler
     regionStats     *statistics.RegionStatistics
     labelStats      *statistics.LabelStatistics
@@ -114,6 +116,7 @@ func NewCluster(
         cancel:          cancel,
         BasicCluster:    basicCluster,
         ruleManager:     ruleManager,
+        keyRangeManager: keyrange.NewManager(),
         labelerManager:  labelerManager,
         persistConfig:   persistConfig,
         hotStat:         statistics.NewHotStat(ctx, basicCluster),
@@ -176,6 +179,11 @@ func (c *Cluster) GetRuleManager() *placement.RuleManager {
     return c.ruleManager
 }
 
+// GetKeyRangeManager returns the key range manager
+func (c *Cluster) GetKeyRangeManager() *keyrange.Manager {
+    return c.keyRangeManager
+}
+
 // GetRegionLabeler returns the region labeler.
 func (c *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
     return c.labelerManager
diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 7edbaf4895e..a3f7fe26a54 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -33,6 +33,7 @@ import (
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/mock/mockid"
     sc "github.com/tikv/pd/pkg/schedule/config"
+    "github.com/tikv/pd/pkg/schedule/keyrange"
     "github.com/tikv/pd/pkg/schedule/labeler"
     "github.com/tikv/pd/pkg/schedule/placement"
     "github.com/tikv/pd/pkg/statistics"
@@ -55,6 +56,7 @@ type Cluster struct {
     *core.BasicCluster
     *mockid.IDAllocator
     *placement.RuleManager
+    *keyrange.Manager
     *labeler.RegionLabeler
     *statistics.HotStat
     *config.PersistOptions
@@ -73,6 +75,7 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
         HotStat:                 statistics.NewHotStat(ctx, bc),
         HotBucketCache:          buckets.NewBucketsCache(ctx),
         PersistOptions:          opts,
+        Manager:                 keyrange.NewManager(),
         pendingProcessedRegions: map[uint64]struct{}{},
         Storage:                 storage.NewStorageWithMemoryBackend(),
     }
@@ -214,6 +217,11 @@ func (mc *Cluster) GetRuleManager() *placement.RuleManager {
     return mc.RuleManager
 }
 
+// GetKeyRangeManager returns the key range manager of the cluster.
+func (mc *Cluster) GetKeyRangeManager() *keyrange.Manager {
+    return mc.Manager
+}
+
 // GetRegionLabeler returns the region labeler of the cluster.
 func (mc *Cluster) GetRegionLabeler() *labeler.RegionLabeler {
     return mc.RegionLabeler
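Note (illustrative, not part of the patch): because `*keyrange.Manager` is embedded anonymously in `mockcluster.Cluster`, its methods are promoted onto the mock cluster, which is what lets the tests later in this diff call `tc.Append(...)` and `tc.Delete(...)` directly. A minimal sketch of the two equivalent call paths, assuming a `*mockcluster.Cluster` built the way the tests build one:

```go
package example

import (
	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/pkg/utils/keyutil"
)

// claimAndRelease is a sketch only; tc is a *mockcluster.Cluster as the tests construct it.
func claimAndRelease(tc *mockcluster.Cluster) {
	kr := keyutil.NewKeyRange("a", "b")
	// Promoted method from the embedded *keyrange.Manager.
	tc.Append([]keyutil.KeyRange{kr})
	// Equivalent access through the accessor added above.
	tc.GetKeyRangeManager().Delete([]keyutil.KeyRange{kr})
}
```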
diff --git a/pkg/schedule/core/cluster_informer.go b/pkg/schedule/core/cluster_informer.go
index 305b3723942..38375843b2c 100644
--- a/pkg/schedule/core/cluster_informer.go
+++ b/pkg/schedule/core/cluster_informer.go
@@ -17,6 +17,7 @@ package core
 import (
     "github.com/tikv/pd/pkg/core"
     sc "github.com/tikv/pd/pkg/schedule/config"
+    "github.com/tikv/pd/pkg/schedule/keyrange"
     "github.com/tikv/pd/pkg/schedule/labeler"
     "github.com/tikv/pd/pkg/schedule/placement"
     "github.com/tikv/pd/pkg/statistics"
@@ -61,6 +62,7 @@ type SharedCluster interface {
     GetBasicCluster() *core.BasicCluster
     GetSharedConfig() sc.SharedConfigProvider
     GetRuleManager() *placement.RuleManager
+    GetKeyRangeManager() *keyrange.Manager
     AllocID(uint32) (uint64, uint32, error)
     IsSchedulingHalted() bool
 }
diff --git a/pkg/schedule/keyrange/manager.go b/pkg/schedule/keyrange/manager.go
new file mode 100644
index 00000000000..10ef6cd0b55
--- /dev/null
+++ b/pkg/schedule/keyrange/manager.go
@@ -0,0 +1,60 @@
+// Copyright 2025 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package keyrange
+
+import (
+    "sync"
+
+    "github.com/tikv/pd/pkg/utils/keyutil"
+)
+
+// Manager is a manager for key ranges.
+type Manager struct {
+    sync.Mutex
+    sortedKeyRanges *keyutil.KeyRanges
+}
+
+// NewManager creates a new Manager.
+func NewManager() *Manager {
+    return &Manager{
+        sortedKeyRanges: &keyutil.KeyRanges{},
+    }
+}
+
+// GetNonOverlappingKeyRanges returns the non-overlapping key ranges of the given base key range.
+func (s *Manager) GetNonOverlappingKeyRanges(base *keyutil.KeyRange) []keyutil.KeyRange {
+    s.Lock()
+    defer s.Unlock()
+    return s.sortedKeyRanges.SubtractKeyRanges(base)
+}
+
+// Append appends the key ranges to the manager.
+func (s *Manager) Append(rs []keyutil.KeyRange) {
+    s.Lock()
+    defer s.Unlock()
+    for _, r := range rs {
+        s.sortedKeyRanges.Append(r.StartKey, r.EndKey)
+    }
+    s.sortedKeyRanges.SortAndDeduce()
+}
+
+// Delete deletes the overlapping key ranges from the manager.
+func (s *Manager) Delete(rs []keyutil.KeyRange) {
+    s.Lock()
+    defer s.Unlock()
+    for _, r := range rs {
+        s.sortedKeyRanges.Delete(&r)
+    }
+}
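A minimal usage sketch of the new manager (illustrative only; the keys mirror the `balance_range_test.go` case further down, where a job claims ["100", "200")):

```go
package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/schedule/keyrange"
	"github.com/tikv/pd/pkg/utils/keyutil"
)

func main() {
	m := keyrange.NewManager()

	// A running balance-range job claims ["100", "200").
	m.Append([]keyutil.KeyRange{keyutil.NewKeyRange("100", "200")})

	// A scheduler configured with the default full range asks for everything
	// that is NOT claimed; per the test below this yields ["", "100") and ["200", "").
	full := keyutil.NewKeyRange("", "")
	for _, r := range m.GetNonOverlappingKeyRanges(&full) {
		fmt.Printf("[%q, %q)\n", r.StartKey, r.EndKey)
	}

	// Once the job finishes (or times out), its ranges are released again.
	m.Delete([]keyutil.KeyRange{keyutil.NewKeyRange("100", "200")})
}
```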
diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go
index 01074ac7ee9..a9c6281e000 100644
--- a/pkg/schedule/schedulers/balance_leader.go
+++ b/pkg/schedule/schedulers/balance_leader.go
@@ -429,7 +429,12 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s
 // It randomly selects a health region from the source store, then picks
 // the best follower peer and transfers the leader.
 func (s *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator {
-    solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), s.conf.getRanges()),
+    rs := s.conf.getRanges()
+    if IsDefaultKeyRange(rs) {
+        km := solver.GetKeyRangeManager()
+        rs = km.GetNonOverlappingKeyRanges(&rs[0])
+    }
+    solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.sourceStoreID(), rs),
         collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
     if solver.Region == nil {
         log.Debug("store has no leader", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.sourceStoreID()))
@@ -473,7 +478,12 @@ func (s *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl
 // It randomly selects a health region from the target store, then picks
 // the worst follower peer and transfers the leader.
 func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator {
-    solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), s.conf.getRanges()),
+    rs := s.conf.getRanges()
+    if IsDefaultKeyRange(rs) {
+        km := solver.GetKeyRangeManager()
+        rs = km.GetNonOverlappingKeyRanges(&rs[0])
+    }
+    solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.targetStoreID(), rs),
         nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
     if solver.Region == nil {
         log.Debug("store has no follower", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", solver.targetStoreID()))
diff --git a/pkg/schedule/schedulers/balance_leader_test.go b/pkg/schedule/schedulers/balance_leader_test.go
index d5cdd26299e..fa279a22420 100644
--- a/pkg/schedule/schedulers/balance_leader_test.go
+++ b/pkg/schedule/schedulers/balance_leader_test.go
@@ -35,6 +35,7 @@ import (
     "github.com/tikv/pd/pkg/schedule/plan"
     "github.com/tikv/pd/pkg/schedule/types"
     "github.com/tikv/pd/pkg/storage"
+    "github.com/tikv/pd/pkg/utils/keyutil"
     "github.com/tikv/pd/pkg/utils/operatorutil"
     "github.com/tikv/pd/pkg/versioninfo"
 )
@@ -449,6 +450,13 @@ func (suite *balanceLeaderRangeSchedulerTestSuite) TestSingleRangeBalance() {
     re.NoError(err)
     ops, _ = lb.Schedule(suite.tc, false)
     re.Empty(ops)
+
+    kye := keyutil.NewKeyRange("a", "g")
+    suite.tc.Append([]keyutil.KeyRange{kye})
+    lb, err = CreateScheduler(types.BalanceLeaderScheduler, suite.oc, storage.NewStorageWithMemoryBackend(), ConfigSliceDecoder(types.BalanceLeaderScheduler, []string{"", ""}))
+    re.NoError(err)
+    ops, _ = lb.Schedule(suite.tc, false)
+    re.Empty(ops)
 }
 
 func (suite *balanceLeaderRangeSchedulerTestSuite) TestMultiRangeBalance() {
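The guard added to `transferLeaderOut`/`transferLeaderIn` (and repeated in `balance_region.go` below) always has the same shape: an explicitly configured range is respected as-is, while the default full range is narrowed to whatever the key range manager has not handed out to a running balance-range job. A hypothetical helper, not part of this patch, could factor that out:

```go
package schedulers

import (
	sche "github.com/tikv/pd/pkg/schedule/core"
	"github.com/tikv/pd/pkg/utils/keyutil"
)

// restrictToUnclaimedRanges is a sketch only (the helper name is invented here):
// it returns the scheduler's configured ranges unchanged unless they are the
// default full range, in which case it subtracts the key ranges currently
// claimed by running balance-range jobs.
func restrictToUnclaimedRanges(cluster sche.SchedulerCluster, rs []keyutil.KeyRange) []keyutil.KeyRange {
	if !IsDefaultKeyRange(rs) {
		// An explicitly configured key range always wins.
		return rs
	}
	km := cluster.GetKeyRangeManager()
	return km.GetNonOverlappingKeyRanges(&rs[0])
}
```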
diff --git a/pkg/schedule/schedulers/balance_range.go b/pkg/schedule/schedulers/balance_range.go
index 2d58efe8047..e7464b590be 100644
--- a/pkg/schedule/schedulers/balance_range.go
+++ b/pkg/schedule/schedulers/balance_range.go
@@ -48,7 +48,7 @@ import (
 )
 
 var (
-    defaultJobTimeout = time.Hour
+    defaultJobTimeout = 30 * time.Minute
     reserveDuration   = 7 * 24 * time.Hour
 )
 
@@ -116,7 +116,7 @@ func (handler *balanceRangeSchedulerHandler) addJob(w http.ResponseWriter, r *ht
         handler.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("end key:%s can't be unescaped", input["end-key"].(string)))
         return
     }
-    log.Info("add balance key range job", zap.String("start-key", startKeyStr), zap.String("end-key", endKeyStr))
+    log.Info("add balance key range job", zap.String("alias", job.Alias))
     rs, err := decodeKeyRanges(endKeyStr, startKeyStr)
     if err != nil {
         handler.rd.JSON(w, http.StatusBadRequest, err.Error())
@@ -173,6 +173,14 @@ type balanceRangeSchedulerConfig struct {
 func (conf *balanceRangeSchedulerConfig) addJob(job *balanceRangeSchedulerJob) error {
     conf.Lock()
     defer conf.Unlock()
+    for _, c := range conf.jobs {
+        if c.isComplete() {
+            continue
+        }
+        if job.Alias == c.Alias {
+            return errors.New("job already exists")
+        }
+    }
     job.Status = pending
     if len(conf.jobs) == 0 {
         job.JobID = 1
@@ -394,12 +402,16 @@ func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
             if err := s.conf.begin(index); err != nil {
                 return false
             }
+            km := cluster.GetKeyRangeManager()
+            km.Append(job.Ranges)
         }
         // todo: add other conditions such as the diff of the score between the source and target store.
         if time.Since(*job.Start) > job.Timeout {
             if err := s.conf.finish(index); err != nil {
                 return false
             }
+            km := cluster.GetKeyRangeManager()
+            km.Delete(job.Ranges)
             balanceRangeExpiredCounter.Inc()
         }
     }
diff --git a/pkg/schedule/schedulers/balance_range_test.go b/pkg/schedule/schedulers/balance_range_test.go
index 68044feb02f..14470d37a45 100644
--- a/pkg/schedule/schedulers/balance_range_test.go
+++ b/pkg/schedule/schedulers/balance_range_test.go
@@ -61,6 +61,14 @@ func TestTIKVEngine(t *testing.T) {
     scheduler, err := CreateScheduler(types.BalanceRangeScheduler, oc, storage.NewStorageWithMemoryBackend(),
         ConfigSliceDecoder(types.BalanceRangeScheduler, []string{"leader-scatter", "tikv", "1h", "test", "100", "200"}))
+    re.True(scheduler.IsScheduleAllowed(tc))
+    km := tc.GetKeyRangeManager()
+    kr := keyutil.NewKeyRange("", "")
+    ranges := km.GetNonOverlappingKeyRanges(&kr)
+    re.Len(ranges, 2)
+    re.Equal(ranges[0], keyutil.NewKeyRange("", "100"))
+    re.Equal(ranges[1], keyutil.NewKeyRange("200", ""))
+
     re.NoError(err)
     ops, _ := scheduler.Schedule(tc, true)
     re.Empty(ops)
@@ -172,6 +180,7 @@ func TestFetchAllRegions(t *testing.T) {
 func TestCodecConfig(t *testing.T) {
     re := require.New(t)
     job := &balanceRangeSchedulerJob{
+        Alias:  "test.t",
         Engine: core.EngineTiKV,
         Rule:   core.LeaderScatter,
         JobID:  1,
@@ -189,6 +198,7 @@ func TestCodecConfig(t *testing.T) {
     re.Equal(conf1.jobs, conf.jobs)
 
     job1 := &balanceRangeSchedulerJob{
+        Alias:  "test.t2",
         Engine: core.EngineTiKV,
         Rule:   core.LeaderScatter,
         Status: running,
@@ -196,6 +206,7 @@
         JobID:  2,
     }
     re.NoError(conf.addJob(job1))
+    re.Error(conf.addJob(job1))
     re.NoError(conf.load(&conf1))
     re.Equal(conf1.jobs, conf.jobs)
 
@@ -271,7 +282,9 @@ func TestPersistFail(t *testing.T) {
     }
     conf.init("test", storage.NewStorageWithMemoryBackend(), conf)
     errMsg := "fail to persist"
-    newJob := &balanceRangeSchedulerJob{}
+    newJob := &balanceRangeSchedulerJob{
+        Alias: "test.t",
+    }
     re.ErrorContains(conf.addJob(newJob), errMsg)
     re.Len(conf.jobs, 1)
 
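For readers of `IsScheduleAllowed` above: the key range manager acts as a registry of ranges that balance-range jobs have claimed. A sketch of the lifecycle it implies (illustrative only; the function below does not exist in the patch, but the calls and fields do):

```go
package schedulers

import (
	sche "github.com/tikv/pd/pkg/schedule/core"
)

// jobRangeLifecycle is a sketch of when a balance-range job's key ranges
// enter and leave the key range manager, mirroring IsScheduleAllowed above.
func jobRangeLifecycle(cluster sche.SchedulerCluster, job *balanceRangeSchedulerJob) {
	km := cluster.GetKeyRangeManager()

	// pending -> running: claim the ranges so that default-range schedulers
	// (balance-leader, balance-region) stop picking regions inside them.
	km.Append(job.Ranges)

	// running -> finished or expired (defaultJobTimeout is now 30 minutes):
	// release the ranges so ordinary balancing resumes on them.
	km.Delete(job.Ranges)
}
```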
diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go
index 0752e3eb11c..1496ed24fd6 100644
--- a/pkg/schedule/schedulers/balance_region.go
+++ b/pkg/schedule/schedulers/balance_region.go
@@ -137,6 +137,11 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
     solver.Step++
     var sourceIndex int
 
+    rs := s.conf.Ranges
+    if IsDefaultKeyRange(rs) {
+        km := solver.GetKeyRangeManager()
+        rs = km.GetNonOverlappingKeyRanges(&rs[0])
+    }
     // sourcesStore is sorted by region score desc, so we pick the first store as source store.
     for sourceIndex, solver.Source = range sourceStores {
         retryLimit := s.getLimit(solver.Source)
@@ -147,21 +152,21 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
         for range retryLimit {
             // Priority pick the region that has a pending peer.
             // Pending region may mean the disk is overload, remove the pending region firstly.
-            solver.Region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.sourceStoreID(), s.conf.Ranges), collector,
+            solver.Region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.sourceStoreID(), rs), collector,
                 append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()))...)
             if solver.Region == nil {
                 // Then pick the region that has a follower in the source store.
-                solver.Region = filter.SelectOneRegion(cluster.RandFollowerRegions(solver.sourceStoreID(), s.conf.Ranges), collector,
+                solver.Region = filter.SelectOneRegion(cluster.RandFollowerRegions(solver.sourceStoreID(), rs), collector,
                     append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()), pendingFilter)...)
             }
             if solver.Region == nil {
                 // Then pick the region has the leader in the source store.
-                solver.Region = filter.SelectOneRegion(cluster.RandLeaderRegions(solver.sourceStoreID(), s.conf.Ranges), collector,
+                solver.Region = filter.SelectOneRegion(cluster.RandLeaderRegions(solver.sourceStoreID(), rs), collector,
                     append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()), pendingFilter)...)
             }
             if solver.Region == nil {
                 // Finally, pick learner.
-                solver.Region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.sourceStoreID(), s.conf.Ranges), collector,
+                solver.Region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.sourceStoreID(), rs), collector,
                     append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.sourceStoreID()), pendingFilter)...)
             }
             if solver.Region == nil {
diff --git a/pkg/schedule/schedulers/balance_region_test.go b/pkg/schedule/schedulers/balance_region_test.go
index 5db754cebf2..b0c50c1de28 100644
--- a/pkg/schedule/schedulers/balance_region_test.go
+++ b/pkg/schedule/schedulers/balance_region_test.go
@@ -29,6 +29,7 @@ import (
     "github.com/tikv/pd/pkg/schedule/plan"
     "github.com/tikv/pd/pkg/schedule/types"
     "github.com/tikv/pd/pkg/storage"
+    "github.com/tikv/pd/pkg/utils/keyutil"
     "github.com/tikv/pd/pkg/utils/operatorutil"
     "github.com/tikv/pd/pkg/versioninfo"
 )
@@ -654,9 +655,16 @@ func TestBalanceRegionEmptyRegion(t *testing.T) {
         core.SetApproximateKeys(1),
     )
     tc.PutRegion(region)
+
+    tc.Append([]keyutil.KeyRange{keyutil.NewKeyRange("a", "b")})
     operators, _ := sb.Schedule(tc, false)
+    re.Empty(operators)
+    tc.Delete([]keyutil.KeyRange{keyutil.NewKeyRange("a", "b")})
+
+    operators, _ = sb.Schedule(tc, false)
     re.NotEmpty(operators)
+
     // reject the empty regions if the cluster has more regions.
     for i := uint64(10); i < 111; i++ {
         tc.PutRegionStores(i, 1, 3, 4)
     }
diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go
index 43edcef5c17..f455bb50b9b 100644
--- a/pkg/schedule/schedulers/utils.go
+++ b/pkg/schedule/schedulers/utils.go
@@ -411,3 +411,11 @@ func pauseAndResumeLeaderTransfer[T any](cluster *core.BasicCluster, direction c
         }
     }
 }
+
+// IsDefaultKeyRange checks if the given key ranges are the default key range.
+func IsDefaultKeyRange(rs []keyutil.KeyRange) bool {
+    if len(rs) == 1 && len(rs[0].StartKey) == 0 && len(rs[0].EndKey) == 0 {
+        return true
+    }
+    return false
+}
diff --git a/pkg/schedule/schedulers/utils_test.go b/pkg/schedule/schedulers/utils_test.go
index d85fba47ff4..3d71269f035 100644
--- a/pkg/schedule/schedulers/utils_test.go
+++ b/pkg/schedule/schedulers/utils_test.go
@@ -22,6 +22,7 @@ import (
     "github.com/pingcap/kvproto/pkg/metapb"
 
     "github.com/tikv/pd/pkg/core"
+    "github.com/tikv/pd/pkg/utils/keyutil"
 )
 
 func TestRetryQuota(t *testing.T) {
@@ -53,3 +54,19 @@ func TestRetryQuota(t *testing.T) {
     q.resetLimit(store1)
     re.Equal(10, q.getLimit(store1))
 }
+
+func TestIsDefaultKeyRange(t *testing.T) {
+    re := require.New(t)
+    rs := []keyutil.KeyRange{{
+        StartKey: []byte(""),
+        EndKey:   []byte(""),
+    }}
+    re.True(IsDefaultKeyRange(rs))
+    rs = []keyutil.KeyRange{
+        {
+            StartKey: []byte("a"),
+            EndKey:   []byte(""),
+        },
+    }
+    re.False(IsDefaultKeyRange(rs))
+}
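Minor note on `IsDefaultKeyRange`: the if/return pair can be collapsed into a single boolean expression with identical behavior (suggestion only, not applied in the patch):

```go
// IsDefaultKeyRange checks if the given key ranges are the default key range.
func IsDefaultKeyRange(rs []keyutil.KeyRange) bool {
	return len(rs) == 1 && len(rs[0].StartKey) == 0 && len(rs[0].EndKey) == 0
}
```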
diff --git a/pkg/utils/keyutil/keyrange.go b/pkg/utils/keyutil/keyrange.go
index 8c25737ec1a..a16473a9919 100644
--- a/pkg/utils/keyutil/keyrange.go
+++ b/pkg/utils/keyutil/keyrange.go
@@ -18,6 +18,7 @@ import (
     "bytes"
     "encoding/hex"
     "encoding/json"
+    "sort"
 )
 
 // KeyRange is a key range.
@@ -29,9 +30,21 @@ type KeyRange struct {
 // OverLapped return true if the two KeyRanges are overlapped.
 // if the two KeyRanges are continuous, it will also return true.
 func (kr *KeyRange) OverLapped(other *KeyRange) bool {
-    leftMax := MaxKey(kr.StartKey, other.StartKey)
-    rightMin := MinKey(kr.EndKey, other.EndKey)
-    return bytes.Compare(leftMax, rightMin) <= 0
+    leftMax := MaxKeyWithBoundary(kr.StartKey, other.StartKey, left)
+    rightMin := MinKeyWithBoundary(kr.EndKey, other.EndKey, right)
+    if len(leftMax) == 0 {
+        return true
+    }
+    if bytes.Equal(leftMax, rightMin) {
+        return true
+    }
+    return less(leftMax, rightMin, right)
+}
+
+// IsAdjacent returns true if the two KeyRanges are adjacent.
+func (kr *KeyRange) IsAdjacent(other *KeyRange) bool {
+    // Check if the end of one range is equal to the start of the other range
+    return bytes.Equal(kr.EndKey, other.StartKey) || bytes.Equal(other.EndKey, kr.StartKey)
 }
 
 var _ json.Marshaler = &KeyRange{}
@@ -105,6 +118,82 @@ func (rs *KeyRanges) Append(startKey, endKey []byte) {
     })
 }
 
+// SortAndDeduce sorts the KeyRanges and deduces the overlapped KeyRanges.
+func (rs *KeyRanges) SortAndDeduce() {
+    if len(rs.krs) <= 1 {
+        return
+    }
+    sort.Slice(rs.krs, func(i, j int) bool {
+        return less(rs.krs[i].StartKey, rs.krs[j].StartKey, left)
+    })
+    res := make([]*KeyRange, 0)
+    res = append(res, rs.krs[0])
+    for i := 1; i < len(rs.krs); i++ {
+        last := res[len(res)-1]
+        if last.IsAdjacent(rs.krs[i]) {
+            last.StartKey = MinKeyWithBoundary(last.StartKey, rs.krs[i].StartKey, left)
+            last.EndKey = MaxKeyWithBoundary(last.EndKey, rs.krs[i].EndKey, right)
+        } else {
+            res = append(res, rs.krs[i])
+        }
+    }
+    rs.krs = res
+}
+
+// Delete deletes the KeyRange from the KeyRanges.
+func (rs *KeyRanges) Delete(base *KeyRange) {
+    res := make([]*KeyRange, 0)
+    for _, r := range rs.krs {
+        if !r.OverLapped(base) {
+            res = append(res, r)
+            continue
+        }
+        if less(r.StartKey, base.StartKey, left) {
+            res = append(res, &KeyRange{StartKey: r.StartKey, EndKey: MinKeyWithBoundary(r.EndKey, base.StartKey, right)})
+        }
+
+        if less(base.EndKey, r.EndKey, right) {
+            startKey := MaxKeyWithBoundary(r.StartKey, base.EndKey, right)
+            if len(r.StartKey) == 0 {
+                startKey = base.EndKey
+            }
+            res = append(res, &KeyRange{StartKey: startKey, EndKey: r.EndKey})
+        }
+    }
+    rs.krs = res
+}
+
+// SubtractKeyRanges returns the KeyRanges that are not overlapped with the given KeyRange.
+func (rs *KeyRanges) SubtractKeyRanges(base *KeyRange) []KeyRange {
+    res := make([]KeyRange, 0)
+    start := base.StartKey
+    for _, kr := range rs.krs {
+        // if the last range is not overlapped with the current range, we can skip it.
+        if !base.OverLapped(kr) {
+            continue
+        }
+        // add new key range if start