Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
20c31f4
*: workload repository init code (#1098)
xhebox Jul 24, 2024
18444f0
repository: create persistent tables table (#1220)
xhebox Aug 28, 2024
85fdc6d
repository: housekeeper thread (#1245)
xhebox Sep 13, 2024
def1c11
repository: add table create and insert statement generation support.…
wddevries Sep 24, 2024
b7a3832
repository: sample and snapshot threads (#1272)
xhebox Oct 8, 2024
585caca
repository: Add variables to control the sampling and snapshot interv…
wddevries Oct 18, 2024
6601452
repository: expose variables for users (#1353)
xhebox Oct 21, 2024
5c34c72
Add code to start owner compaign.
wddevries Oct 25, 2024
50e5681
repository: Separate the repository code from the domain. (#1342)
wddevries Oct 27, 2024
c440359
Fix bazel scripts.
wddevries Nov 6, 2024
d77e8fd
Use any instead of interface{} in worker.go.
wddevries Nov 6, 2024
7783930
Fix owner campaign patch.
wddevries Nov 7, 2024
3499a36
Remove unused error return value from Stop()
wddevries Nov 8, 2024
920ed62
Use tagged switch on dst.
wddevries Nov 8, 2024
91fd463
Check error result from res.Close() in worker.runQuery().
wddevries Nov 8, 2024
2ed51fa
Rename unused parameters to _.
wddevries Nov 8, 2024
1074f4c
Remove unused method receiver 'w' from several functions.
wddevries Nov 8, 2024
b280791
Fix type in description of comment for WR_VER field of HIST_SNAPSHOT.
wddevries Nov 8, 2024
0150dc3
Cancel() was removed from owner.Manger. Use Close() instead.
wddevries Nov 13, 2024
4a3a4a9
Remove unneeded return statement and unused method receivers.
wddevries Nov 13, 2024
94a5619
repository: unit test example (#1379)
xhebox Nov 13, 2024
28e5f18
Fix example tests.
wddevries Nov 13, 2024
0027670
Rework worker.stop() to fix race.
wddevries Nov 15, 2024
c0d62d7
Set owner.ManagerSessionTTL instead of starting a campaign in the uni…
wddevries Nov 15, 2024
4719db2
Move repository to pkg/util and rename it to workloadrepo. (#1418)
wddevries Nov 25, 2024
5368a64
Rewrite some loops to use range.
wddevries Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/tidb-server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"//pkg/util/tiflashcompute",
"//pkg/util/topsql",
"//pkg/util/versioninfo",
"//pkg/util/workloadrepo",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
3 changes: 3 additions & 0 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import (
"github.com/pingcap/tidb/pkg/util/tiflashcompute"
"github.com/pingcap/tidb/pkg/util/topsql"
"github.com/pingcap/tidb/pkg/util/versioninfo"
repository "github.com/pingcap/tidb/pkg/util/workloadrepo"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -318,6 +319,7 @@ func main() {
executor.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreDDLOwnerMgrAndDomain(keyspaceName)
repository.SetupRepository(dom)
svr := createServer(storage, dom)

exited := make(chan struct{})
Expand Down Expand Up @@ -921,6 +923,7 @@ func cleanup(svr *server.Server, storage kv.Storage, dom *domain.Domain) {
// See https://github.com/pingcap/tidb/issues/40038 for details.
svr.KillSysProcesses()
plugin.Shutdown(context.Background())
repository.StopRepository()
closeDDLOwnerMgrDomainAndStorage(storage, dom)
disk.CleanUp()
closeStmtSummary()
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4066,6 +4066,9 @@ var systemTables = map[string]struct{}{
}

func isUndroppableTable(schema, table string) bool {
if schema == "workload_schema" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should use const in repository package. Removing depedency is great, but it will also be convenient when we want to know all occurences of schema table name by searching workloadSchemaCIStr.

return true
}
if schema != mysql.SystemDB {
return false
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,7 @@ func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve
return err
}

owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
owner := do.NewOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
err = owner.CampaignOwner()
if err != nil {
logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
Expand Down Expand Up @@ -2337,7 +2337,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
}
variable.EnableStatsOwner = do.enableStatsOwner
variable.DisableStatsOwner = do.disableStatsOwner
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
do.statsOwner = do.NewOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
do.statsOwner.SetListener(owner.NewListenersWrapper(statsHandle, do.ddlNotifier))
if config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load() {
err := do.statsOwner.CampaignOwner()
Expand Down Expand Up @@ -2446,7 +2446,8 @@ func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
logutil.BgLogger().Info("start load stats sub workers", zap.Int("worker count", len(ctxList)))
}

func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
// NewOwnerManager returns the owner manager for use outside of the domain.
func (do *Domain) NewOwnerManager(prompt, ownerKey string) owner.Manager {
id := do.ddl.OwnerManager().ID()
var statsOwner owner.Manager
if do.etcdClient == nil {
Expand Down
56 changes: 56 additions & 0 deletions pkg/util/workloadrepo/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "workloadrepo",
srcs = [
"const.go",
"housekeeper.go",
"sampling.go",
"snapshot.go",
"table.go",
"utils.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/pkg/util/workloadrepo",
visibility = ["//visibility:public"],
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/owner",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/slice",
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "workloadrepo_test",
timeout = "short",
srcs = ["worker_test.go"],
embed = [":workloadrepo"],
flaky = True,
deps = [
"//pkg/domain",
"//pkg/kv",
"//pkg/owner",
"//pkg/parser/model",
"//pkg/testkit",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_server_v3//embed",
],
)
42 changes: 42 additions & 0 deletions pkg/util/workloadrepo/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workloadrepo

import (
"time"

"github.com/pingcap/tidb/pkg/parser/model"
)

const (
ownerKey = "/tidb/workloadrepo/owner"
promptKey = "workloadrepo"
snapIDKey = "/tidb/workloadrepo/snap_id"

etcdOpTimeout = 5 * time.Second

defSamplingInterval = 5
defSnapshotInterval = 3600
defRententionDays = 7

// WorkloadSchema is the name of database for workloadrepo worker.
WorkloadSchema = "WORKLOAD_SCHEMA"
histSnapshotsTable = "HIST_SNAPSHOTS"
)

var (
workloadSchemaCIStr = model.NewCIStr(WorkloadSchema)
zeroTime = time.Time{}
)
145 changes: 145 additions & 0 deletions pkg/util/workloadrepo/housekeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workloadrepo

import (
"context"
"fmt"
"strings"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"go.uber.org/zap"
)

func calcNextTick(now time.Time) time.Duration {
// only activated at 2am
next := time.Date(now.Year(), now.Month(), now.Day(), 2, 0, 0, 0, time.Local)
if !next.After(now) {
next = next.AddDate(0, 0, 1)
}
return next.Sub(now)
}

func createAllPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema) error {
sb := &strings.Builder{}
for _, tbl := range workloadTables {
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, model.NewCIStr(tbl.destTable))
if err != nil {
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
return err
}
tbInfo := tbSchema.Meta()

sb.Reset()
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n ADD PARTITION (", WorkloadSchema, tbl.destTable)
if !generatePartitionRanges(sb, tbInfo) {
fmt.Fprintf(sb, ")")
_, err = execRetry(ctx, sess, sb.String())
if err != nil {
logutil.BgLogger().Info("workload repository cannot add partitions", zap.String("parts", sb.String()), zap.NamedError("err", err))
return err
}
}
}
return nil
}

func (w *worker) dropOldPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema, now time.Time) error {
w.Lock()
retention := int(w.retentionDays)
w.Unlock()

if retention == 0 {
// disabled housekeeping
return nil
}

sb := &strings.Builder{}
for _, tbl := range workloadTables {
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, model.NewCIStr(tbl.destTable))
if err != nil {
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
continue
}
tbInfo := tbSchema.Meta()
for _, pt := range tbInfo.GetPartitionInfo().Definitions {
ot, err := time.Parse("p20060102", pt.Name.L)
if err != nil {
logutil.BgLogger().Info("workload repository cannot parse partition name", zap.String("part", pt.Name.L), zap.NamedError("err", err))
break
}
if int(now.Sub(ot).Hours()/24) < retention {
continue
}
sb.Reset()
sqlescape.MustFormatSQL(sb, "ALTER TABLE %s.%s DROP PARTITION %s",
WorkloadSchema, tbl.destTable, pt.Name.L)
_, err = execRetry(ctx, sess, sb.String())
if err != nil {
logutil.BgLogger().Info("workload repository cannot drop partition", zap.String("part", pt.Name.L), zap.NamedError("err", err))
break
}
}
}
return nil
}

func (w *worker) startHouseKeeper(ctx context.Context) func() {
return func() {
now := time.Now()
timer := time.NewTimer(calcNextTick(now))
defer timer.Stop()

_sessctx := w.getSessionWithRetry()
defer w.sesspool.Put(_sessctx)
sess := _sessctx.(sessionctx.Context)
for {
select {
case <-ctx.Done():
return
case now := <-timer.C:
// Owner only
if !w.owner.IsOwner() {
continue
}

// get the latest infoschema
is := sessiontxn.GetTxnManager(sess).GetTxnInfoSchema()

// create new partitions
if err := createAllPartitions(ctx, sess, is); err != nil {
continue
}

// drop old partitions
if err := w.dropOldPartitions(ctx, sess, is, now); err != nil {
continue
}

// reschedule, drain channel first
if !timer.Stop() {
<-timer.C
}
timer.Reset(calcNextTick(now))
}
}
}
}
Loading