Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4242,7 +4242,7 @@ func (e *executor) dropTableObject(

tempTableType := tableInfo.Meta().TempTableType
if config.CheckTableBeforeDrop && tempTableType == model.TempTableNone {
err := adminCheckTableBeforeDrop(ctx, fullti)
err := adminCheckTableBeforeDrop(e.sessPool, fullti)
if err != nil {
return err
}
Expand Down Expand Up @@ -4317,35 +4317,44 @@ func (e *executor) dropTableObject(
// adminCheckTableBeforeDrop runs `admin check table` for the table to be dropped.
// Actually this function doesn't do anything specific for `DROP TABLE`, but to avoid
// using it in other places by mistake, it's named like this.
func adminCheckTableBeforeDrop(ctx sessionctx.Context, fullti ast.Ident) error {
func adminCheckTableBeforeDrop(sessPool *sess.Pool, fullti ast.Ident) error {
logutil.DDLLogger().Warn("admin check table before drop",
zap.String("database", fullti.Schema.O),
zap.String("table", fullti.Name.O),
)
exec := ctx.GetRestrictedSQLExecutor()
internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)

// `tidb_enable_fast_table_check` is already the default value, and some feature (e.g. partial index)
// doesn't support admin check with `tidb_enable_fast_table_check = OFF`, so we just set it to `ON` here.
// TODO: set the value of `tidb_enable_fast_table_check` to 'ON' for all internal sessions if it's OK.
originalFastTableCheck := ctx.GetSessionVars().FastCheckTable
_, _, err := exec.ExecRestrictedSQL(internalCtx, nil, "set tidb_enable_fast_table_check = 'ON';")
sctx, err := sessPool.Get()
if err != nil {
return err
}
var discardErr error
defer func() {
if discardErr == nil {
sessPool.Put(sctx)
return
}
logutil.DDLLogger().Warn("discard internal session because tidb_enable_fast_table_check change failed", zap.Error(discardErr))
sessPool.Destroy(sctx)
}()
s := sess.NewSession(sctx)

// Some features (e.g. partial index) doesn't support admin check with `tidb_enable_fast_table_check = OFF`,
// so we temporarily enable it in the internal DDL session and restore it afterward.
originalFastTableCheck := sctx.GetSessionVars().FastCheckTable
if err := sctx.GetSessionVars().SetSystemVar(vardef.TiDBFastCheckTable, vardef.On); err != nil {
discardErr = err
return err
}
if !originalFastTableCheck {
defer func() {
_, _, err = exec.ExecRestrictedSQL(internalCtx, nil, "set tidb_enable_fast_table_check = 'OFF';")
if err != nil {
logutil.DDLLogger().Warn("set tidb_enable_fast_table_check = 'OFF' failed", zap.Error(err))
if err := sctx.GetSessionVars().SetSystemVar(vardef.TiDBFastCheckTable, vardef.Off); err != nil {
discardErr = err
}
}()
}
_, _, err = exec.ExecRestrictedSQL(internalCtx, nil, "admin check table %n.%n", fullti.Schema.O, fullti.Name.O)
if err != nil {
return err
}
return nil
_, err = s.Execute(internalCtx, "admin check table %n.%n", "admin_check_table_before_drop", fullti.Schema.O, fullti.Name.O)
return err
}

// DropTable will proceed even if some table in the list does not exists.
Expand Down
35 changes: 35 additions & 0 deletions pkg/ddl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import (
"fmt"
"testing"

"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/dbterror"
Expand Down Expand Up @@ -144,6 +148,37 @@ func TestPartialIndex(t *testing.T) {
}
}

func TestDropTableAdminCheckTableFastCheckTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, key(b) where a = 1);")

dom := domain.GetDomain(tk.Session())
require.NotNil(t, dom)
pool := dom.SysSessionPool()

seOn, err := pool.Get()
require.NoError(t, err)
seOff, err := pool.Get()
require.NoError(t, err)

seOffCtx := seOff.(sessionctx.Context)
require.NoError(t, seOffCtx.GetSessionVars().SetSystemVar(vardef.TiDBFastCheckTable, vardef.Off))

pool.Put(seOn)
pool.Put(seOff)

oldCheckTableBeforeDrop := config.CheckTableBeforeDrop
config.CheckTableBeforeDrop = true
defer func() {
config.CheckTableBeforeDrop = oldCheckTableBeforeDrop
}()
tk.MustExec("drop table t;")
}

func TestMaintainAffectColumns(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@org_uber_go_zap//:zap",
],
)

Expand All @@ -34,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["session_pool_test.go"],
flaky = True,
shard_count = 2,
shard_count = 4,
deps = [
":session",
"//pkg/testkit",
Expand Down
35 changes: 35 additions & 0 deletions pkg/ddl/session/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)

// Pool is used to new Session.
Expand Down Expand Up @@ -87,6 +88,40 @@ func (sg *Pool) Put(ctx sessionctx.Context) {
infosync.DeleteInternalSession(ctx)
}

// Destroy discards the sessionCtx and returns the pool slot.
// It is used when the session should not be reused.
func (sg *Pool) Destroy(ctx sessionctx.Context) {
intest.AssertNotNil(ctx)
intest.AssertFunc(func() bool {
txn, _ := ctx.Txn(false)
return txn == nil || !txn.Valid()
})
ctx.RollbackTxn(context.Background())
ctx.GetSessionVars().ClearDiskFullOpt()
infosync.DeleteInternalSession(ctx)

// Destroy behavior depends on the underlying pool implementation.
switch p := sg.resPool.(type) {
case util.DestroyableSessionPool:
p.Destroy(ctx.(pools.Resource))
return
case *pools.ResourcePool:
// *pools.ResourcePool requires a Put for every Get. Put(nil) returns the slot
// and causes a new resource to be created next time.
ctx.(pools.Resource).Close()
p.Put(nil)
return
default:
// Fallback: avoid putting a closed resource back.
// The underlying pool implementation may return the same resource later.
logutil.DDLLogger().Warn("session pool doesn't support Destroy, fall back to Put",
zap.String("poolType", fmt.Sprintf("%T", sg.resPool)),
)
sg.resPool.Put(ctx.(pools.Resource))
intest.Assert(false, "unsupported session pool type for Destroy: %T", sg.resPool)
}
}

// Close clean up the Pool.
func (sg *Pool) Close() {
sg.mu.Lock()
Expand Down
65 changes: 65 additions & 0 deletions pkg/ddl/session/session_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package session_test

import (
"context"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -116,3 +117,67 @@ func TestPessimisticTxn(t *testing.T) {
pool.Put(sessCtx)
pool.Put(sessCtx2)
}

func TestSessionPoolDestroyResourcePool(t *testing.T) {
store := testkit.CreateMockStore(t)
resourcePool := pools.NewResourcePool(func() (pools.Resource, error) {
newTk := testkit.NewTestKit(t, store)
return newTk.Session(), nil
}, 1, 1, 0)
pool := session.NewSessionPool(resourcePool)

sessCtx, err := pool.Get()
require.NoError(t, err)

pool.Destroy(sessCtx)

newRes, err := resourcePool.TryGet()
require.NoError(t, err)
require.NotNil(t, newRes)

// Destroy on *pools.ResourcePool should Close the session and Put(nil),
// so the pool creates a new session next time.
require.NotEqual(t, sessCtx.(pools.Resource), newRes)

newRes.Close()
resourcePool.Put(nil)
}

type mockDestroyablePool struct {
factory func() (pools.Resource, error)

putCnt int64
destroyCnt int64
}

func (p *mockDestroyablePool) Get() (pools.Resource, error) {
return p.factory()
}

func (p *mockDestroyablePool) Put(r pools.Resource) {
atomic.AddInt64(&p.putCnt, 1)
r.Close()
}

func (p *mockDestroyablePool) Destroy(r pools.Resource) {
atomic.AddInt64(&p.destroyCnt, 1)
r.Close()
}

func (p *mockDestroyablePool) Close() {}

func TestSessionPoolDestroyDestroyableSessionPool(t *testing.T) {
store := testkit.CreateMockStore(t)
mp := &mockDestroyablePool{factory: func() (pools.Resource, error) {
newTk := testkit.NewTestKit(t, store)
return newTk.Session(), nil
}}
pool := session.NewSessionPool(mp)

sessCtx, err := pool.Get()
require.NoError(t, err)
pool.Destroy(sessCtx)

require.Equal(t, int64(0), atomic.LoadInt64(&mp.putCnt))
require.Equal(t, int64(1), atomic.LoadInt64(&mp.destroyCnt))
}