
scheduler: fix the recovery time of slow store #9388


Merged

merged 4 commits on Jun 12, 2025
43 changes: 38 additions & 5 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -47,6 +47,7 @@ type evictSlowStoreSchedulerConfig struct {
	cluster *core.BasicCluster
	// Last timestamp of the chosen slow store for eviction.
	lastSlowStoreCaptureTS time.Time
	isRecovered bool
	// Duration gap for recovering the candidate, unit: s.
	RecoveryDurationGap uint64 `json:"recovery-duration"`
	EvictedStores []uint64 `json:"evict-stores"`
@@ -119,6 +120,21 @@ func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
	return conf.save()
}

func (conf *evictSlowStoreSchedulerConfig) tryUpdateRecoverStatus(isRecovered bool) error {
	conf.RLock()
	if conf.isRecovered == isRecovered {
		conf.RUnlock()
		return nil
	}
	conf.RUnlock()

	conf.Lock()
	defer conf.Unlock()
	conf.lastSlowStoreCaptureTS = time.Now()
	conf.isRecovered = isRecovered
	return conf.save()
}

func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (oldID uint64, err error) {
	oldID = conf.evictStore()
	conf.Lock()
@@ -299,14 +315,31 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
			// slow node next time.
			log.Info("slow store has been removed",
				zap.Uint64("store-id", store.GetID()))
-		} else if store.GetSlowScore() <= slowStoreRecoverThreshold && s.conf.readyForRecovery() {
-			log.Info("slow store has been recovered",
-				zap.Uint64("store-id", store.GetID()))
-		} else {
-			return s.schedulerEvictLeader(cluster), nil
-		}
-		s.cleanupEvictLeader(cluster)
-		return nil, nil
+			s.cleanupEvictLeader(cluster)
+			return nil, nil
+		}
+		// recover slow store if its score is below the threshold.
+		if store.GetSlowScore() <= slowStoreRecoverThreshold {
+			if err := s.conf.tryUpdateRecoverStatus(true); err != nil {
+				log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", store.GetID()), zap.Error(err))
+				return nil, nil
+			}
+
+			if !s.conf.readyForRecovery() {
+				return nil, nil
+			}
+
+			log.Info("slow store has been recovered",
+				zap.Uint64("store-id", store.GetID()))
+			s.cleanupEvictLeader(cluster)
+			return nil, nil
+		}
+		// If the slow store is still slow or slow again, we can continue to evict leaders from it.
+		if err := s.conf.tryUpdateRecoverStatus(false); err != nil {
+			log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", store.GetID()), zap.Error(err))
+			return nil, nil
+		}
+		return s.schedulerEvictLeader(cluster), nil
	}

	var slowStore *core.StoreInfo
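For readers skimming the diff above: eviction is only lifted once readyForRecovery reports that RecoveryDurationGap seconds have elapsed since lastSlowStoreCaptureTS, and the new tryUpdateRecoverStatus resets that timestamp on every slow/recovered flip. The sketch below models that interaction outside of pd — the type and method names mirror the diff, but the helper bodies (in particular readyForRecovery) are simplified assumptions, not the repository's exact code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Simplified stand-in for evictSlowStoreSchedulerConfig; names mirror the
// diff above, but this is an illustrative sketch, not pd code.
type slowStoreConf struct {
	sync.RWMutex
	lastSlowStoreCaptureTS time.Time
	isRecovered            bool
	recoveryDurationGap    uint64 // seconds the store must stay healthy before eviction is lifted
}

// readyForRecovery: enough time has elapsed since the last status flip
// (assumed behavior of the helper the diff leaves unchanged).
func (c *slowStoreConf) readyForRecovery() bool {
	c.RLock()
	defer c.RUnlock()
	return uint64(time.Since(c.lastSlowStoreCaptureTS).Seconds()) >= c.recoveryDurationGap
}

// tryUpdateRecoverStatus restarts the countdown whenever the slow/recovered
// status flips — the core of the fix: a store that turns slow again must wait
// out a full gap after it recovers once more.
func (c *slowStoreConf) tryUpdateRecoverStatus(isRecovered bool) {
	c.Lock()
	defer c.Unlock()
	if c.isRecovered == isRecovered {
		return
	}
	c.lastSlowStoreCaptureTS = time.Now()
	c.isRecovered = isRecovered
}

func main() {
	conf := &slowStoreConf{recoveryDurationGap: 1}

	conf.tryUpdateRecoverStatus(true)    // slow score dropped below the threshold
	fmt.Println(conf.readyForRecovery()) // false: countdown just started

	time.Sleep(600 * time.Millisecond)
	conf.tryUpdateRecoverStatus(false) // slow again on a later check: countdown is discarded
	conf.tryUpdateRecoverStatus(true)  // recovers once more: countdown restarts from now

	time.Sleep(600 * time.Millisecond)
	fmt.Println(conf.readyForRecovery()) // still false: only ~0.6s since the last flip

	time.Sleep(500 * time.Millisecond)
	fmt.Println(conf.readyForRecovery()) // true: one full uninterrupted gap has passed
}

The intended effect, as exercised by TestRecoveryTime below, is that a store must stay below slowStoreRecoverThreshold for one full, uninterrupted RecoveryDurationGap before its evicted leaders are allowed back, rather than being released as soon as the gap since the original capture expires.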
106 changes: 106 additions & 0 deletions pkg/schedule/schedulers/evict_slow_store_test.go
@@ -17,6 +17,7 @@ package schedulers
import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
@@ -207,3 +208,108 @@ func TestEvictSlowStoreBatch(t *testing.T) {
	re.Equal(5, persistValue.Batch)
	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap"))
}

func TestRecoveryTime(t *testing.T) {
	re := require.New(t)
	cancel, _, tc, oc := prepareSchedulersTest()
	defer cancel()

	// Add stores 1, 2, 3 with different leader counts
	tc.AddLeaderStore(1, 10)
	tc.AddLeaderStore(2, 0)
	tc.AddLeaderStore(3, 0)

	// Add regions with leader in store 1
	for i := range 10 {
		tc.AddLeaderRegion(uint64(i), 1, 2, 3)
	}

	storage := storage.NewStorageWithMemoryBackend()
	es, err := CreateScheduler(types.EvictSlowStoreScheduler, oc, storage,
		ConfigSliceDecoder(types.EvictSlowStoreScheduler, []string{}), nil)
	re.NoError(err)
	bs, err := CreateScheduler(types.BalanceLeaderScheduler, oc, storage,
		ConfigSliceDecoder(types.BalanceLeaderScheduler, []string{}), nil)
	re.NoError(err)

	var recoveryTimeInSec uint64 = 1
	recoveryTime := 1 * time.Second
	es.(*evictSlowStoreScheduler).conf.RecoveryDurationGap = recoveryTimeInSec

	// Mark store 1 as slow
	storeInfo := tc.GetStore(1)
	slowStore := storeInfo.Clone(func(store *core.StoreInfo) {
		store.GetStoreStats().SlowScore = 100
	})
	tc.PutStore(slowStore)

	// Verify store is marked for eviction
	ops, _ := es.Schedule(tc, false)
	re.NotEmpty(ops)
	re.Equal(types.EvictSlowStoreScheduler.String(), ops[0].Desc())
	re.Equal(uint64(1), es.(*evictSlowStoreScheduler).conf.evictStore())

	// Store recovers from being slow
	time.Sleep(recoveryTime)
	recoveredStore := storeInfo.Clone(func(store *core.StoreInfo) {
		store.GetStoreStats().SlowScore = 0
	})
	tc.PutStore(recoveredStore)

	// Should not recover immediately due to recovery time window
	for range 10 {
		// trigger recovery check
		es.Schedule(tc, false)
		ops, _ = bs.Schedule(tc, false)
		re.Empty(ops)
		re.Equal(uint64(1), es.(*evictSlowStoreScheduler).conf.evictStore())
	}

	// Store is slow again before recovery time is over
	time.Sleep(recoveryTime / 2)
	slowStore = storeInfo.Clone(func(store *core.StoreInfo) {
		store.GetStoreStats().SlowScore = 100
	})
	tc.PutStore(slowStore)
	time.Sleep(recoveryTime / 2)
	// Should not recover due to recovery time window recalculation
	for range 10 {
		// trigger recovery check
		es.Schedule(tc, false)
		ops, _ = bs.Schedule(tc, false)
		re.Empty(ops)
		re.Equal(uint64(1), es.(*evictSlowStoreScheduler).conf.evictStore())
	}

	// Store recovers from being slow
	time.Sleep(recoveryTime)
	recoveredStore = storeInfo.Clone(func(store *core.StoreInfo) {
		store.GetStoreStats().SlowScore = 0
	})
	tc.PutStore(recoveredStore)

	// Should not recover immediately due to recovery time window
	for range 10 {
		// trigger recovery check
		es.Schedule(tc, false)
		ops, _ = bs.Schedule(tc, false)
		re.Empty(ops)
		re.Equal(uint64(1), es.(*evictSlowStoreScheduler).conf.evictStore())
	}

	// Should now recover
	time.Sleep(recoveryTime)
	// trigger recovery check
	es.Schedule(tc, false)

	ops, _ = bs.Schedule(tc, false)
	re.Empty(ops)
	re.Empty(es.(*evictSlowStoreScheduler).conf.evictStore())

	// Verify persistence
	var persistValue evictSlowStoreSchedulerConfig
	err = es.(*evictSlowStoreScheduler).conf.load(&persistValue)
	re.NoError(err)
	re.Zero(persistValue.evictStore())
	re.True(persistValue.readyForRecovery())
}
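To run only the new case locally — assuming a checkout of the tikv/pd repository with a recent Go toolchain, using the package path shown in the diff — something like the following should work:

go test -run TestRecoveryTime ./pkg/schedule/schedulers/ -v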