diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index e5478feb02838..9b3900aae37e1 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4242,7 +4242,7 @@ func (e *executor) dropTableObject( tempTableType := tableInfo.Meta().TempTableType if config.CheckTableBeforeDrop && tempTableType == model.TempTableNone { - err := adminCheckTableBeforeDrop(ctx, fullti) + err := adminCheckTableBeforeDrop(e.sessPool, fullti) if err != nil { return err } @@ -4317,35 +4317,44 @@ func (e *executor) dropTableObject( // adminCheckTableBeforeDrop runs `admin check table` for the table to be dropped. // Actually this function doesn't do anything specific for `DROP TABLE`, but to avoid // using it in other places by mistake, it's named like this. -func adminCheckTableBeforeDrop(ctx sessionctx.Context, fullti ast.Ident) error { +func adminCheckTableBeforeDrop(sessPool *sess.Pool, fullti ast.Ident) error { logutil.DDLLogger().Warn("admin check table before drop", zap.String("database", fullti.Schema.O), zap.String("table", fullti.Name.O), ) - exec := ctx.GetRestrictedSQLExecutor() internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - // `tidb_enable_fast_table_check` is already the default value, and some feature (e.g. partial index) - // doesn't support admin check with `tidb_enable_fast_table_check = OFF`, so we just set it to `ON` here. - // TODO: set the value of `tidb_enable_fast_table_check` to 'ON' for all internal sessions if it's OK. - originalFastTableCheck := ctx.GetSessionVars().FastCheckTable - _, _, err := exec.ExecRestrictedSQL(internalCtx, nil, "set tidb_enable_fast_table_check = 'ON';") + sctx, err := sessPool.Get() if err != nil { return err } + var discardErr error + defer func() { + if discardErr == nil { + sessPool.Put(sctx) + return + } + logutil.DDLLogger().Warn("discard internal session because tidb_enable_fast_table_check change failed", zap.Error(discardErr)) + sessPool.Destroy(sctx) + }() + s := sess.NewSession(sctx) + + // Some features (e.g. partial index) doesn't support admin check with `tidb_enable_fast_table_check = OFF`, + // so we temporarily enable it in the internal DDL session and restore it afterward. + originalFastTableCheck := sctx.GetSessionVars().FastCheckTable + if err := sctx.GetSessionVars().SetSystemVar(vardef.TiDBFastCheckTable, vardef.On); err != nil { + discardErr = err + return err + } if !originalFastTableCheck { defer func() { - _, _, err = exec.ExecRestrictedSQL(internalCtx, nil, "set tidb_enable_fast_table_check = 'OFF';") - if err != nil { - logutil.DDLLogger().Warn("set tidb_enable_fast_table_check = 'OFF' failed", zap.Error(err)) + if err := sctx.GetSessionVars().SetSystemVar(vardef.TiDBFastCheckTable, vardef.Off); err != nil { + discardErr = err } }() } - _, _, err = exec.ExecRestrictedSQL(internalCtx, nil, "admin check table %n.%n", fullti.Schema.O, fullti.Name.O) - if err != nil { - return err - } - return nil + _, err = s.Execute(internalCtx, "admin check table %n.%n", "admin_check_table_before_drop", fullti.Schema.O, fullti.Name.O) + return err } // DropTable will proceed even if some table in the list does not exists. diff --git a/pkg/ddl/integration_test.go b/pkg/ddl/integration_test.go index 7b6b445ba9fc0..25bf2eb56af57 100644 --- a/pkg/ddl/integration_test.go +++ b/pkg/ddl/integration_test.go @@ -19,8 +19,12 @@ import ( "fmt" "testing" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/dbterror" @@ -144,6 +148,37 @@ func TestPartialIndex(t *testing.T) { } } +func TestDropTableAdminCheckTableFastCheckTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, key(b) where a = 1);") + + dom := domain.GetDomain(tk.Session()) + require.NotNil(t, dom) + pool := dom.SysSessionPool() + + seOn, err := pool.Get() + require.NoError(t, err) + seOff, err := pool.Get() + require.NoError(t, err) + + seOffCtx := seOff.(sessionctx.Context) + require.NoError(t, seOffCtx.GetSessionVars().SetSystemVar(vardef.TiDBFastCheckTable, vardef.Off)) + + pool.Put(seOn) + pool.Put(seOff) + + oldCheckTableBeforeDrop := config.CheckTableBeforeDrop + config.CheckTableBeforeDrop = true + defer func() { + config.CheckTableBeforeDrop = oldCheckTableBeforeDrop + }() + tk.MustExec("drop table t;") +} + func TestMaintainAffectColumns(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/ddl/session/BUILD.bazel b/pkg/ddl/session/BUILD.bazel index 02783a5dde691..eba324327a6bd 100644 --- a/pkg/ddl/session/BUILD.bazel +++ b/pkg/ddl/session/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@org_uber_go_zap//:zap", ], ) @@ -34,7 +35,7 @@ go_test( timeout = "short", srcs = ["session_pool_test.go"], flaky = True, - shard_count = 2, + shard_count = 4, deps = [ ":session", "//pkg/testkit", diff --git a/pkg/ddl/session/session_pool.go b/pkg/ddl/session/session_pool.go index 63dd5714495f8..facf0b189ce1a 100644 --- a/pkg/ddl/session/session_pool.go +++ b/pkg/ddl/session/session_pool.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" + "go.uber.org/zap" ) // Pool is used to new Session. @@ -87,6 +88,40 @@ func (sg *Pool) Put(ctx sessionctx.Context) { infosync.DeleteInternalSession(ctx) } +// Destroy discards the sessionCtx and returns the pool slot. +// It is used when the session should not be reused. +func (sg *Pool) Destroy(ctx sessionctx.Context) { + intest.AssertNotNil(ctx) + intest.AssertFunc(func() bool { + txn, _ := ctx.Txn(false) + return txn == nil || !txn.Valid() + }) + ctx.RollbackTxn(context.Background()) + ctx.GetSessionVars().ClearDiskFullOpt() + infosync.DeleteInternalSession(ctx) + + // Destroy behavior depends on the underlying pool implementation. + switch p := sg.resPool.(type) { + case util.DestroyableSessionPool: + p.Destroy(ctx.(pools.Resource)) + return + case *pools.ResourcePool: + // *pools.ResourcePool requires a Put for every Get. Put(nil) returns the slot + // and causes a new resource to be created next time. + ctx.(pools.Resource).Close() + p.Put(nil) + return + default: + // Fallback: avoid putting a closed resource back. + // The underlying pool implementation may return the same resource later. + logutil.DDLLogger().Warn("session pool doesn't support Destroy, fall back to Put", + zap.String("poolType", fmt.Sprintf("%T", sg.resPool)), + ) + sg.resPool.Put(ctx.(pools.Resource)) + intest.Assert(false, "unsupported session pool type for Destroy: %T", sg.resPool) + } +} + // Close clean up the Pool. func (sg *Pool) Close() { sg.mu.Lock() diff --git a/pkg/ddl/session/session_pool_test.go b/pkg/ddl/session/session_pool_test.go index 1c8ec934789b0..7a55b0cd838fc 100644 --- a/pkg/ddl/session/session_pool_test.go +++ b/pkg/ddl/session/session_pool_test.go @@ -16,6 +16,7 @@ package session_test import ( "context" + "sync/atomic" "testing" "time" @@ -116,3 +117,67 @@ func TestPessimisticTxn(t *testing.T) { pool.Put(sessCtx) pool.Put(sessCtx2) } + +func TestSessionPoolDestroyResourcePool(t *testing.T) { + store := testkit.CreateMockStore(t) + resourcePool := pools.NewResourcePool(func() (pools.Resource, error) { + newTk := testkit.NewTestKit(t, store) + return newTk.Session(), nil + }, 1, 1, 0) + pool := session.NewSessionPool(resourcePool) + + sessCtx, err := pool.Get() + require.NoError(t, err) + + pool.Destroy(sessCtx) + + newRes, err := resourcePool.TryGet() + require.NoError(t, err) + require.NotNil(t, newRes) + + // Destroy on *pools.ResourcePool should Close the session and Put(nil), + // so the pool creates a new session next time. + require.NotEqual(t, sessCtx.(pools.Resource), newRes) + + newRes.Close() + resourcePool.Put(nil) +} + +type mockDestroyablePool struct { + factory func() (pools.Resource, error) + + putCnt int64 + destroyCnt int64 +} + +func (p *mockDestroyablePool) Get() (pools.Resource, error) { + return p.factory() +} + +func (p *mockDestroyablePool) Put(r pools.Resource) { + atomic.AddInt64(&p.putCnt, 1) + r.Close() +} + +func (p *mockDestroyablePool) Destroy(r pools.Resource) { + atomic.AddInt64(&p.destroyCnt, 1) + r.Close() +} + +func (p *mockDestroyablePool) Close() {} + +func TestSessionPoolDestroyDestroyableSessionPool(t *testing.T) { + store := testkit.CreateMockStore(t) + mp := &mockDestroyablePool{factory: func() (pools.Resource, error) { + newTk := testkit.NewTestKit(t, store) + return newTk.Session(), nil + }} + pool := session.NewSessionPool(mp) + + sessCtx, err := pool.Get() + require.NoError(t, err) + pool.Destroy(sessCtx) + + require.Equal(t, int64(0), atomic.LoadInt64(&mp.putCnt)) + require.Equal(t, int64(1), atomic.LoadInt64(&mp.destroyCnt)) +}