Skip to content

Commit 49759c2

Browse files
committed
add a failpoint for test
Signed-off-by: ekexium <[email protected]>
1 parent a7ff0da commit 49759c2

File tree

3 files changed

+11
-10
lines changed

3 files changed

+11
-10
lines changed

txnkv/transaction/2pc.go

+1
Original file line numberDiff line numberDiff line change
@@ -1806,6 +1806,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
18061806
return errors.Errorf("unexpected empty pipelinedStart(%s) or pipelinedEnd(%s)",
18071807
c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd)
18081808
}
1809+
util.EvalFailpoint("beforePipelinedCommit")
18091810
return c.commitFlushedMutations(bo)
18101811
}
18111812

txnkv/transaction/pipelined_flush.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
401401
// Use time-based callback control
402402
lastCallback := atomic.Value{}
403403
lastCallback.Store(time.Now().Add(-minProgressUpdateInterval))
404-
404+
405405
return func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
406406
start := r.StartKey
407407
res := rangetask.TaskStat{}
@@ -450,7 +450,7 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
450450
}
451451
resolved.Add(1)
452452
res.CompletedRegions++
453-
453+
454454
// Update progress periodically
455455
if c.txn.pipelinedProgressCallback != nil {
456456
now := time.Now()
@@ -459,8 +459,8 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
459459
if timeSinceLastCallback < 0 {
460460
timeSinceLastCallback = minProgressUpdateInterval
461461
}
462-
if timeSinceLastCallback >= minProgressUpdateInterval &&
463-
lastCallback.CompareAndSwap(last, now) {
462+
if timeSinceLastCallback >= minProgressUpdateInterval &&
463+
lastCallback.CompareAndSwap(last, now) {
464464
resolvedCount := int64(resolved.Load())
465465
c.txn.pipelinedProgressCallback(
466466
c.startTS,
@@ -470,7 +470,7 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
470470
)
471471
}
472472
}
473-
473+
474474
done := loc.EndKey == nil || bytes.Compare(loc.EndKey, r.EndKey) >= 0
475475
if done {
476476
return res, nil
@@ -544,7 +544,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
544544
true,
545545
)
546546
}
547-
547+
548548
// wait a while before notifying txn_status_cache to evict the txn,
549549
// which tolerates slow followers and avoids the situation that the
550550
// txn is evicted before the follower catches up.

txnkv/transaction/txn.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ type KVTxn struct {
208208
pipelinedResolveLockConcurrency int
209209
writeThrottleRatio float64
210210
// flushBatchDurationEWMA is read before each flush, and written after each flush => no race
211-
flushBatchDurationEWMA ewma.MovingAverage
211+
flushBatchDurationEWMA ewma.MovingAverage
212212
pipelinedProgressCallback PipelinedProgressCallbackType
213213
// progressRecordInitiated is only updated in the flush process, which is single-threaded. No need to use atomic.
214214
progressRecordInitiated bool
@@ -1946,7 +1946,7 @@ type PipelinedDMLStatus int
19461946

19471947
const (
19481948
// PipelinedDMLExecuting means the transaction is executing
1949-
PipelinedDMLExecuting PipelinedDMLStatus = iota
1949+
PipelinedDMLExecuting PipelinedDMLStatus = iota
19501950
// PipelinedDMLRollingBack means the transaction is rolling back
19511951
PipelinedDMLRollingBack
19521952
// PipelinedDMLResolvingLocks means the transaction is resolving locks
@@ -1962,5 +1962,5 @@ type PipelinedProgressCallbackType func(startTS uint64, status PipelinedDMLStatu
19621962

19631963
// SetPipelinedProgressCallback sets the callback function for pipelined DML resolve lock progress
19641964
func (txn *KVTxn) SetPipelinedProgressCallback(callback PipelinedProgressCallbackType) {
1965-
txn.pipelinedProgressCallback = callback
1966-
}
1965+
txn.pipelinedProgressCallback = callback
1966+
}

0 commit comments

Comments
 (0)