Skip to content

Commit 9c92811

Browse files
committed
change
1 parent 7f1a029 commit 9c92811

File tree

17 files changed

+580
-207
lines changed

17 files changed

+580
-207
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ require (
348348
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect
349349
google.golang.org/protobuf v1.36.6
350350
gopkg.in/inf.v0 v0.9.1 // indirect
351-
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
351+
gopkg.in/natefinch/lumberjack.v2 v2.2.1
352352
gopkg.in/yaml.v3 v3.0.1 // indirect
353353
k8s.io/apimachinery v0.29.11 // indirect
354354
k8s.io/klog/v2 v2.120.1 // indirect
@@ -363,6 +363,7 @@ replace (
363363
// Downgrade grpc to v1.63.2, as well as other related modules.
364364
github.com/apache/arrow-go/v18 => github.com/joechenrh/arrow-go/v18 v18.0.0-20250911101656-62c34c9a3b82
365365
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
366+
github.com/pingcap/metering_sdk => ../metering_sdk
366367
github.com/pingcap/tidb/pkg/parser => ./pkg/parser
367368

368369
// TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed.

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,6 @@ github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuR
723723
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
724724
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9 h1:qG9BSvlWFEE5otQGamuWedx9LRm0nrHvsQRQiW8SxEs=
725725
github.com/pingcap/log v1.1.1-0.20250917021125-19901e015dc9/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
726-
github.com/pingcap/metering_sdk v0.0.0-20250918015914-468cd6feb1dc h1:WQYup3tMJq1tyDU8aWTpSZcMPJIKHV+LNlCu6S7OH1I=
727-
github.com/pingcap/metering_sdk v0.0.0-20250918015914-468cd6feb1dc/go.mod h1:Qj77xzm/Bscv47607+/BkP0ovAHnf4j7HWSLKQaKwBw=
728726
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
729727
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
730728
github.com/pingcap/tipb v0.0.0-20250928030846-9fd33ded6f2c h1:tddMjEiXU0d1VlJ+yHwim4gINeHmFR9CCkitatuby2c=

pkg/ddl/backfilling_import_cloud.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (e *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
135135
OnDup: engineapi.OnDuplicateKeyError,
136136
OnReaderClose: func(summary *external.ReaderSummary) {
137137
e.summary.GetReqCnt.Add(summary.GetRequestCount)
138-
e.GetMeterRecorder().RecordGetRequestCount(summary.GetRequestCount)
138+
e.GetMeterRecorder().IncGetRequest(summary.GetRequestCount)
139139
},
140140
},
141141
TS: sm.TS,

pkg/ddl/backfilling_merge_sort.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
8585
m.subtaskSortedKVMeta.MergeSummary(summary)
8686
m.mu.Unlock()
8787
m.summary.PutReqCnt.Add(summary.PutRequestCount)
88-
m.GetMeterRecorder().RecordPutRequestCount(summary.PutRequestCount)
88+
m.GetMeterRecorder().IncPutRequest(summary.PutRequestCount)
8989
}
9090
onReaderClose := func(summary *external.ReaderSummary) {
9191
m.summary.GetReqCnt.Add(summary.GetRequestCount)
92-
m.GetMeterRecorder().RecordGetRequestCount(summary.GetRequestCount)
92+
m.GetMeterRecorder().IncGetRequest(summary.GetRequestCount)
9393
}
9494

9595
storeBackend, err := storage.ParseBackend(m.cloudStoreURI, nil)

pkg/ddl/backfilling_read_index.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ func (r *readIndexStepExecutor) buildExternalStorePipeline(
410410
}
411411
kvMeta.MergeSummary(summary)
412412
r.summary.PutReqCnt.Add(summary.PutRequestCount)
413-
r.GetMeterRecorder().RecordPutRequestCount(summary.PutRequestCount)
413+
r.GetMeterRecorder().IncPutRequest(summary.PutRequestCount)
414414
s.mu.Unlock()
415415
}
416416
var idxNames strings.Builder
@@ -463,12 +463,12 @@ func newDistTaskRowCntCollector(
463463

464464
func (d *distTaskRowCntCollector) Accepted(bytes int64) {
465465
d.summary.ReadBytes.Add(bytes)
466-
d.meterRec.RecordReadDataBytes(uint64(bytes))
466+
d.meterRec.IncReadBytes(uint64(bytes))
467467
}
468468

469469
func (d *distTaskRowCntCollector) Processed(bytes, rowCnt int64) {
470470
d.summary.Bytes.Add(bytes)
471471
d.summary.RowCnt.Add(rowCnt)
472472
d.counter.Add(float64(rowCnt))
473-
d.meterRec.RecordWriteDataBytes(uint64(bytes))
473+
d.meterRec.IncWriteBytes(uint64(bytes))
474474
}

pkg/disttask/framework/metering/BUILD.bazel

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,23 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "metering",
5-
srcs = ["metering.go"],
5+
srcs = [
6+
"data.go",
7+
"metering.go",
8+
"recorder.go",
9+
],
610
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/metering",
711
visibility = ["//visibility:public"],
812
deps = [
913
"//pkg/config/kerneltype",
10-
"//pkg/kv",
14+
"//pkg/disttask/framework/proto",
1115
"//pkg/util/logutil",
1216
"@com_github_google_uuid//:uuid",
1317
"@com_github_pingcap_errors//:errors",
1418
"@com_github_pingcap_metering_sdk//common",
1519
"@com_github_pingcap_metering_sdk//config",
1620
"@com_github_pingcap_metering_sdk//storage",
21+
"@com_github_pingcap_metering_sdk//writer",
1722
"@com_github_pingcap_metering_sdk//writer/metering",
1823
"@org_uber_go_zap//:zap",
1924
],
@@ -22,15 +27,25 @@ go_library(
2227
go_test(
2328
name = "metering_test",
2429
timeout = "short",
25-
srcs = ["metering_test.go"],
30+
srcs = [
31+
"data_test.go",
32+
"metering_test.go",
33+
"recorder_test.go",
34+
],
2635
embed = [":metering"],
2736
flaky = True,
28-
shard_count = 5,
37+
shard_count = 10,
2938
deps = [
39+
"//pkg/config/kerneltype",
40+
"//pkg/disttask/framework/proto",
41+
"@com_github_pingcap_metering_sdk//common",
3042
"@com_github_pingcap_metering_sdk//config",
3143
"@com_github_pingcap_metering_sdk//reader/metering",
3244
"@com_github_pingcap_metering_sdk//storage",
3345
"@com_github_pingcap_metering_sdk//writer/metering",
46+
"@com_github_pingcap_metering_sdk//writer/mock",
3447
"@com_github_stretchr_testify//require",
48+
"@org_uber_go_mock//gomock",
49+
"@org_uber_go_zap//:zap",
3550
],
3651
)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package metering
16+
17+
import "fmt"
18+
19+
const (
20+
getRequestsField = "get_requests"
21+
putRequestsField = "put_requests"
22+
readBytesField = "read_bytes"
23+
writeBytesField = "write_bytes"
24+
)
25+
26+
// Data represents the metering data.
27+
// we might use this struct to store accumulated data.
28+
type Data struct {
29+
getRequests uint64
30+
putRequests uint64
31+
readBytes uint64
32+
writeBytes uint64
33+
34+
taskID int64
35+
keyspace string
36+
taskType string
37+
}
38+
39+
func (d *Data) equals(other *Data) bool {
40+
return d.getRequests == other.getRequests &&
41+
d.putRequests == other.putRequests &&
42+
d.readBytes == other.readBytes &&
43+
d.writeBytes == other.writeBytes
44+
}
45+
46+
func (d *Data) calMeterDataItem(other *Data) map[string]any {
47+
// since Data item is always monotonically increasing, so don't consider
48+
// negative delta here.
49+
if d.equals(other) {
50+
return nil
51+
}
52+
item := map[string]any{
53+
"version": "1",
54+
"cluster_id": d.keyspace,
55+
"source_name": category,
56+
"task_type": d.taskType,
57+
"task_id": d.taskID,
58+
}
59+
if d.getRequests > other.getRequests {
60+
item[getRequestsField] = d.getRequests - other.getRequests
61+
}
62+
if d.putRequests > other.putRequests {
63+
item[putRequestsField] = d.putRequests - other.putRequests
64+
}
65+
if d.readBytes > other.readBytes {
66+
item[readBytesField] = d.readBytes - other.readBytes
67+
}
68+
if d.writeBytes > other.writeBytes {
69+
item[writeBytesField] = d.writeBytes - other.writeBytes
70+
}
71+
return item
72+
}
73+
74+
// String implements fmt.Stringer interface.
75+
func (d *Data) String() string {
76+
return fmt.Sprintf("{taskID: %d, keyspace: %s, type: %s, getReqs: %d, putReqs: %d, readBytes: %d, writeBytes: %d}",
77+
d.taskID,
78+
d.keyspace,
79+
d.taskType,
80+
d.getRequests,
81+
d.putRequests,
82+
d.readBytes,
83+
d.writeBytes)
84+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package metering
16+
17+
import (
18+
"maps"
19+
"strconv"
20+
"testing"
21+
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
func TestDataEquals(t *testing.T) {
26+
cases := []struct {
27+
pair [2]Data
28+
equal bool
29+
}{
30+
{pair: [2]Data{{taskID: 1, getRequests: 1}, {taskID: 1, getRequests: 1}}, equal: true},
31+
{pair: [2]Data{{taskID: 1, getRequests: 1}, {taskID: 1, getRequests: 2}}, equal: false},
32+
{pair: [2]Data{{taskID: 1, putRequests: 1}, {taskID: 1, putRequests: 1}}, equal: true},
33+
{pair: [2]Data{{taskID: 1, putRequests: 1}, {taskID: 1, putRequests: 2}}, equal: false},
34+
// we only compare the data fields, not taskID
35+
{pair: [2]Data{{taskID: 1, getRequests: 1}, {taskID: 2, getRequests: 1}}, equal: true},
36+
}
37+
for i, c := range cases {
38+
t.Run(strconv.Itoa(i), func(t *testing.T) {
39+
require.Equal(t, c.equal, c.pair[0].equals(&c.pair[1]), "case %d failed", i)
40+
})
41+
}
42+
}
43+
44+
func TestDataCalMeterDataItem(t *testing.T) {
45+
currData := &Data{getRequests: 10, putRequests: 20, readBytes: 300, writeBytes: 400, taskID: 1, keyspace: "ks", taskType: "tt"}
46+
require.Nil(t, currData.calMeterDataItem(currData))
47+
getItemFn := func(in map[string]any) map[string]any {
48+
maps.Copy(in, map[string]any{
49+
"version": "1",
50+
"cluster_id": "ks",
51+
"source_name": category,
52+
"task_type": "tt",
53+
"task_id": int64(1),
54+
})
55+
return in
56+
}
57+
require.EqualValues(t, getItemFn(map[string]any{
58+
"get_requests": uint64(5),
59+
"put_requests": uint64(15),
60+
"read_bytes": uint64(200),
61+
"write_bytes": uint64(300),
62+
}), currData.calMeterDataItem(&Data{getRequests: 5, putRequests: 5, readBytes: 100, writeBytes: 100}))
63+
require.EqualValues(t, getItemFn(map[string]any{
64+
"get_requests": uint64(5),
65+
}), currData.calMeterDataItem(&Data{getRequests: 5, putRequests: 20, readBytes: 300, writeBytes: 400}))
66+
}

0 commit comments

Comments
 (0)