Skip to content

Commit b2dfa3c

Browse files
committed
ddl: rely on error-path cleanup
1 parent 33abe11 commit b2dfa3c

File tree

2 files changed

+4
-44
lines changed

2 files changed

+4
-44
lines changed

pkg/ddl/backfilling_read_index.go

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,6 @@ type readIndexEngineRegistrar interface {
9494
Register(indexIDs []int64, uniques []bool, tbl table.Table) ([]ingest.Engine, error)
9595
}
9696

97-
const pebbleLockHeldByCurrentProcessMsg = "lock held by current process"
98-
99-
func isPebbleLockHeldByCurrentProcess(err error) bool {
100-
// Pebble's vfs.Lock returns a plain error containing `pebbleLockHeldByCurrentProcessMsg`
101-
// when the lock is already held by the current process (see pebble/vfs/file_lock_unix.go).
102-
// Pebble does not export a sentinel error/type for this case, so we must match on the
103-
// message.
104-
return err != nil && strings.Contains(err.Error(), pebbleLockHeldByCurrentProcessMsg)
105-
}
106-
10797
func newReadIndexExecutor(
10898
store kv.Storage,
10999
sessPool *sess.Pool,
@@ -415,7 +405,7 @@ func (r *readIndexStepExecutor) buildLocalStorePipeline(
415405
}
416406
idxNames.WriteString(index.Name.O)
417407
}
418-
engines, err := registerReadIndexEngines(wctx, r.job.ID, backendCtx, indexIDs, uniques, r.ptbl)
408+
engines, err := registerReadIndexEngines(backendCtx, indexIDs, uniques, r.ptbl)
419409
if err != nil {
420410
tidblogutil.Logger(wctx).Error("cannot register new engine",
421411
zap.Error(err),
@@ -513,24 +503,8 @@ func cleanupReadIndexLocalEngines(jobID, subtaskID int64, backendCtx readIndexLo
513503
}
514504
}
515505

516-
func registerReadIndexEngines(
517-
wctx *workerpool.Context,
518-
jobID int64,
519-
backendCtx readIndexEngineRegistrar,
520-
indexIDs []int64,
521-
uniques []bool,
522-
tbl table.Table,
523-
) ([]ingest.Engine, error) {
524-
engines, err := backendCtx.Register(indexIDs, uniques, tbl)
525-
if err != nil && isPebbleLockHeldByCurrentProcess(err) {
526-
// Cleanup happens in the caller's error-path defer; dist-task retry will re-run.
527-
tidblogutil.Logger(wctx).Warn("register ingest engine got lock held",
528-
zap.Error(err),
529-
zap.Int64("job ID", jobID),
530-
zap.Int64s("index IDs", indexIDs),
531-
)
532-
}
533-
return engines, err
506+
func registerReadIndexEngines(backendCtx readIndexEngineRegistrar, indexIDs []int64, uniques []bool, tbl table.Table) ([]ingest.Engine, error) {
507+
return backendCtx.Register(indexIDs, uniques, tbl)
534508
}
535509

536510
type distTaskRowCntCollector struct {

pkg/ddl/backfilling_test.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"bytes"
1919
"context"
2020
"errors"
21-
"fmt"
2221
"math"
2322
"sync/atomic"
2423
"testing"
@@ -110,21 +109,8 @@ func (c *testReadIndexBackendCtx) Register(indexIDs []int64, uniques []bool, tbl
110109

111110
func (c *testReadIndexBackendCtx) GetLocalBackend() *local.Backend { return c.localBackend }
112111

113-
func TestIsPebbleLockHeldByCurrentProcess(t *testing.T) {
114-
// Matches Pebble's lock error message (no exported sentinel).
115-
require.False(t, isPebbleLockHeldByCurrentProcess(nil))
116-
require.False(t, isPebbleLockHeldByCurrentProcess(errors.New("other error")))
117-
118-
baseErr := errors.New("lock held by current process")
119-
require.True(t, isPebbleLockHeldByCurrentProcess(baseErr))
120-
require.True(t, isPebbleLockHeldByCurrentProcess(fmt.Errorf("wrapped: %w", baseErr)))
121-
}
122-
123112
func TestRegisterReadIndexEnginesNoRetryOnLockHeld(t *testing.T) {
124113
// No in-function retry; cleanup is centralized in caller error-path.
125-
wctx := workerpool.NewContext(context.Background())
126-
defer wctx.Cancel()
127-
128114
var registerCalls int32
129115
backendCtx := &testReadIndexBackendCtx{
130116
localBackend: &local.Backend{},
@@ -134,7 +120,7 @@ func TestRegisterReadIndexEnginesNoRetryOnLockHeld(t *testing.T) {
134120
},
135121
}
136122

137-
engines, err := registerReadIndexEngines(wctx, 1, backendCtx, []int64{1}, []bool{false}, nil)
123+
engines, err := registerReadIndexEngines(backendCtx, []int64{1}, []bool{false}, nil)
138124
require.ErrorContains(t, err, "lock held by current process")
139125
require.Nil(t, engines)
140126
require.Equal(t, int32(1), registerCalls)

0 commit comments

Comments
 (0)