Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/cmd: added operator checksum-as|force-flush #58801

Merged
merged 11 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5867,13 +5867,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "db34e3f94e5ac8fc5465b5440583c9e037a7f16aea8d0d8a200cdff210b12038",
strip_prefix = "github.com/pingcap/[email protected]20250102071301-c35d2b410115",
sha256 = "db08607b0c90f3909b66577e9c568d0cbd6b2825d287d7b5caab86ea6e4b60ad",
strip_prefix = "github.com/pingcap/[email protected]20250108041715-3b77f2c65c63",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250102071301-c35d2b410115.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
],
)
go_repository(
Expand Down
42 changes: 42 additions & 0 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func newOperatorCommand() *cobra.Command {
cmd.AddCommand(newBase64ifyCommand())
cmd.AddCommand(newListMigrationsCommand())
cmd.AddCommand(newMigrateToCommand())
cmd.AddCommand(newForceFlushCommand())
cmd.AddCommand(newChecksumCommand())
return cmd
}

Expand Down Expand Up @@ -109,3 +111,43 @@ func newMigrateToCommand() *cobra.Command {
operator.DefineFlagsForMigrateToConfig(cmd.Flags())
return cmd
}

// newChecksumCommand builds the `operator checksum-as` sub-command, which
// computes the checksum of the current cluster while applying the rewrite
// rules derived from a backup's metadata.
func newChecksumCommand() *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		var cfg operator.ChecksumWithRewriteRulesConfig
		if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
			return err
		}
		return operator.RunChecksumTable(GetDefaultContext(), tidbGlue, cfg)
	}
	cmd := &cobra.Command{
		Use:   "checksum-as",
		Short: "calculate the checksum with rewrite rules",
		Long: "Calculate the checksum of the current cluster (specified by `-u`) " +
			"with applying the rewrite rules generated from a backup (specified by `-s`). " +
			"This can be used when you have the checksum of upstream elsewhere.",
		Args: cobra.NoArgs,
		RunE: run,
	}
	// Default the filter to match nothing so the user must opt tables in.
	task.DefineFilterFlags(cmd, []string{"!*.*"}, false)
	operator.DefineFlagsForChecksumTableConfig(cmd.Flags())
	return cmd
}

// newForceFlushCommand builds the `operator force-flush` sub-command, which
// asks a running log backup task to flush its buffered data immediately.
func newForceFlushCommand() *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) error {
		var cfg operator.ForceFlushConfig
		if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
			return err
		}
		return operator.RunForceFlush(GetDefaultContext(), &cfg)
	}
	cmd := &cobra.Command{
		Use:   "force-flush",
		Short: "force a log backup task to flush",
		Args:  cobra.NoArgs,
		RunE:  run,
	}
	operator.DefineFlagsForForceFlushConfig(cmd.Flags())
	return cmd
}
17 changes: 17 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,47 @@ go_library(
name = "operator",
srcs = [
"base64ify.go",
"checksum_table.go",
"config.go",
"force_flush.go",
"list_migration.go",
"migrate_to.go",
"prepare_snap.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/backup",
"//br/pkg/backup/prepare_snap",
"//br/pkg/checksum",
"//br/pkg/conn",
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/logutil",
"//br/pkg/metautil",
"//br/pkg/pdutil",
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/task",
"//br/pkg/utils",
"//pkg/domain",
"//pkg/meta/model",
"//pkg/util",
"//pkg/util/engine",
"@com_github_fatih_color//:color",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//opt",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
Expand Down
269 changes: 269 additions & 0 deletions br/pkg/task/operator/checksum_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
package operator

import (
"context"
"encoding/json"
"os"
"sync"
"sync/atomic"

"github.com/pingcap/errors"
backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/util"
"github.com/tikv/client-go/v2/oracle"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// checksumTableCtx carries the state shared across a single checksum-as run:
// the parsed CLI configuration plus the cluster handles built from it.
type checksumTableCtx struct {
	// cfg is the parsed command-line configuration for this run.
	cfg ChecksumWithRewriteRulesConfig

	// mgr manages connections to the target cluster (PD/TiKV/TiDB).
	mgr *conn.Mgr
	// dom is the TiDB domain used to read the live info schema.
	dom *domain.Domain
}

// tableInDB pairs a table's info with the (lowercased) name of the database
// that owns it, as read from the live cluster's info schema.
type tableInDB struct {
	info   *model.TableInfo
	dbName string
}

// RunChecksumTable calculates the checksum of filtered tables in the current
// cluster while applying rewrite rules derived from a backup, then logs each
// result and writes all of them as JSON to stdout.
//
// The flow is: connect to the cluster, enumerate live tables, load the table
// set recorded in the backup metadata, pair them up into checksum requests,
// and execute those requests concurrently.
func RunChecksumTable(ctx context.Context, g glue.Glue, cfg ChecksumWithRewriteRulesConfig) error {
	c := &checksumTableCtx{cfg: cfg}

	if err := c.init(ctx, g); err != nil {
		return errors.Trace(err)
	}
	// Release the cluster connections when done; previously the manager was
	// never closed and leaked for the remainder of the process lifetime.
	defer c.mgr.Close()

	curr, err := c.getTables(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	old, err := c.loadOldTableIDs(ctx)
	if err != nil {
		return errors.Trace(err)
	}

	reqs, err := c.genRequests(ctx, old, curr)
	if err != nil {
		return errors.Trace(err)
	}

	results, err := c.runChecksum(ctx, reqs)
	if err != nil {
		return errors.Trace(err)
	}

	for _, result := range results {
		log.Info("Checksum result", zap.String("db", result.DBName), zap.String("table", result.TableName), zap.Uint64("checksum", result.Checksum),
			zap.Uint64("total_bytes", result.TotalBytes), zap.Uint64("total_kvs", result.TotalKVs))
	}

	// Machine-readable output goes to stdout; logs above go to the logger.
	return json.NewEncoder(os.Stdout).Encode(results)
}

// init builds the connection manager for the cluster addressed by the config
// and the TiDB domain on top of its storage, storing both on the context.
func (c *checksumTableCtx) init(ctx context.Context, g glue.Glue) error {
	mgr, err := task.NewMgr(ctx, g, c.cfg.PD, c.cfg.TLS, task.GetKeepalive(&c.cfg.Config), c.cfg.CheckRequirements, true, conn.NormalVersionChecker)
	if err != nil {
		return err
	}
	c.mgr = mgr

	dom, err := g.GetDomain(mgr.GetStorage())
	if err != nil {
		return err
	}
	c.dom = dom
	return nil
}

// getTables enumerates the live cluster's info schema and returns every table
// that passes the configured table filter, tagged with its database name.
func (c *checksumTableCtx) getTables(ctx context.Context) (res []tableInDB, err error) {
	schema := c.dom.InfoSchema()
	for _, dbInfo := range schema.AllSchemas() {
		if !c.cfg.TableFilter.MatchSchema(dbInfo.Name.L) {
			continue
		}

		tblInfos, err := schema.SchemaTableInfos(ctx, dbInfo.Name)
		if err != nil {
			return nil, errors.Annotatef(err, "failed to load data for db %s", dbInfo.Name)
		}
		for _, tblInfo := range tblInfos {
			if !c.cfg.TableFilter.MatchTable(dbInfo.Name.L, tblInfo.Name.L) {
				continue
			}
			log.Info("Added table from cluster.", zap.String("db", dbInfo.Name.L), zap.String("table", tblInfo.Name.L))
			res = append(res, tableInDB{info: tblInfo, dbName: dbInfo.Name.L})
		}
	}

	return res, nil
}

// loadOldTableIDs opens the backup storage referenced by the config, parses
// its backup metadata, and returns the recorded tables that pass the
// configured table filter.
func (c *checksumTableCtx) loadOldTableIDs(ctx context.Context) (res []*metautil.Table, err error) {
	_, strg, err := task.GetStorage(ctx, c.cfg.Storage, &c.cfg.Config)
	if err != nil {
		return nil, errors.Annotate(err, "failed to create storage")
	}

	mPath := metautil.MetaFile
	metaContent, err := strg.ReadFile(ctx, mPath)
	if err != nil {
		return nil, errors.Annotatef(err, "failed to open metafile %s", mPath)
	}

	var backupMeta backup.BackupMeta
	if err := backupMeta.Unmarshal(metaContent); err != nil {
		return nil, errors.Annotate(err, "failed to parse backupmeta")
	}

	metaReader := metautil.NewMetaReader(&backupMeta, strg, &c.cfg.CipherInfo)

	tblCh := make(chan *metautil.Table, 1024)
	errCh := make(chan error, 1)
	go func() {
		// The error (if any) is buffered into errCh *before* tblCh closes,
		// so once the receiver sees the close it can reliably drain errCh.
		if err := metaReader.ReadSchemasFiles(ctx, tblCh, metautil.SkipFiles, metautil.SkipStats); err != nil {
			errCh <- errors.Annotate(err, "failed to read schema files")
		}
		close(tblCh)
	}()

	for {
		select {
		case err := <-errCh:
			return nil, err
		case tbl, ok := <-tblCh:
			if !ok {
				// BUG FIX: when both errCh and the closed tblCh are ready,
				// select picks one at random, so an error could previously be
				// dropped and a partial result returned as success. Drain
				// errCh here to surface any pending failure.
				select {
				case err := <-errCh:
					return nil, err
				default:
					return res, nil
				}
			}
			if !c.cfg.TableFilter.MatchTable(tbl.DB.Name.L, tbl.Info.Name.L) {
				continue
			}
			log.Info("Added table from backup data.", zap.String("db", tbl.DB.Name.L), zap.String("table", tbl.Info.Name.L))
			res = append(res, tbl)
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// request is one ready-to-run checksum job: the built coprocessor executor
// plus the (lowercased) names used for logging and result reporting.
type request struct {
	copReq    *checksum.Executor
	tableName string
	dbName    string
}

// genRequests matches each live table against the backup's table set by
// database/table name and builds one checksum request per matched pair, all
// pinned to a single TSO so the whole run reads a consistent snapshot.
// Tables absent from the backup are skipped with a warning.
func (c *checksumTableCtx) genRequests(ctx context.Context, bkup []*metautil.Table, curr []tableInDB) (reqs []request, err error) {
	phy, logi, err := c.mgr.GetPDClient().GetTS(ctx)
	if err != nil {
		return nil, errors.Annotate(err, "failed to get TSO for checksumming")
	}
	tso := oracle.ComposeTS(phy, logi)

	// Index the backup tables by db name, then table name, for O(1) lookup.
	bkupTbls := map[string]map[string]*metautil.Table{}
	for _, t := range bkup {
		m, ok := bkupTbls[t.DB.Name.L]
		if !ok {
			m = make(map[string]*metautil.Table)
			bkupTbls[t.DB.Name.L] = m
		}

		m[t.Info.Name.L] = t
	}

	for _, t := range curr {
		oldDB, ok := bkupTbls[t.dbName]
		if !ok {
			log.Warn("db not found, will skip", zap.String("db", t.dbName))
			continue
		}
		oldTable, ok := oldDB[t.info.Name.L]
		if !ok {
			log.Warn("table not found, will skip", zap.String("db", t.dbName), zap.String("table", t.info.Name.L))
			continue
		}

		// Only build the executor once we know the table has a backup
		// counterpart; previously the builder was constructed before the
		// lookups and wasted for every skipped table.
		rb := checksum.NewExecutorBuilder(t.info, tso)
		rb.SetConcurrency(c.cfg.ChecksumConcurrency)
		rb.SetOldTable(oldTable)
		rb.SetExplicitRequestSourceType(kvutil.ExplicitTypeBR)
		req, err := rb.Build()
		if err != nil {
			return nil, errors.Annotatef(err, "failed to build checksum builder for table %s.%s", t.dbName, t.info.Name.L)
		}
		reqs = append(reqs, request{
			copReq:    req,
			dbName:    t.dbName,
			tableName: t.info.Name.L,
		})
	}

	return reqs, nil
}

// ChecksumResult is the outcome of checksumming one table. It is both logged
// and serialized as JSON to stdout, so the field tags are part of the
// command's output format.
type ChecksumResult struct {
	DBName    string `json:"db_name"`
	TableName string `json:"table_name"`

	Checksum   uint64 `json:"checksum"`
	TotalBytes uint64 `json:"total_bytes"`
	TotalKVs   uint64 `json:"total_kvs"`
}

// runChecksum executes the prepared checksum requests concurrently, bounded
// by the configured table concurrency, and returns one result per table.
// The first failing request cancels the remaining ones.
func (c *checksumTableCtx) runChecksum(ctx context.Context, reqs []request) ([]ChecksumResult, error) {
	pool := util.NewWorkerPool(c.cfg.TableConcurrency, "checksum")
	eg, ectx := errgroup.WithContext(ctx)

	var (
		mu  sync.Mutex
		out = make([]ChecksumResult, 0, len(reqs))
	)

	for _, r := range reqs {
		r := r // capture per-iteration copy for the closure
		pool.ApplyOnErrorGroup(eg, func() error {
			total := r.copReq.Len()
			done := new(atomic.Int64)
			// Log per-region progress as the coprocessor requests complete.
			onProgress := func() {
				done.Add(1)
				log.Info(
					"Finish one request of a table.",
					zap.String("db", r.dbName),
					zap.String("table", r.tableName),
					zap.Int64("finished", done.Load()),
					zap.Int64("total", int64(total)),
				)
			}
			resp, err := r.copReq.Execute(ectx, c.mgr.GetStorage().GetClient(), onProgress)
			if err != nil {
				return err
			}
			mu.Lock()
			out = append(out, ChecksumResult{
				DBName:    r.dbName,
				TableName: r.tableName,

				Checksum:   resp.Checksum,
				TotalBytes: resp.TotalBytes,
				TotalKVs:   resp.TotalKvs,
			})
			mu.Unlock()
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return nil, err
	}

	return out, nil
}
Loading
Loading