diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 5eeded57a317f..945860c288a24 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "fmt" - "math" "strings" "sync" "sync/atomic" @@ -472,7 +471,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error return nil } -func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool { +func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool { cp := NewCheckpointWithSpan(s) if cp.TS < c.lastCheckpoint.TS { log.Warn("failed to update global checkpoint: stale", @@ -498,7 +497,7 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, return err } - if c.setCheckpoint(ctx, cp) { + if c.setCheckpoint(cp) { log.Info("uploading checkpoint for task", zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)), zap.Uint64("checkpoint", cp.Value), @@ -585,7 +584,7 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro func (c *CheckpointAdvancer) importantTick(ctx context.Context) error { c.checkpointsMu.Lock() - c.setCheckpoint(ctx, c.checkpoints.Min()) + c.setCheckpoint(c.checkpoints.Min()) c.checkpointsMu.Unlock() if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil { return errors.Annotate(err, "failed to upload global checkpoint") @@ -685,10 +684,14 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar // do not block main tick here go func() { failpoint.Inject("AsyncResolveLocks", func() {}) + maxTs := uint64(0) + for _, t := range targets { + maxTs = max(maxTs, t.Value) + } handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { // we will scan all locks and try to resolve them by check txn status. return tikv.ResolveLocksForRange( - ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit) + ctx, c.env, maxTs+1, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit) } workerPool := util.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks") var wg sync.WaitGroup diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 81e4d2454515e..e4d83c682f789 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -3,6 +3,7 @@ package streamhelper_test import ( + "bytes" "context" "fmt" "strings" @@ -356,15 +357,20 @@ func TestResolveLock(t *testing.T) { lockRegion := c.findRegionByKey([]byte("01")) allLocks := []*txnlock.Lock{ { - Key: []byte{1}, + Key: []byte("011"), // TxnID == minCheckpoint TxnID: minCheckpoint, }, { - Key: []byte{2}, + Key: []byte("012"), // TxnID > minCheckpoint TxnID: minCheckpoint + 1, }, + { + Key: []byte("013"), + // this lock cannot be resolved due to scan version + TxnID: oracle.GoTimeToTS(oracle.GetTimeFromTS(minCheckpoint).Add(2 * time.Minute)), + }, } c.LockRegion(lockRegion, allLocks) @@ -372,32 +378,39 @@ func TestResolveLock(t *testing.T) { resolveLockRef := atomic.NewBool(false) env.resolveLocks = func(locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) { resolveLockRef.Store(true) - require.ElementsMatch(t, locks, allLocks) + // The third lock has skipped, because it's less than max version. + require.ElementsMatch(t, locks, allLocks[:2]) return loc, nil } adv := streamhelper.NewCheckpointAdvancer(env) - // make lastCheckpoint stuck at 123 - adv.UpdateLastCheckpoint(streamhelper.NewCheckpointWithSpan(spans.Valued{ - Key: kv.KeyRange{ - StartKey: kv.Key([]byte("1")), - EndKey: kv.Key([]byte("2")), - }, - Value: 123, - })) - adv.NewCheckpoints( - spans.Sorted(spans.NewFullWith([]kv.KeyRange{ - { - StartKey: kv.Key([]byte("1")), - EndKey: kv.Key([]byte("2")), - }, - }, 0)), - ) adv.StartTaskListener(ctx) - require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil }, - time.Second, 50*time.Millisecond) + + maxTargetTs := uint64(0) coll := streamhelper.NewClusterCollector(ctx, env) + coll.SetOnSuccessHook(func(u uint64, kr kv.KeyRange) { + adv.WithCheckpoints(func(s *spans.ValueSortedFull) { + for _, lock := range allLocks { + // if there is any lock key in the range + if bytes.Compare(kr.StartKey, lock.Key) <= 0 && (bytes.Compare(lock.Key, kr.EndKey) < 0 || len(kr.EndKey) == 0) { + // mock lock behavior, do not update checkpoint + s.Merge(spans.Valued{Key: kr, Value: minCheckpoint}) + return + } + } + s.Merge(spans.Valued{Key: kr, Value: u}) + maxTargetTs = max(maxTargetTs, u) + }) + }) err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll) require.NoError(t, err) + r, err := coll.Finish(ctx) + require.NoError(t, err) + require.Len(t, r.FailureSubRanges, 0) + require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) + + env.maxTs = maxTargetTs + 1 + require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil }, + time.Second, 50*time.Millisecond) // now the lock state must be ture. because tick finished and asyncResolveLocks got stuck. require.True(t, adv.GetInResolvingLock()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks")) @@ -406,10 +419,6 @@ func TestResolveLock(t *testing.T) { // state must set to false after tick require.Eventually(t, func() bool { return !adv.GetInResolvingLock() }, 8*time.Second, 50*time.Microsecond) - r, err := coll.Finish(ctx) - require.NoError(t, err) - require.Len(t, r.FailureSubRanges, 0) - require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint) } func TestOwnerDropped(t *testing.T) { diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 2e1baa89705a0..c59ed7a23acc0 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -99,6 +99,7 @@ type fakeCluster struct { idAlloced uint64 stores map[uint64]*fakeStore regions []*region + maxTs uint64 testCtx *testing.T onGetClient func(uint64) error @@ -773,14 +774,24 @@ func (t *testEnv) putTask() { } func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) { + if t.maxTs != maxVersion { + return nil, nil, errors.Errorf("unexpect max version in scan lock, expected %d, actual %d", t.maxTs, maxVersion) + } for _, r := range t.regions { if len(r.locks) != 0 { - return r.locks, &tikv.KeyLocation{ + locks := make([]*txnlock.Lock, 0, len(r.locks)) + for _, l := range r.locks { + // skip the lock larger than maxVersion + if l.TxnID < maxVersion { + locks = append(locks, l) + } + } + return locks, &tikv.KeyLocation{ Region: tikv.NewRegionVerID(r.id, 0, 0), }, nil } } - return nil, nil, nil + return nil, &tikv.KeyLocation{}, nil } func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) { @@ -791,7 +802,7 @@ func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.L return t.resolveLocks(locks, loc) } } - return nil, nil + return loc, nil } func (t *testEnv) Identifier() string { @@ -856,6 +867,16 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor }, nil } +func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) { + // only used for GetRegionCache once in resolve lock + return []*metapb.Store{ + { + Id: 1, + Address: "127.0.0.1" + }, + }, nil +} + func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 { return 1 }