Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: make TxnInfo() return even if process info is empty (#57044) #57164

Open
wants to merge 1 commit into
base: release-7.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,13 @@ func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
return errors.Trace(err)
}
defer se.Rollback()
var startTS uint64
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS = sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()

txn, err := se.Txn()
if err != nil {
return err
}
startTS := txn.StartTS()
failpoint.InjectCall("wrapInBeginRollbackStartTS", startTS)
return f(startTS)
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
<<<<<<< HEAD
shard_count = 17,
=======
shard_count = 23,
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
deps = [
"//pkg/config",
"//pkg/ddl",
Expand All @@ -79,7 +83,11 @@ go_test(
"//pkg/errno",
"//pkg/parser/model",
"//pkg/testkit",
<<<<<<< HEAD
"//tests/realtikvtest",
=======
"//pkg/testkit/testfailpoint",
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
Expand Down
34 changes: 32 additions & 2 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ import (
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/testkit"
<<<<<<< HEAD
"github.com/pingcap/tidb/tests/realtikvtest"
=======
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -121,7 +125,7 @@ func TestIngestError(t *testing.T) {
}

func TestAddIndexIngestPanic(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer ingesttestutil.InjectMockBackendMgr(t, store)()
Expand All @@ -142,8 +146,34 @@ func TestAddIndexIngestPanic(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockLocalWriterPanic"))
}

func TestAddIndexSetInternalSessions(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer ingesttestutil.InjectMockBackendMgr(t, store)()

tk.MustExec("set global tidb_enable_dist_task = 0;")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int);")
tk.MustExec("insert into t values (1);")
expectInternalTS := []uint64{}
actualInternalTS := []uint64{}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/wrapInBeginRollbackStartTS", func(startTS uint64) {
expectInternalTS = append(expectInternalTS, startTS)
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/scanRecordExec", func() {
mgr := tk.Session().GetSessionManager()
actualInternalTS = mgr.GetInternalSessionStartTSList()
})
tk.MustExec("alter table t add index idx(a);")
require.Len(t, expectInternalTS, 1)
for _, ts := range expectInternalTS {
require.Contains(t, actualInternalTS, ts)
}
}

func TestAddIndexIngestCancel(t *testing.T) {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer ingesttestutil.InjectMockBackendMgr(t, store)()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/internal/session/session_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package session

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -84,6 +85,11 @@ func (sg *Pool) Put(ctx sessionctx.Context) {

// no need to protect sg.resPool, even the sg.resPool is closed, the ctx still need to
// Put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
intest.AssertFunc(func() bool {
txn, _ := ctx.Txn(false)
return txn == nil || !txn.Valid()
})
ctx.RollbackTxn(context.Background())
sg.resPool.Put(ctx.(pools.Resource))
infosync.DeleteInternalSession(ctx)
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool wil
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory) *Domain {
capacity := 200 // capacity of the sysSessionPool size
do := &Domain{
<<<<<<< HEAD
store: store,
exit: make(chan struct{}),
sysSessionPool: newSessionPool(capacity, factory),
Expand All @@ -1069,6 +1070,31 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum),
indexUsageSyncLease: idxUsageSyncLease,
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}},
=======
store: store,
exit: make(chan struct{}),
sysSessionPool: util.NewSessionPool(
capacity, factory,
func(r pools.Resource) {
_, ok := r.(sessionctx.Context)
intest.Assert(ok)
infosync.StoreInternalSession(r)
},
func(r pools.Resource) {
sctx, ok := r.(sessionctx.Context)
intest.Assert(ok)
intest.AssertFunc(func() bool {
txn, _ := sctx.Txn(false)
return txn == nil || !txn.Valid()
})
infosync.DeleteInternalSession(r)
},
),
statsLease: statsLease,
schemaLease: schemaLease,
slowQuery: newTopNSlowQueries(config.GetGlobalConfig().InMemSlowQueryTopNNum, time.Hour*24*7, config.GetGlobalConfig().InMemSlowQueryRecentNum),
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName(), GetExtractTaskDirName()}},
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
mdlCheckTableInfo: &mdlCheckTableInfo{
mu: sync.Mutex{},
jobsVerMap: make(map[int64]int64),
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2499,7 +2499,7 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
// Otherwise, you can see only your own transactions.
if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
if !hasProcessPriv && loginUser != nil && info.ProcessInfo.Username != loginUser.Username {
continue
}
e.txnInfo = append(e.txnInfo, info)
Expand Down
16 changes: 10 additions & 6 deletions pkg/infoschema/test/clustertablestest/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,9 +1221,11 @@ func TestTiDBTrx(t *testing.T) {
CurrentSQLDigest: digest.String(),
State: txninfo.TxnIdle,
EntriesCount: 1,
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
ProcessInfo: &txninfo.ProcessInfo{
ConnectionID: 2,
Username: "root",
CurrentDB: "test",
},
}

blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.Local)
Expand All @@ -1232,9 +1234,11 @@ func TestTiDBTrx(t *testing.T) {
CurrentSQLDigest: "",
AllSQLDigests: []string{"sql1", "sql2", digest.String()},
State: txninfo.TxnLockAcquiring,
ConnectionID: 10,
Username: "user1",
CurrentDB: "db1",
ProcessInfo: &txninfo.ProcessInfo{
ConnectionID: 10,
Username: "user1",
CurrentDB: "db1",
},
}
sm.TxnInfo[1].BlockStartTime.Valid = true
sm.TxnInfo[1].BlockStartTime.Time = blockTime2
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,15 +839,16 @@ func (s *Server) getUserProcessList() map[uint64]*util.ProcessInfo {
return rs
}

// ShowTxnList shows all txn info for displaying in `TIDB_TRX`
// ShowTxnList shows all txn info for displaying in `TIDB_TRX`.
// Internal sessions are not taken into consideration.
func (s *Server) ShowTxnList() []*txninfo.TxnInfo {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
rs := make([]*txninfo.TxnInfo, 0, len(s.clients))
for _, client := range s.clients {
if client.ctx.Session != nil {
info := client.ctx.Session.TxnInfo()
if info != nil {
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) {
}

// TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only*
// Process field may not initialize if this is a session used internally.
func (s *session) TxnInfo() *txninfo.TxnInfo {
s.txn.mu.RLock()
// Copy on read to get a snapshot, this API shouldn't be frequently called.
Expand All @@ -519,17 +520,27 @@ func (s *session) TxnInfo() *txninfo.TxnInfo {

processInfo := s.ShowProcess()
if processInfo == nil {
return nil
return &txnInfo
}
txnInfo.ProcessInfo = &txninfo.ProcessInfo{
ConnectionID: processInfo.ID,
Username: processInfo.User,
CurrentDB: processInfo.DB,
RelatedTableIDs: make(map[int64]struct{}),
}
<<<<<<< HEAD
txnInfo.ConnectionID = processInfo.ID
txnInfo.Username = processInfo.User
txnInfo.CurrentDB = processInfo.DB
txnInfo.RelatedTableIDs = make(map[int64]struct{})
s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, value interface{}) bool {
txnInfo.RelatedTableIDs[key.(int64)] = struct{}{}
=======
s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, _ any) bool {
txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{}
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
return true
})

return &txnInfo
}

Expand Down Expand Up @@ -3853,9 +3864,10 @@ func GetStartTSFromSession(se interface{}) (startTS, processInfoID uint64) {
txnInfo := tmp.TxnInfo()
if txnInfo != nil {
startTS = txnInfo.StartTS
processInfoID = txnInfo.ConnectionID
if txnInfo.ProcessInfo != nil {
processInfoID = txnInfo.ProcessInfo.ConnectionID
}
}

logutil.BgLogger().Debug(
"GetStartTSFromSession getting startTS of internal session",
zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS)))
Expand Down
29 changes: 24 additions & 5 deletions pkg/session/txninfo/txn_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,12 @@ type TxnInfo struct {
// How many entries are in MemDB
EntriesCount uint64

// The following fields will be filled in `session` instead of `LazyTxn`
// The following field will be filled in `session` instead of `LazyTxn`
ProcessInfo *ProcessInfo
}

// ProcessInfo is part of fields of txnInfo, which will be filled in `session` instead of `LazyTxn`
type ProcessInfo struct {
// Which session this transaction belongs to
ConnectionID uint64
// The user who open this session
Expand Down Expand Up @@ -219,13 +223,25 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
return types.NewDatum(info.EntriesCount)
},
SessionIDStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.ConnectionID)
var connectionID uint64
if info.ProcessInfo != nil {
connectionID = info.ProcessInfo.ConnectionID
}
return types.NewDatum(connectionID)
},
UserStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.Username)
var userName string
if info.ProcessInfo != nil {
userName = info.ProcessInfo.Username
}
return types.NewDatum(userName)
},
DBStr: func(info *TxnInfo) types.Datum {
return types.NewDatum(info.CurrentDB)
var currentDB string
if info.ProcessInfo != nil {
currentDB = info.ProcessInfo.CurrentDB
}
return types.NewDatum(currentDB)
},
AllSQLDigestsStr: func(info *TxnInfo) types.Datum {
allSQLDigests := info.AllSQLDigests
Expand All @@ -241,7 +257,10 @@ var columnValueGetterMap = map[string]func(*TxnInfo) types.Datum{
return types.NewDatum(string(res))
},
RelatedTableIDsStr: func(info *TxnInfo) types.Datum {
relatedTableIDs := info.RelatedTableIDs
var relatedTableIDs map[int64]struct{}
if info.ProcessInfo != nil {
relatedTableIDs = info.ProcessInfo.RelatedTableIDs
}
str := strings.Builder{}
first := true
for tblID := range relatedTableIDs {
Expand Down
4 changes: 4 additions & 0 deletions pkg/testkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ go_library(
"//pkg/resourcemanager",
"//pkg/session",
"//pkg/session/txninfo",
<<<<<<< HEAD
"//pkg/sessionctx",
=======
"//pkg/session/types",
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
"//pkg/sessionctx/variable",
"//pkg/store/driver",
"//pkg/store/mockstore",
Expand Down
19 changes: 18 additions & 1 deletion pkg/testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/session/txninfo"
<<<<<<< HEAD
"github.com/pingcap/tidb/pkg/sessionctx"
=======
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
"github.com/pingcap/tidb/pkg/util"
)

Expand Down Expand Up @@ -52,7 +56,7 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn))
for _, se := range msm.Conn {
info := se.TxnInfo()
if info != nil {
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
Expand Down Expand Up @@ -155,12 +159,25 @@ func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 {
defer msm.mu.Unlock()
ret := make([]uint64, 0, len(msm.internalSessions))
for internalSess := range msm.internalSessions {
<<<<<<< HEAD
se := internalSess.(sessionctx.Context)
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS := sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()
ret = append(ret, startTS)
=======
// Ref the implementation of `GetInternalSessionStartTSList` on the real session manager. The `TxnInfo` is more
// accurate, because if a session is pending, the `StartTS` in `sessVars.TxnCtx` will not be updated. For example,
// if there is not DDL for a long time, the minimal internal session start ts will not have any progress.
if se, ok := internalSess.(interface{ TxnInfo() *txninfo.TxnInfo }); ok {
txn := se.TxnInfo()
if txn != nil {
ret = append(ret, txn.StartTS)
}
continue
}
>>>>>>> 865213c94e2 (session: make `TxnInfo()` return even if process info is empty (#57044))
}
return ret
}
Expand Down
6 changes: 3 additions & 3 deletions tests/realtikvtest/txntest/txn_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func TestBasicTxnState(t *testing.T) {
require.Equal(t, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String()}, info.AllSQLDigests)

// len and size will be covered in TestLenAndSize
require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ConnectionID)
require.Equal(t, "", info.Username)
require.Equal(t, "test", info.CurrentDB)
require.Equal(t, tk.Session().GetSessionVars().ConnectionID, info.ProcessInfo.ConnectionID)
require.Equal(t, "", info.ProcessInfo.Username)
require.Equal(t, "test", info.ProcessInfo.CurrentDB)
require.Equal(t, startTS, info.StartTS)

require.NoError(t, failpoint.Enable("tikvclient/beforePrewrite", "pause"))
Expand Down