Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5685,13 +5685,13 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sha256 = "17915c59b6f2d3a8554434ed3683f207ce2d036a93f8461f10957e29c5811a92",
strip_prefix = "github.com/pingcap/[email protected]20251125085256-097db0b2c02a",
sha256 = "f0ef12d299ff4b0afca6f29723fa0f50fd6228fd38e1f99d925430f2d2dc7ea9",
strip_prefix = "github.com/pingcap/[email protected]20260202031324-4ce7b6c65c98",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20251125085256-097db0b2c02a.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20251125085256-097db0b2c02a.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20251125085256-097db0b2c02a.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20251125085256-097db0b2c02a.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20260202031324-4ce7b6c65c98.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20251125085256-097db0b2c02a
github.com/pingcap/tipb v0.0.0-20260202031324-4ce7b6c65c98
github.com/prometheus/client_golang v1.23.0
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.65.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,8 @@ github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGa
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tipb v0.0.0-20251125085256-097db0b2c02a h1:ya+ghNkfGgXYpgi0L7109vzy3tAQn3a+zneOK7C3BQ8=
github.com/pingcap/tipb v0.0.0-20251125085256-097db0b2c02a/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20260202031324-4ce7b6c65c98 h1:rCUl6auTYEiq+k5hGVDTjInH++wY/XgZ8V5Lp24OaEo=
github.com/pingcap/tipb v0.0.0-20260202031324-4ce7b6c65c98/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2193,13 +2193,13 @@ func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Contex
}
// Always attach the SQL and plan info uses to catch the running SQL when Top SQL is enabled in execution.
if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
stats.OnExecutionBegin(sqlDigestByte, planDigestByte, vars.InPacketBytes.Load())
}
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
}

if stats != nil {
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
stats.OnExecutionBegin(sqlDigestByte, planDigestByte, vars.InPacketBytes.Load())
// This is a special logic prepared for TiKV's SQLExecCount.
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
}
Expand All @@ -2224,7 +2224,7 @@ func (a *ExecStmt) observeStmtFinishedForTopSQL() {
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
execDuration := vars.GetTotalCostDuration()
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration)
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration, vars.OutPacketBytes.Load())
}
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,11 @@ func (cc *clientConn) readPacket() ([]byte, error) {
if cc.getCtx() != nil {
cc.pkt.SetMaxAllowedPacket(cc.ctx.GetSessionVars().MaxAllowedPacket)
}
return cc.pkt.ReadPacket()
data, err := cc.pkt.ReadPacket()
if err == nil && cc.getCtx() != nil {
cc.ctx.GetSessionVars().InPacketBytes.Add(uint64(len(data)))
}
return data, err
}

func (cc *clientConn) writePacket(data []byte) error {
Expand All @@ -496,6 +500,9 @@ func (cc *clientConn) writePacket(data []byte) error {
failpoint.Return(nil)
}
})
if cc.getCtx() != nil {
cc.ctx.GetSessionVars().OutPacketBytes.Add(uint64(len(data)))
}
return cc.pkt.WritePacket(data)
}

Expand Down Expand Up @@ -1279,6 +1286,8 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
defer func() {
// reset killed for each request
cc.ctx.GetSessionVars().SQLKiller.Reset()
cc.ctx.GetSessionVars().InPacketBytes.Store(0)
cc.ctx.GetSessionVars().OutPacketBytes.Store(0)
}()
t := time.Now()
if (cc.ctx.Status() & mysql.ServerStatusInTrans) > 0 {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,12 @@ type SessionVars struct {

// IndexLookUpPushDownPolicy indicates the policy of index look up push down.
IndexLookUpPushDownPolicy string

// InPacketBytes records the total incoming packet bytes from clients for current session.
InPacketBytes atomic.Uint64

// OutPacketBytes records the total outcoming packet bytes to clients for current session.
OutPacketBytes atomic.Uint64
}

// GetSessionVars implements the `SessionVarsProvider` interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/topsql/reporter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
],
embed = [":reporter"],
flaky = True,
shard_count = 36,
shard_count = 37,
deps = [
"//pkg/config",
"//pkg/testkit/testsetup",
Expand Down
118 changes: 69 additions & 49 deletions pkg/util/topsql/reporter/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ func zeroTsItem() tsItem {
// toProto converts the tsItem to the corresponding protobuf representation.
func (i *tsItem) toProto() *tipb.TopSQLRecordItem {
return &tipb.TopSQLRecordItem{
TimestampSec: i.timestamp,
CpuTimeMs: i.cpuTimeMs,
StmtExecCount: i.stmtStats.ExecCount,
StmtKvExecCount: i.stmtStats.KvStatsItem.KvExecCount,
StmtDurationSumNs: i.stmtStats.SumDurationNs,
StmtDurationCount: i.stmtStats.DurationCount,
TimestampSec: i.timestamp,
CpuTimeMs: i.cpuTimeMs,
StmtExecCount: i.stmtStats.ExecCount,
StmtKvExecCount: i.stmtStats.KvStatsItem.KvExecCount,
StmtDurationSumNs: i.stmtStats.SumDurationNs,
StmtDurationCount: i.stmtStats.DurationCount,
StmtNetworkInBytes: i.stmtStats.NetworkInBytes,
StmtNetworkOutBytes: i.stmtStats.NetworkOutBytes,
// Convert more indicators here.
}
}
Expand Down Expand Up @@ -200,20 +202,25 @@ func (r *record) appendCPUTime(timestamp uint64, cpuTimeMs uint32) {
// Before:
// tsIndex: [10000 => 0]
// tsItems:
// timestamp: [10000]
// cpuTimeMs: [0]
// stmtStats.ExecCount: [?]
// stmtStats.KvExecCount: [map{"?": ?}]
// stmtStats.DurationSum: [?]
// timestamp: [10000]
// cpuTimeMs: [0]
// stmtStats.ExecCount: [?]
// stmtStats.KvExecCount: [map{"?": ?}]
// stmtStats.DurationSum: [?]
// stmtStats.NetworkInBytes: [?]
// stmtStats.NetworkOutBytes: [?]
//
// After:
// tsIndex: [10000 => 0]
// tsItems:
// timestamp: [10000]
// cpuTimeMs: [123]
// stmtStats.ExecCount: [?]
// stmtStats.KvExecCount: [map{"?": ?}]
// stmtStats.DurationSum: [?]
// timestamp: [10000]
// cpuTimeMs: [123]
// stmtStats.ExecCount: [?]
// stmtStats.KvExecCount: [map{"?": ?}]
// stmtStats.DurationSum: [?]
// stmtStats.DurationSum: [?]
// stmtStats.NetworkInBytes: [?]
// stmtStats.NetworkOutBytes: [?]
//
r.tsItems[index].cpuTimeMs += cpuTimeMs
} else {
Expand All @@ -225,20 +232,24 @@ func (r *record) appendCPUTime(timestamp uint64, cpuTimeMs uint32) {
// Before:
// tsIndex: []
// tsItems:
// timestamp: []
// cpuTimeMs: []
// stmtStats.ExecCount: []
// stmtStats.KvExecCount: []
// stmtStats.DurationSum: []
// timestamp: []
// cpuTimeMs: []
// stmtStats.ExecCount: []
// stmtStats.KvExecCount: []
// stmtStats.DurationSum: []
// stmtStats.NetworkInBytes: []
// stmtStats.NetworkOutBytes: []
//
// After:
// tsIndex: [10000 => 0]
// tsItems:
// timestamp: [10000]
// cpuTimeMs: [123]
// stmtStats.ExecCount: [0]
// stmtStats.KvExecCount: [map{}]
// stmtStats.DurationSum: [0]
// timestamp: [10000]
// cpuTimeMs: [123]
// stmtStats.ExecCount: [0]
// stmtStats.KvExecCount: [map{}]
// stmtStats.DurationSum: [0]
// stmtStats.NetworkInBytes: [0]
// stmtStats.NetworkOutBytes: [0]
//
newItem := zeroTsItem()
newItem.timestamp = timestamp
Expand All @@ -258,50 +269,59 @@ func (r *record) appendStmtStatsItem(timestamp uint64, item stmtstats.StatementS
// corresponding stmtStats has been set to 0 (or other values,
// although impossible), so we merge it.
//
// let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456
//
// let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456,
// networkInBytes = 10, networkOutBytes = 20
// Before:
// tsIndex: [10000 => 0]
// tsItems:
// timestamp: [10000]
// cpuTimeMs: [?]
// stmtStats.ExecCount: [0]
// stmtStats.KvExecCount: [map{}]
// stmtStats.DurationSum: [0]
// timestamp: [10000]
// cpuTimeMs: [?]
// stmtStats.ExecCount: [0]
// stmtStats.KvExecCount: [map{}]
// stmtStats.DurationSum: [0]
// stmtStats.NetworkInBytes: [0]
// stmtStats.NetworkOutBytes: [0]
//
// After:
// tsIndex: [10000 => 0]
// tsItems:
// timestamp: [10000]
// cpuTimeMs: [?]
// stmtStats.ExecCount: [123]
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
// stmtStats.DurationSum: [456]
// timestamp: [10000]
// cpuTimeMs: [?]
// stmtStats.ExecCount: [123]
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
// stmtStats.DurationSum: [456]
// stmtStats.NetworkInBytes: [10]
// stmtStats.NetworkOutBytes: [20]
//
r.tsItems[index].stmtStats.Merge(&item)
} else {
// For this timestamp, we have not appended any tsItem, so append it directly.
// Other fields in tsItem except stmtStats will be initialized to 0.
//
// let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456
// networkInBytes = 10, networkOutBytes = 20
//
// Before:
// tsIndex: []
// tsItems:
// timestamp: []
// cpuTimeMs: []
// stmtStats.ExecCount: []
// stmtStats.KvExecCount: []
// stmtStats.DurationSum: []
// timestamp: []
// cpuTimeMs: []
// stmtStats.ExecCount: []
// stmtStats.KvExecCount: []
// stmtStats.DurationSum: []
// stmtStats.NetworkInBytes: []
// stmtStats.NetworkOutBytes: []
//
// After:
// tsIndex: [10000 => 0]
// tsItems:
// timestamp: [10000]
// cpuTimeMs: [0]
// stmtStats.ExecCount: [123]
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
// stmtStats.DurationSum: [456]
// timestamp: [10000]
// cpuTimeMs: [0]
// stmtStats.ExecCount: [123]
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
// stmtStats.DurationSum: [456]
// stmtStats.NetworkInBytes: [10]
// stmtStats.NetworkOutBytes: [20]
//
newItem := zeroTsItem()
newItem.timestamp = timestamp
Expand Down Expand Up @@ -543,7 +563,7 @@ func (c *collecting) getReportRecords() records {
for _, v := range c.records {
rs = append(rs, *v)
}
if others != nil && others.totalCPUTimeMs > 0 {
if others != nil {
rs = append(rs, *others)
}
return rs
Expand Down
51 changes: 47 additions & 4 deletions pkg/util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
reporter_metrics "github.com/pingcap/tidb/pkg/util/topsql/reporter/metrics"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
"github.com/wangjohn/quickselect"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -233,20 +234,62 @@ func (tsr *RemoteTopSQLReporter) processCPUTimeData(timestamp uint64, data cpuRe
func (tsr *RemoteTopSQLReporter) processStmtStatsData() {
defer util.Recover("top-sql", "processStmtStatsData", nil, false)

maxLen := 0
for _, data := range tsr.stmtStatsBuffer {
maxLen = max(maxLen, len(data))
}
u64Slice := make([]uint64, 0, maxLen)
k := int(topsqlstate.GlobalState.MaxStatementCount.Load())
for timestamp, data := range tsr.stmtStatsBuffer {
kthNetworkBytes := findKthNetworkBytes(data, k, u64Slice)
for digest, item := range data {
sqlDigest, planDigest := []byte(digest.SQLDigest), []byte(digest.PlanDigest)
if tsr.collecting.hasEvicted(timestamp, sqlDigest, planDigest) {
// This timestamp+sql+plan has been evicted due to low CPUTime.
// Note, by filtering with the kthNetworkBytes, we get fewer than N records(N - 1 records, at most time). The actual picked records
// count is decided by the count of duplicated kthNetworkBytes,if kthNetworkBytes is unique,
// For performance reason, do not convert the whole map into a slice and pick exactly topN records.
if item.NetworkInBytes+item.NetworkOutBytes > kthNetworkBytes || !tsr.collecting.hasEvicted(timestamp, sqlDigest, planDigest) {
tsr.collecting.getOrCreateRecord(sqlDigest, planDigest).appendStmtStatsItem(timestamp, *item)
} else {
tsr.collecting.appendOthersStmtStatsItem(timestamp, *item)
continue
}
tsr.collecting.getOrCreateRecord(sqlDigest, planDigest).appendStmtStatsItem(timestamp, *item)
}
}
tsr.stmtStatsBuffer = map[uint64]stmtstats.StatementStatsMap{}
}

// The uint64Slice type attaches the QuickSelect interface to an array of uint64s. It
// implements Interface so that you can call QuickSelect(k) on any IntSlice.
type uint64Slice []uint64

func (t uint64Slice) Len() int {
return len(t)
}

func (t uint64Slice) Less(i, j int) bool {
return t[i] > t[j]
}

func (t uint64Slice) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}

// findKthNetworkBytes finds the k-th largest network bytes in data using quickselect algorithm.
func findKthNetworkBytes(data stmtstats.StatementStatsMap, k int, u64Slice []uint64) uint64 {
var kthNetworkBytes uint64
if len(data) > k {
u64Slice = u64Slice[:0]
for _, item := range data {
u64Slice = append(u64Slice, item.NetworkInBytes+item.NetworkOutBytes)
}
_ = quickselect.QuickSelect(uint64Slice(u64Slice), k)
kthNetworkBytes = u64Slice[0]
for i := range k {
kthNetworkBytes = min(kthNetworkBytes, u64Slice[i])
}
}
return kthNetworkBytes
}

// takeDataAndSendToReportChan takes records data and then send to the report channel for reporting.
func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan() {
// Send to report channel. When channel is full, data will be dropped.
Expand Down
Loading