Skip to content

Commit 1002491

Browse files
wddevriesxhebox
authored andcommitted
workload repo: Merge Workload Repository into master (pingcap#57148)
ref pingcap#57147
1 parent fea86c8 commit 1002491

File tree

13 files changed

+1382
-3
lines changed

13 files changed

+1382
-3
lines changed

cmd/tidb-server/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ go_library(
5656
"//pkg/util/tiflashcompute",
5757
"//pkg/util/topsql",
5858
"//pkg/util/versioninfo",
59+
"//pkg/util/workloadrepo",
5960
"@com_github_opentracing_opentracing_go//:opentracing-go",
6061
"@com_github_pingcap_errors//:errors",
6162
"@com_github_pingcap_failpoint//:failpoint",

cmd/tidb-server/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import (
8080
"github.com/pingcap/tidb/pkg/util/tiflashcompute"
8181
"github.com/pingcap/tidb/pkg/util/topsql"
8282
"github.com/pingcap/tidb/pkg/util/versioninfo"
83+
repository "github.com/pingcap/tidb/pkg/util/workloadrepo"
8384
"github.com/prometheus/client_golang/prometheus"
8485
"github.com/prometheus/client_golang/prometheus/push"
8586
"github.com/tikv/client-go/v2/tikv"
@@ -318,6 +319,7 @@ func main() {
318319
executor.Start()
319320
resourcemanager.InstanceResourceManager.Start()
320321
storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName)
322+
repository.SetupRepository(dom)
321323
svr := createServer(storage, dom)
322324

323325
exited := make(chan struct{})
@@ -921,6 +923,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) {
921923
// See https://github.com/pingcap/tidb/issues/40038 for details.
922924
svr.KillSysProcesses()
923925
plugin.Shutdown(context.Background())
926+
repository.StopRepository()
924927
closeDDLOwnerMgrDomainAndStorage(storage, dom)
925928
disk.CleanUp()
926929
closeStmtSummary()

pkg/ddl/executor.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4066,6 +4066,9 @@ var systemTables = map[string]struct{}{
40664066
}
40674067

40684068
func isUndroppableTable(schema, table string) bool {
4069+
if schema == "workload_schema" {
4070+
return true
4071+
}
40694072
if schema != mysql.SystemDB {
40704073
return false
40714074
}

pkg/domain/domain.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,7 +2024,7 @@ func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve
20242024
return err
20252025
}
20262026

2027-
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
2027+
owner := do.NewOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
20282028
err = owner.CampaignOwner()
20292029
if err != nil {
20302030
logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
@@ -2352,7 +2352,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
23522352
}
23532353
variable.EnableStatsOwner = do.enableStatsOwner
23542354
variable.DisableStatsOwner = do.disableStatsOwner
2355-
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
2355+
do.statsOwner = do.NewOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
23562356
do.statsOwner.SetListener(owner.NewListenersWrapper(statsHandle, do.ddlNotifier))
23572357
if config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load() {
23582358
err := do.statsOwner.CampaignOwner()
@@ -2458,7 +2458,8 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
24582458
logutil.BgLogger().Info("start load stats sub workers", zap.Int("worker count", len(ctxList)))
24592459
}
24602460

2461-
func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
2461+
// NewOwnerManager returns the owner manager for use outside of the domain.
2462+
func (do *Domain) NewOwnerManager(prompt, ownerKey string) owner.Manager {
24622463
id := do.ddl.OwnerManager().ID()
24632464
var statsOwner owner.Manager
24642465
if do.etcdClient == nil {

pkg/util/workloadrepo/BUILD.bazel

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "workloadrepo",
5+
srcs = [
6+
"const.go",
7+
"housekeeper.go",
8+
"sampling.go",
9+
"snapshot.go",
10+
"table.go",
11+
"utils.go",
12+
"worker.go",
13+
],
14+
importpath = "github.com/pingcap/tidb/pkg/util/workloadrepo",
15+
visibility = ["//visibility:public"],
16+
deps = [
17+
"//pkg/domain",
18+
"//pkg/domain/infosync",
19+
"//pkg/infoschema",
20+
"//pkg/kv",
21+
"//pkg/meta/model",
22+
"//pkg/owner",
23+
"//pkg/parser/model",
24+
"//pkg/sessionctx",
25+
"//pkg/sessionctx/variable",
26+
"//pkg/sessiontxn",
27+
"//pkg/util",
28+
"//pkg/util/chunk",
29+
"//pkg/util/logutil",
30+
"//pkg/util/slice",
31+
"//pkg/util/sqlescape",
32+
"//pkg/util/sqlexec",
33+
"@com_github_ngaut_pools//:pools",
34+
"@com_github_pingcap_errors//:errors",
35+
"@io_etcd_go_etcd_client_v3//:client",
36+
"@org_uber_go_zap//:zap",
37+
],
38+
)
39+
40+
go_test(
41+
name = "workloadrepo_test",
42+
timeout = "short",
43+
srcs = ["worker_test.go"],
44+
embed = [":workloadrepo"],
45+
flaky = True,
46+
deps = [
47+
"//pkg/domain",
48+
"//pkg/kv",
49+
"//pkg/owner",
50+
"//pkg/parser/model",
51+
"//pkg/testkit",
52+
"@com_github_stretchr_testify//require",
53+
"@io_etcd_go_etcd_client_v3//:client",
54+
"@io_etcd_go_etcd_server_v3//embed",
55+
],
56+
)

pkg/util/workloadrepo/const.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package workloadrepo
16+
17+
import (
18+
"time"
19+
20+
"github.com/pingcap/tidb/pkg/parser/model"
21+
)
22+
23+
const (
24+
ownerKey = "/tidb/workloadrepo/owner"
25+
promptKey = "workloadrepo"
26+
snapIDKey = "/tidb/workloadrepo/snap_id"
27+
28+
etcdOpTimeout = 5 * time.Second
29+
30+
defSamplingInterval = 5
31+
defSnapshotInterval = 3600
32+
defRententionDays = 7
33+
34+
// WorkloadSchema is the name of database for workloadrepo worker.
35+
WorkloadSchema = "WORKLOAD_SCHEMA"
36+
histSnapshotsTable = "HIST_SNAPSHOTS"
37+
)
38+
39+
var (
40+
workloadSchemaCIStr = model.NewCIStr(WorkloadSchema)
41+
zeroTime = time.Time{}
42+
)
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package workloadrepo
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"strings"
21+
"time"
22+
23+
"github.com/pingcap/tidb/pkg/infoschema"
24+
"github.com/pingcap/tidb/pkg/parser/model"
25+
"github.com/pingcap/tidb/pkg/sessionctx"
26+
"github.com/pingcap/tidb/pkg/sessiontxn"
27+
"github.com/pingcap/tidb/pkg/util/logutil"
28+
"github.com/pingcap/tidb/pkg/util/sqlescape"
29+
"go.uber.org/zap"
30+
)
31+
32+
func calcNextTick(now time.Time) time.Duration {
33+
// only activated at 2am
34+
next := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, time.Local)
35+
if !next.After(now) {
36+
next = next.AddDate(0, 0, 1)
37+
}
38+
return next.Sub(now)
39+
}
40+
41+
func createAllPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema) error {
42+
sb := &strings.Builder{}
43+
for _, tbl := range workloadTables {
44+
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, model.NewCIStr(tbl.destTable))
45+
if err != nil {
46+
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
47+
return err
48+
}
49+
tbInfo := tbSchema.Meta()
50+
51+
sb.Reset()
52+
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n ADD PARTITION (", WorkloadSchema, tbl.destTable)
53+
if !generatePartitionRanges(sb, tbInfo) {
54+
fmt.Fprintf(sb, ")")
55+
_, err = execRetry(ctx, sess, sb.String())
56+
if err != nil {
57+
logutil.BgLogger().Info("workload repository cannot add partitions", zap.String("parts", sb.String()), zap.NamedError("err", err))
58+
return err
59+
}
60+
}
61+
}
62+
return nil
63+
}
64+
65+
func (w *worker) dropOldPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema, now time.Time) error {
66+
w.Lock()
67+
retention := int(w.retentionDays)
68+
w.Unlock()
69+
70+
if retention == 0 {
71+
// disabled housekeeping
72+
return nil
73+
}
74+
75+
sb := &strings.Builder{}
76+
for _, tbl := range workloadTables {
77+
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, model.NewCIStr(tbl.destTable))
78+
if err != nil {
79+
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
80+
continue
81+
}
82+
tbInfo := tbSchema.Meta()
83+
for _, pt := range tbInfo.GetPartitionInfo().Definitions {
84+
ot, err := time.Parse("p20060102", pt.Name.L)
85+
if err != nil {
86+
logutil.BgLogger().Info("workload repository cannot parse partition name", zap.String("part", pt.Name.L), zap.NamedError("err", err))
87+
break
88+
}
89+
if int(now.Sub(ot).Hours()/24) < retention {
90+
continue
91+
}
92+
sb.Reset()
93+
sqlescape.MustFormatSQL(sb, "ALTER TABLE %s.%s DROP PARTITION %s",
94+
WorkloadSchema, tbl.destTable, pt.Name.L)
95+
_, err = execRetry(ctx, sess, sb.String())
96+
if err != nil {
97+
logutil.BgLogger().Info("workload repository cannot drop partition", zap.String("part", pt.Name.L), zap.NamedError("err", err))
98+
break
99+
}
100+
}
101+
}
102+
return nil
103+
}
104+
105+
func (w *worker) startHouseKeeper(ctx context.Context) func() {
106+
return func() {
107+
now := time.Now()
108+
timer := time.NewTimer(calcNextTick(now))
109+
defer timer.Stop()
110+
111+
_sessctx := w.getSessionWithRetry()
112+
defer w.sesspool.Put(_sessctx)
113+
sess := _sessctx.(sessionctx.Context)
114+
for {
115+
select {
116+
case <-ctx.Done():
117+
return
118+
case now := <-timer.C:
119+
// Owner only
120+
if !w.owner.IsOwner() {
121+
continue
122+
}
123+
124+
// get the latest infoschema
125+
is := sessiontxn.GetTxnManager(sess).GetTxnInfoSchema()
126+
127+
// create new partitions
128+
if err := createAllPartitions(ctx, sess, is); err != nil {
129+
continue
130+
}
131+
132+
// drop old partitions
133+
if err := w.dropOldPartitions(ctx, sess, is, now); err != nil {
134+
continue
135+
}
136+
137+
// reschedule, drain channel first
138+
if !timer.Stop() {
139+
<-timer.C
140+
}
141+
timer.Reset(calcNextTick(now))
142+
}
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)