From bb692bce0388a781092904da74f3b452f67a1222 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 6 Nov 2024 00:04:11 +0800 Subject: [PATCH] This is an automated cherry-pick of #57044 Signed-off-by: ti-chi-bot --- pkg/ddl/index_cop.go | 12 ++++--- pkg/ddl/ingest/BUILD.bazel | 8 +++++ pkg/ddl/ingest/integration_test.go | 34 +++++++++++++++++-- pkg/ddl/internal/session/session_pool.go | 6 ++++ pkg/domain/domain.go | 26 ++++++++++++++ pkg/executor/infoschema_reader.go | 2 +- .../test/clustertablestest/tables_test.go | 16 +++++---- pkg/server/server.go | 5 +-- pkg/session/session.go | 20 ++++++++--- pkg/session/txninfo/txn_info.go | 29 +++++++++++++--- pkg/testkit/BUILD.bazel | 4 +++ pkg/testkit/mocksessionmanager.go | 19 ++++++++++- tests/realtikvtest/txntest/txn_state_test.go | 6 ++-- 13 files changed, 158 insertions(+), 29 deletions(-) diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index a158e1165bdd1..fec80746232cd 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -179,11 +179,13 @@ func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error { return errors.Trace(err) } defer se.Rollback() - var startTS uint64 - sessVars := se.GetSessionVars() - sessVars.TxnCtxMu.Lock() - startTS = sessVars.TxnCtx.StartTS - sessVars.TxnCtxMu.Unlock() + + txn, err := se.Txn() + if err != nil { + return err + } + startTS := txn.StartTS() + failpoint.InjectCall("wrapInBeginRollbackStartTS", startTS) return f(startTS) } diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index c8a9f76646349..1d0668a587100 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -67,7 +67,11 @@ go_test( embed = [":ingest"], flaky = True, race = "on", +<<<<<<< HEAD shard_count = 17, +======= + shard_count = 23, +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) deps = [ "//pkg/config", "//pkg/ddl", @@ -79,7 +83,11 @@ go_test( "//pkg/errno", "//pkg/parser/model", "//pkg/testkit", +<<<<<<< HEAD "//tests/realtikvtest", +======= + "//pkg/testkit/testfailpoint", +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) "@com_github_ngaut_pools//:pools", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go index 27d5f831277b8..d1c511cb68ccc 100644 --- a/pkg/ddl/ingest/integration_test.go +++ b/pkg/ddl/ingest/integration_test.go @@ -29,7 +29,11 @@ import ( "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" +<<<<<<< HEAD "github.com/pingcap/tidb/tests/realtikvtest" +======= + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -121,7 +125,7 @@ func TestIngestError(t *testing.T) { } func TestAddIndexIngestPanic(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) + store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") defer ingesttestutil.InjectMockBackendMgr(t, store)() @@ -142,8 +146,34 @@ func TestAddIndexIngestPanic(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockLocalWriterPanic")) } +func TestAddIndexSetInternalSessions(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + defer ingesttestutil.InjectMockBackendMgr(t, store)() + + tk.MustExec("set global tidb_enable_dist_task = 0;") + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + expectInternalTS := []uint64{} + actualInternalTS := []uint64{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) { + expectInternalTS = append(expectInternalTS, startTS) + }) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/scanRecordExec", func() { + mgr := tk.Session().GetSessionManager() + actualInternalTS = mgr.GetInternalSessionStartTSList() + }) + tk.MustExec("alter table t add index idx(a);") + require.Len(t, expectInternalTS, 1) + for _, ts := range expectInternalTS { + require.Contains(t, actualInternalTS, ts) + } +} + func TestAddIndexIngestCancel(t *testing.T) { - store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) + store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") defer ingesttestutil.InjectMockBackendMgr(t, store)() diff --git a/pkg/ddl/internal/session/session_pool.go b/pkg/ddl/internal/session/session_pool.go index ac10d872e73ed..ac07ae5902284 100644 --- a/pkg/ddl/internal/session/session_pool.go +++ b/pkg/ddl/internal/session/session_pool.go @@ -15,6 +15,7 @@ package session import ( + "context" "fmt" "sync" @@ -84,6 +85,11 @@ func (sg *Pool) Put(ctx sessionctx.Context) { // no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to // Put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing. + intest.AssertFunc(func() bool { + txn, _ := ctx.Txn(false) + return txn == nil || !txn.Valid() + }) + ctx.RollbackTxn(context.Background()) sg.resPool.Put(ctx.(pools.Resource)) infosync.DeleteInternalSession(ctx) } diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 16087df3ce365..b6df94438edc2 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1061,6 +1061,7 @@ const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool wil func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain { capacity := 200 // capacity of the sysSessionPool size do := &Domain{ +<<<<<<< HEAD store: store, exit: make(chan struct{}), sysSessionPool: newSessionPool(capacity, factory), @@ -1069,6 +1070,31 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum), indexUsageSyncLease: idxUsageSyncLease, dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}}, +======= + store: store, + exit: make(chan struct{}), + sysSessionPool: util.NewSessionPool( + capacity, factory, + func(r pools.Resource) { + _, ok := r.(sessionctx.Context) + intest.Assert(ok) + infosync.StoreInternalSession(r) + }, + func(r pools.Resource) { + sctx, ok := r.(sessionctx.Context) + intest.Assert(ok) + intest.AssertFunc(func() bool { + txn, _ := sctx.Txn(false) + return txn == nil || !txn.Valid() + }) + infosync.DeleteInternalSession(r) + }, + ), + statsLease: statsLease, + schemaLease: schemaLease, + slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum), + dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}}, +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) mdlCheckTableInfo: &mdlCheckTableInfo{ mu: sync.Mutex{}, jobsVerMap: make(map[int64]int64), diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go index 72c5305707605..764ffde78af58 100644 --- a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -2499,7 +2499,7 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co for _, info := range infoList { // If you have the PROCESS privilege, you can see all running transactions. // Otherwise, you can see only your own transactions. - if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username { + if !hasProcessPriv && loginUser != nil && info.ProcessInfo.Username != loginUser.Username { continue } e.txnInfo = append(e.txnInfo, info) diff --git a/pkg/infoschema/test/clustertablestest/tables_test.go b/pkg/infoschema/test/clustertablestest/tables_test.go index a19975a06008c..f8445b9f6b721 100644 --- a/pkg/infoschema/test/clustertablestest/tables_test.go +++ b/pkg/infoschema/test/clustertablestest/tables_test.go @@ -1221,9 +1221,11 @@ func TestTiDBTrx(t *testing.T) { CurrentSQLDigest: digest.String(), State: txninfo.TxnIdle, EntriesCount: 1, - ConnectionID: 2, - Username: "root", - CurrentDB: "test", + ProcessInfo: &txninfo.ProcessInfo{ + ConnectionID: 2, + Username: "root", + CurrentDB: "test", + }, } blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local) @@ -1232,9 +1234,11 @@ func TestTiDBTrx(t *testing.T) { CurrentSQLDigest: "", AllSQLDigests: []string{"sql1", "sql2", digest.String()}, State: txninfo.TxnLockAcquiring, - ConnectionID: 10, - Username: "user1", - CurrentDB: "db1", + ProcessInfo: &txninfo.ProcessInfo{ + ConnectionID: 10, + Username: "user1", + CurrentDB: "db1", + }, } sm.TxnInfo[1].BlockStartTime.Valid = true sm.TxnInfo[1].BlockStartTime.Time = blockTime2 diff --git a/pkg/server/server.go b/pkg/server/server.go index 272d816a61924..77d6a3ad2036c 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -839,7 +839,8 @@ func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo { return rs } -// ShowTxnList shows all txn info for displaying in `TIDB_TRX` +// ShowTxnList shows all txn info for displaying in `TIDB_TRX`. +// Internal sessions are not taken into consideration. func (s *Server) ShowTxnList() []*txninfo.TxnInfo { s.rwlock.RLock() defer s.rwlock.RUnlock() @@ -847,7 +848,7 @@ func (s *Server) ShowTxnList() []*txninfo.TxnInfo { for _, client := range s.clients { if client.ctx.Session != nil { info := client.ctx.Session.TxnInfo() - if info != nil { + if info != nil && info.ProcessInfo != nil { rs = append(rs, info) } } diff --git a/pkg/session/session.go b/pkg/session/session.go index f18ce5ccfe574..59820b8fadadc 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -507,6 +507,7 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) { } // TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only* +// Process field may not initialize if this is a session used internally. func (s *session) TxnInfo() *txninfo.TxnInfo { s.txn.mu.RLock() // Copy on read to get a snapshot, this API shouldn't be frequently called. @@ -519,17 +520,27 @@ func (s *session) TxnInfo() *txninfo.TxnInfo { processInfo := s.ShowProcess() if processInfo == nil { - return nil + return &txnInfo + } + txnInfo.ProcessInfo = &txninfo.ProcessInfo{ + ConnectionID: processInfo.ID, + Username: processInfo.User, + CurrentDB: processInfo.DB, + RelatedTableIDs: make(map[int64]struct{}), } +<<<<<<< HEAD txnInfo.ConnectionID = processInfo.ID txnInfo.Username = processInfo.User txnInfo.CurrentDB = processInfo.DB txnInfo.RelatedTableIDs = make(map[int64]struct{}) s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, value interface{}) bool { txnInfo.RelatedTableIDs[key.(int64)] = struct{}{} +======= + s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, _ any) bool { + txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{} +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) return true }) - return &txnInfo } @@ -3853,9 +3864,10 @@ func GetStartTSFromSession(se interface{}) (startTS, processInfoID uint64) { txnInfo := tmp.TxnInfo() if txnInfo != nil { startTS = txnInfo.StartTS - processInfoID = txnInfo.ConnectionID + if txnInfo.ProcessInfo != nil { + processInfoID = txnInfo.ProcessInfo.ConnectionID + } } - logutil.BgLogger().Debug( "GetStartTSFromSession getting startTS of internal session", zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS))) diff --git a/pkg/session/txninfo/txn_info.go b/pkg/session/txninfo/txn_info.go index 50e60da829a46..5067487ebe577 100644 --- a/pkg/session/txninfo/txn_info.go +++ b/pkg/session/txninfo/txn_info.go @@ -174,8 +174,12 @@ type TxnInfo struct { // How many entries are in MemDB EntriesCount uint64 - // The following fields will be filled in `session` instead of `LazyTxn` + // The following field will be filled in `session` instead of `LazyTxn` + ProcessInfo *ProcessInfo +} +// ProcessInfo is part of fields of txnInfo, which will be filled in `session` instead of `LazyTxn` +type ProcessInfo struct { // Which session this transaction belongs to ConnectionID uint64 // The user who open this session @@ -219,13 +223,25 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{ return types.NewDatum(info.EntriesCount) }, SessionIDStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.ConnectionID) + var connectionID uint64 + if info.ProcessInfo != nil { + connectionID = info.ProcessInfo.ConnectionID + } + return types.NewDatum(connectionID) }, UserStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.Username) + var userName string + if info.ProcessInfo != nil { + userName = info.ProcessInfo.Username + } + return types.NewDatum(userName) }, DBStr: func(info *TxnInfo) types.Datum { - return types.NewDatum(info.CurrentDB) + var currentDB string + if info.ProcessInfo != nil { + currentDB = info.ProcessInfo.CurrentDB + } + return types.NewDatum(currentDB) }, AllSQLDigestsStr: func(info *TxnInfo) types.Datum { allSQLDigests := info.AllSQLDigests @@ -241,7 +257,10 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{ return types.NewDatum(string(res)) }, RelatedTableIDsStr: func(info *TxnInfo) types.Datum { - relatedTableIDs := info.RelatedTableIDs + var relatedTableIDs map[int64]struct{} + if info.ProcessInfo != nil { + relatedTableIDs = info.ProcessInfo.RelatedTableIDs + } str := strings.Builder{} first := true for tblID := range relatedTableIDs { diff --git a/pkg/testkit/BUILD.bazel b/pkg/testkit/BUILD.bazel index b9922d4065b45..7c14119502cd4 100644 --- a/pkg/testkit/BUILD.bazel +++ b/pkg/testkit/BUILD.bazel @@ -26,7 +26,11 @@ go_library( "//pkg/resourcemanager", "//pkg/session", "//pkg/session/txninfo", +<<<<<<< HEAD "//pkg/sessionctx", +======= + "//pkg/session/types", +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) "//pkg/sessionctx/variable", "//pkg/store/driver", "//pkg/store/mockstore", diff --git a/pkg/testkit/mocksessionmanager.go b/pkg/testkit/mocksessionmanager.go index af1aa1022660a..f1fded18647eb 100644 --- a/pkg/testkit/mocksessionmanager.go +++ b/pkg/testkit/mocksessionmanager.go @@ -24,7 +24,11 @@ import ( "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/session/txninfo" +<<<<<<< HEAD "github.com/pingcap/tidb/pkg/sessionctx" +======= + sessiontypes "github.com/pingcap/tidb/pkg/session/types" +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) "github.com/pingcap/tidb/pkg/util" ) @@ -52,7 +56,7 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo { rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn)) for _, se := range msm.Conn { info := se.TxnInfo() - if info != nil { + if info != nil && info.ProcessInfo != nil { rs = append(rs, info) } } @@ -155,12 +159,25 @@ func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 { defer msm.mu.Unlock() ret := make([]uint64, 0, len(msm.internalSessions)) for internalSess := range msm.internalSessions { +<<<<<<< HEAD se := internalSess.(sessionctx.Context) sessVars := se.GetSessionVars() sessVars.TxnCtxMu.Lock() startTS := sessVars.TxnCtx.StartTS sessVars.TxnCtxMu.Unlock() ret = append(ret, startTS) +======= + // Ref the implementation of `GetInternalSessionStartTSList` on the real session manager. The `TxnInfo` is more + // accurate, because if a session is pending, the `StartTS` in `sessVars.TxnCtx` will not be updated. For example, + // if there is not DDL for a long time, the minimal internal session start ts will not have any progress. + if se, ok := internalSess.(interface{ TxnInfo() *txninfo.TxnInfo }); ok { + txn := se.TxnInfo() + if txn != nil { + ret = append(ret, txn.StartTS) + } + continue + } +>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044)) } return ret } diff --git a/tests/realtikvtest/txntest/txn_state_test.go b/tests/realtikvtest/txntest/txn_state_test.go index 671e4b7e216e2..f2d8d57f0217a 100644 --- a/tests/realtikvtest/txntest/txn_state_test.go +++ b/tests/realtikvtest/txntest/txn_state_test.go @@ -74,9 +74,9 @@ func TestBasicTxnState(t *testing.T) { require.Equal(t, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String()}, info.AllSQLDigests) // len and size will be covered in TestLenAndSize - require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ConnectionID) - require.Equal(t, "", info.Username) - require.Equal(t, "test", info.CurrentDB) + require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ProcessInfo.ConnectionID) + require.Equal(t, "", info.ProcessInfo.Username) + require.Equal(t, "test", info.ProcessInfo.CurrentDB) require.Equal(t, startTS, info.StartTS) require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))