Skip to content

Commit d8a0a61

Browse files
yibin87 and ti-chi-bot
authored and committed
This is an automated cherry-pick of #63796
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 06bcd2d commit d8a0a61

File tree

10 files changed

+388
-76
lines changed

10 files changed

+388
-76
lines changed

pkg/executor/adapter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,13 +2193,13 @@ func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Contex
21932193
}
21942194
// Always attach the SQL and plan info uses to catch the running SQL when Top SQL is enabled in execution.
21952195
if stats != nil {
2196-
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
2196+
stats.OnExecutionBegin(sqlDigestByte, planDigestByte, vars.InPacketBytes.Load())
21972197
}
21982198
return topsql.AttachSQLAndPlanInfo(ctx, sqlDigest, planDigest)
21992199
}
22002200

22012201
if stats != nil {
2202-
stats.OnExecutionBegin(sqlDigestByte, planDigestByte)
2202+
stats.OnExecutionBegin(sqlDigestByte, planDigestByte, vars.InPacketBytes.Load())
22032203
// This is a special logic prepared for TiKV's SQLExecCount.
22042204
sc.KvExecCounter = stats.CreateKvExecCounter(sqlDigestByte, planDigestByte)
22052205
}
@@ -2224,7 +2224,7 @@ func (a *ExecStmt) observeStmtFinishedForTopSQL() {
22242224
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
22252225
sqlDigest, planDigest := a.getSQLPlanDigest()
22262226
execDuration := vars.GetTotalCostDuration()
2227-
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration)
2227+
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration, vars.OutPacketBytes.Load())
22282228
}
22292229
}
22302230

pkg/server/conn.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,11 @@ func (cc *clientConn) readPacket() ([]byte, error) {
487487
if cc.getCtx() != nil {
488488
cc.pkt.SetMaxAllowedPacket(cc.ctx.GetSessionVars().MaxAllowedPacket)
489489
}
490-
return cc.pkt.ReadPacket()
490+
data, err := cc.pkt.ReadPacket()
491+
if err == nil && cc.getCtx() != nil {
492+
cc.ctx.GetSessionVars().InPacketBytes.Add(uint64(len(data)))
493+
}
494+
return data, err
491495
}
492496

493497
func (cc *clientConn) writePacket(data []byte) error {
@@ -496,6 +500,9 @@ func (cc *clientConn) writePacket(data []byte) error {
496500
failpoint.Return(nil)
497501
}
498502
})
503+
if cc.getCtx() != nil {
504+
cc.ctx.GetSessionVars().OutPacketBytes.Add(uint64(len(data)))
505+
}
499506
return cc.pkt.WritePacket(data)
500507
}
501508

@@ -1279,6 +1286,8 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
12791286
defer func() {
12801287
// reset killed for each request
12811288
cc.ctx.GetSessionVars().SQLKiller.Reset()
1289+
cc.ctx.GetSessionVars().InPacketBytes.Store(0)
1290+
cc.ctx.GetSessionVars().OutPacketBytes.Store(0)
12821291
}()
12831292
t := time.Now()
12841293
if (cc.ctx.Status() & mysql.ServerStatusInTrans) > 0 {

pkg/sessionctx/variable/session.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,8 +1725,51 @@ type SessionVars struct {
17251725
// InternalSQLScanUserTable indicates whether to use user table for internal SQL. it will be used by TTL scan
17261726
InternalSQLScanUserTable bool
17271727

1728+
<<<<<<< HEAD
17281729
// IndexLookUpPushDownPolicy indicates the policy of index look up push down.
17291730
IndexLookUpPushDownPolicy string
1731+
=======
1732+
// MemArbitrator represents the properties to be controlled by the memory arbitrator.
1733+
MemArbitrator struct {
1734+
WaitAverse MemArbitratorWaitAverseMode
1735+
QueryReserved int64
1736+
}
1737+
1738+
// InPacketBytes records the total incoming packet bytes from clients for current session.
1739+
InPacketBytes atomic.Uint64
1740+
1741+
// OutPacketBytes records the total outgoing packet bytes to clients for current session.
1742+
OutPacketBytes atomic.Uint64
1743+
}
1744+
1745+
// ResetRelevantOptVarsAndFixes resets the relevant optimizer variables and fixes.
1746+
func (s *SessionVars) ResetRelevantOptVarsAndFixes(record bool) {
1747+
s.RecordRelevantOptVarsAndFixes = record
1748+
s.RelevantOptVars = nil
1749+
s.RelevantOptFixes = nil
1750+
}
1751+
1752+
// RecordRelevantOptVar records the optimizer variable that is relevant to the current query.
1753+
func (s *SessionVars) RecordRelevantOptVar(varName string) {
1754+
if !s.RecordRelevantOptVarsAndFixes {
1755+
return
1756+
}
1757+
if s.RelevantOptVars == nil {
1758+
s.RelevantOptVars = make(map[string]struct{})
1759+
}
1760+
s.RelevantOptVars[varName] = struct{}{}
1761+
}
1762+
1763+
// RecordRelevantOptFix records the optimizer fix that is relevant to the current query.
1764+
func (s *SessionVars) RecordRelevantOptFix(fixID uint64) {
1765+
if !s.RecordRelevantOptVarsAndFixes {
1766+
return
1767+
}
1768+
if s.RelevantOptFixes == nil {
1769+
s.RelevantOptFixes = make(map[uint64]struct{})
1770+
}
1771+
s.RelevantOptFixes[fixID] = struct{}{}
1772+
>>>>>>> e407943c073 (*: Collect client network info for topsql to pick topN sql with highest network traffic (#63796))
17301773
}
17311774

17321775
// GetSessionVars implements the `SessionVarsProvider` interface.

pkg/util/topsql/reporter/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ go_test(
4545
],
4646
embed = [":reporter"],
4747
flaky = True,
48-
shard_count = 36,
48+
shard_count = 37,
4949
deps = [
5050
"//pkg/config",
5151
"//pkg/testkit/testsetup",

pkg/util/topsql/reporter/datamodel.go

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ func zeroTsItem() tsItem {
8686
// toProto converts the tsItem to the corresponding protobuf representation.
8787
func (i *tsItem) toProto() *tipb.TopSQLRecordItem {
8888
return &tipb.TopSQLRecordItem{
89-
TimestampSec: i.timestamp,
90-
CpuTimeMs: i.cpuTimeMs,
91-
StmtExecCount: i.stmtStats.ExecCount,
92-
StmtKvExecCount: i.stmtStats.KvStatsItem.KvExecCount,
93-
StmtDurationSumNs: i.stmtStats.SumDurationNs,
94-
StmtDurationCount: i.stmtStats.DurationCount,
89+
TimestampSec: i.timestamp,
90+
CpuTimeMs: i.cpuTimeMs,
91+
StmtExecCount: i.stmtStats.ExecCount,
92+
StmtKvExecCount: i.stmtStats.KvStatsItem.KvExecCount,
93+
StmtDurationSumNs: i.stmtStats.SumDurationNs,
94+
StmtDurationCount: i.stmtStats.DurationCount,
95+
StmtNetworkInBytes: i.stmtStats.NetworkInBytes,
96+
StmtNetworkOutBytes: i.stmtStats.NetworkOutBytes,
9597
// Convert more indicators here.
9698
}
9799
}
@@ -200,20 +202,25 @@ func (r *record) appendCPUTime(timestamp uint64, cpuTimeMs uint32) {
200202
// Before:
201203
// tsIndex: [10000 => 0]
202204
// tsItems:
203-
// timestamp: [10000]
204-
// cpuTimeMs: [0]
205-
// stmtStats.ExecCount: [?]
206-
// stmtStats.KvExecCount: [map{"?": ?}]
207-
// stmtStats.DurationSum: [?]
205+
// timestamp: [10000]
206+
// cpuTimeMs: [0]
207+
// stmtStats.ExecCount: [?]
208+
// stmtStats.KvExecCount: [map{"?": ?}]
209+
// stmtStats.DurationSum: [?]
210+
// stmtStats.NetworkInBytes: [?]
211+
// stmtStats.NetworkOutBytes: [?]
208212
//
209213
// After:
210214
// tsIndex: [10000 => 0]
211215
// tsItems:
212-
// timestamp: [10000]
213-
// cpuTimeMs: [123]
214-
// stmtStats.ExecCount: [?]
215-
// stmtStats.KvExecCount: [map{"?": ?}]
216-
// stmtStats.DurationSum: [?]
216+
// timestamp: [10000]
217+
// cpuTimeMs: [123]
218+
// stmtStats.ExecCount: [?]
219+
// stmtStats.KvExecCount: [map{"?": ?}]
220+
// stmtStats.DurationSum: [?]
221+
222+
// stmtStats.NetworkInBytes: [?]
223+
// stmtStats.NetworkOutBytes: [?]
217224
//
218225
r.tsItems[index].cpuTimeMs += cpuTimeMs
219226
} else {
@@ -225,20 +232,24 @@ func (r *record) appendCPUTime(timestamp uint64, cpuTimeMs uint32) {
225232
// Before:
226233
// tsIndex: []
227234
// tsItems:
228-
// timestamp: []
229-
// cpuTimeMs: []
230-
// stmtStats.ExecCount: []
231-
// stmtStats.KvExecCount: []
232-
// stmtStats.DurationSum: []
235+
// timestamp: []
236+
// cpuTimeMs: []
237+
// stmtStats.ExecCount: []
238+
// stmtStats.KvExecCount: []
239+
// stmtStats.DurationSum: []
240+
// stmtStats.NetworkInBytes: []
241+
// stmtStats.NetworkOutBytes: []
233242
//
234243
// After:
235244
// tsIndex: [10000 => 0]
236245
// tsItems:
237-
// timestamp: [10000]
238-
// cpuTimeMs: [123]
239-
// stmtStats.ExecCount: [0]
240-
// stmtStats.KvExecCount: [map{}]
241-
// stmtStats.DurationSum: [0]
246+
// timestamp: [10000]
247+
// cpuTimeMs: [123]
248+
// stmtStats.ExecCount: [0]
249+
// stmtStats.KvExecCount: [map{}]
250+
// stmtStats.DurationSum: [0]
251+
// stmtStats.NetworkInBytes: [0]
252+
// stmtStats.NetworkOutBytes: [0]
242253
//
243254
newItem := zeroTsItem()
244255
newItem.timestamp = timestamp
@@ -258,50 +269,59 @@ func (r *record) appendStmtStatsItem(timestamp uint64, item stmtstats.StatementS
258269
// corresponding stmtStats has been set to 0 (or other values,
259270
// although impossible), so we merge it.
260271
//
261-
// let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456
262-
//
272+
// let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456,
273+
// networkInBytes = 10, networkOutBytes = 20
263274
// Before:
264275
// tsIndex: [10000 => 0]
265276
// tsItems:
266-
// timestamp: [10000]
267-
// cpuTimeMs: [?]
268-
// stmtStats.ExecCount: [0]
269-
// stmtStats.KvExecCount: [map{}]
270-
// stmtStats.DurationSum: [0]
277+
// timestamp: [10000]
278+
// cpuTimeMs: [?]
279+
// stmtStats.ExecCount: [0]
280+
// stmtStats.KvExecCount: [map{}]
281+
// stmtStats.DurationSum: [0]
282+
// stmtStats.NetworkInBytes: [0]
283+
// stmtStats.NetworkOutBytes: [0]
271284
//
272285
// After:
273286
// tsIndex: [10000 => 0]
274287
// tsItems:
275-
// timestamp: [10000]
276-
// cpuTimeMs: [?]
277-
// stmtStats.ExecCount: [123]
278-
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
279-
// stmtStats.DurationSum: [456]
288+
// timestamp: [10000]
289+
// cpuTimeMs: [?]
290+
// stmtStats.ExecCount: [123]
291+
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
292+
// stmtStats.DurationSum: [456]
293+
// stmtStats.NetworkInBytes: [10]
294+
// stmtStats.NetworkOutBytes: [20]
280295
//
281296
r.tsItems[index].stmtStats.Merge(&item)
282297
} else {
283298
// For this timestamp, we have not appended any tsItem, so append it directly.
284299
// Other fields in tsItem except stmtStats will be initialized to 0.
285300
//
286301
// let timestamp = 10000, execCount = 123, kvExecCount = map{"1.1.1.1:1": 123}, durationSum = 456
302+
// networkInBytes = 10, networkOutBytes = 20
287303
//
288304
// Before:
289305
// tsIndex: []
290306
// tsItems:
291-
// timestamp: []
292-
// cpuTimeMs: []
293-
// stmtStats.ExecCount: []
294-
// stmtStats.KvExecCount: []
295-
// stmtStats.DurationSum: []
307+
// timestamp: []
308+
// cpuTimeMs: []
309+
// stmtStats.ExecCount: []
310+
// stmtStats.KvExecCount: []
311+
// stmtStats.DurationSum: []
312+
// stmtStats.NetworkInBytes: []
313+
// stmtStats.NetworkOutBytes: []
296314
//
297315
// After:
298316
// tsIndex: [10000 => 0]
299317
// tsItems:
300-
// timestamp: [10000]
301-
// cpuTimeMs: [0]
302-
// stmtStats.ExecCount: [123]
303-
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
304-
// stmtStats.DurationSum: [456]
318+
// timestamp: [10000]
319+
// cpuTimeMs: [0]
320+
// stmtStats.ExecCount: [123]
321+
// stmtStats.KvExecCount: [map{"1.1.1.1:1": 123}]
322+
// stmtStats.DurationSum: [456]
323+
// stmtStats.NetworkInBytes: [10]
324+
// stmtStats.NetworkOutBytes: [20]
305325
//
306326
newItem := zeroTsItem()
307327
newItem.timestamp = timestamp
@@ -543,7 +563,7 @@ func (c *collecting) getReportRecords() records {
543563
for _, v := range c.records {
544564
rs = append(rs, *v)
545565
}
546-
if others != nil && others.totalCPUTimeMs > 0 {
566+
if others != nil {
547567
rs = append(rs, *others)
548568
}
549569
return rs

pkg/util/topsql/reporter/reporter.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
reporter_metrics "github.com/pingcap/tidb/pkg/util/topsql/reporter/metrics"
2626
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
2727
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
28+
"github.com/wangjohn/quickselect"
2829
"go.uber.org/zap"
2930
)
3031

@@ -233,20 +234,62 @@ func (tsr *RemoteTopSQLReporter) processCPUTimeData(timestamp uint64, data cpuRe
233234
func (tsr *RemoteTopSQLReporter) processStmtStatsData() {
234235
defer util.Recover("top-sql", "processStmtStatsData", nil, false)
235236

237+
maxLen := 0
238+
for _, data := range tsr.stmtStatsBuffer {
239+
maxLen = max(maxLen, len(data))
240+
}
241+
u64Slice := make([]uint64, 0, maxLen)
242+
k := int(topsqlstate.GlobalState.MaxStatementCount.Load())
236243
for timestamp, data := range tsr.stmtStatsBuffer {
244+
kthNetworkBytes := findKthNetworkBytes(data, k, u64Slice)
237245
for digest, item := range data {
238246
sqlDigest, planDigest := []byte(digest.SQLDigest), []byte(digest.PlanDigest)
239-
if tsr.collecting.hasEvicted(timestamp, sqlDigest, planDigest) {
240-
// This timestamp+sql+plan has been evicted due to low CPUTime.
247+
// Note: by filtering with kthNetworkBytes, we may pick fewer than N records (at most N-1 in the common case).
248+
// The actual number of picked records depends on how many entries share the kthNetworkBytes value; fewer are kept when it is unique.
249+
// For performance reasons, do not convert the whole map into a slice just to pick exactly topN records.
250+
if item.NetworkInBytes+item.NetworkOutBytes > kthNetworkBytes || !tsr.collecting.hasEvicted(timestamp, sqlDigest, planDigest) {
251+
tsr.collecting.getOrCreateRecord(sqlDigest, planDigest).appendStmtStatsItem(timestamp, *item)
252+
} else {
241253
tsr.collecting.appendOthersStmtStatsItem(timestamp, *item)
242-
continue
243254
}
244-
tsr.collecting.getOrCreateRecord(sqlDigest, planDigest).appendStmtStatsItem(timestamp, *item)
245255
}
246256
}
247257
tsr.stmtStatsBuffer = map[uint64]stmtstats.StatementStatsMap{}
248258
}
249259

260+
// The uint64Slice type attaches the QuickSelect interface to an array of uint64s. It
261+
// implements Interface so that you can call QuickSelect(k) on any IntSlice.
262+
type uint64Slice []uint64
263+
264+
func (t uint64Slice) Len() int {
265+
return len(t)
266+
}
267+
268+
func (t uint64Slice) Less(i, j int) bool {
269+
return t[i] > t[j]
270+
}
271+
272+
func (t uint64Slice) Swap(i, j int) {
273+
t[i], t[j] = t[j], t[i]
274+
}
275+
276+
// findKthNetworkBytes finds the k-th largest network bytes in data using quickselect algorithm.
277+
func findKthNetworkBytes(data stmtstats.StatementStatsMap, k int, u64Slice []uint64) uint64 {
278+
var kthNetworkBytes uint64
279+
if len(data) > k {
280+
u64Slice = u64Slice[:0]
281+
for _, item := range data {
282+
u64Slice = append(u64Slice, item.NetworkInBytes+item.NetworkOutBytes)
283+
}
284+
_ = quickselect.QuickSelect(uint64Slice(u64Slice), k)
285+
kthNetworkBytes = u64Slice[0]
286+
for i := range k {
287+
kthNetworkBytes = min(kthNetworkBytes, u64Slice[i])
288+
}
289+
}
290+
return kthNetworkBytes
291+
}
292+
250293
// takeDataAndSendToReportChan takes records data and then send to the report channel for reporting.
251294
func (tsr *RemoteTopSQLReporter) takeDataAndSendToReportChan() {
252295
// Send to report channel. When channel is full, data will be dropped.

0 commit comments

Comments
 (0)