From 45d0c5bc605467097baa12470c40aabd92944310 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 8 Jan 2025 16:50:54 +0800 Subject: [PATCH 1/9] added operator checksum-as|force-flush --- DEPS.bzl | 12 +- br/cmd/br/operator.go | 42 ++++ br/pkg/task/BUILD.bazel | 3 + br/pkg/task/common.go | 4 + br/pkg/task/operator/BUILD.bazel | 17 ++ br/pkg/task/operator/checksum_table.go | 271 +++++++++++++++++++++++++ br/pkg/task/operator/config.go | 90 ++++++-- br/pkg/task/operator/force_flush.go | 76 +++++++ br/pkg/task/operator/list_migration.go | 8 +- br/pkg/task/operator/migrate_to.go | 20 +- br/pkg/task/operator/prepare_snap.go | 20 ++ br/pkg/task/restore.go | 35 +++- br/pkg/task/stream.go | 107 +++++++--- go.mod | 2 +- go.sum | 4 +- 15 files changed, 642 insertions(+), 69 deletions(-) create mode 100644 br/pkg/task/operator/checksum_table.go create mode 100644 br/pkg/task/operator/force_flush.go diff --git a/DEPS.bzl b/DEPS.bzl index bf4ee0e36d695..947c79bbfb1b1 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5867,13 +5867,13 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78", - strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241120071417-b5b7843d9037", + sha256 = "db08607b0c90f3909b66577e9c568d0cbd6b2825d287d7b5caab86ea6e4b60ad", + strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20250108041715-3b77f2c65c63", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", - 
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip", ], ) go_repository( diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index 4e41adeab329f..abd0156a5457b 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -35,6 +35,8 @@ func newOperatorCommand() *cobra.Command { cmd.AddCommand(newBase64ifyCommand()) cmd.AddCommand(newListMigrationsCommand()) cmd.AddCommand(newMigrateToCommand()) + cmd.AddCommand(newForceFlushCommand()) + cmd.AddCommand(newChecksumCommand()) return cmd } @@ -109,3 +111,43 @@ func newMigrateToCommand() *cobra.Command { operator.DefineFlagsForMigrateToConfig(cmd.Flags()) return cmd } + +func newChecksumCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "checksum-as", + Short: "calculate the checksum with rewrite rules", + Long: "Calculate the checksum of the current cluster (specified by `-u`) " + + "with applying the rewrite rules generated from a backup (specified by `-s`). 
" + + "This can be used when you have the checksum of upstream elsewhere.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := operator.ChecksumWithRewriteRulesConfig{} + if err := cfg.ParseFromFlags(cmd.Flags()); err != nil { + return err + } + ctx := GetDefaultContext() + return operator.RunChecksumTable(ctx, tidbGlue, cfg) + }, + } + task.DefineFilterFlags(cmd, []string{"!*.*"}, false) + operator.DefineFlagsForChecksumTableConfig(cmd.Flags()) + return cmd +} + +func newForceFlushCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "force-flush", + Short: "force a log backup task to flush", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := operator.ForceFlushConfig{} + if err := cfg.ParseFromFlags(cmd.Flags()); err != nil { + return err + } + ctx := GetDefaultContext() + return operator.RunForceFlush(ctx, &cfg) + }, + } + operator.DefineFlagsForForceFlushConfig(cmd.Flags()) + return cmd +} diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index d6d4aaaf0291e..2fde9e8981389 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -50,6 +50,7 @@ go_library( "//br/pkg/streamhelper/daemon", "//br/pkg/summary", "//br/pkg/utils", + "//br/pkg/utils/iter", "//br/pkg/version", "//pkg/config", "//pkg/ddl", @@ -58,6 +59,7 @@ go_library( "//pkg/infoschema/context", "//pkg/kv", "//pkg/meta/model", + "//pkg/metrics", "//pkg/parser/ast", "//pkg/parser/mysql", "//pkg/sessionctx/stmtctx", @@ -98,6 +100,7 @@ go_library( "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", ], ) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 1813741634609..c95bee08c59d8 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -596,6 +596,10 @@ func (cfg *Config) normalizePDURLs() error { return nil } +func (cfg *Config) UserFiltered() bool { + return len(cfg.Schemas) != 0 || len(cfg.Tables) != 0 || 
len(cfg.FilterStr) != 0 +} + // ParseFromFlags parses the config from the flag set. func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { var err error diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index 14760027a49b8..6d232d6c36bf0 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -4,7 +4,9 @@ go_library( name = "operator", srcs = [ "base64ify.go", + "checksum_table.go", "config.go", + "force_flush.go", "list_migration.go", "migrate_to.go", "prepare_snap.go", @@ -12,22 +14,37 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/task/operator", visibility = ["//visibility:public"], deps = [ + "//br/pkg/backup", "//br/pkg/backup/prepare_snap", + "//br/pkg/checksum", + "//br/pkg/conn", "//br/pkg/errors", "//br/pkg/glue", "//br/pkg/logutil", + "//br/pkg/metautil", "//br/pkg/pdutil", "//br/pkg/storage", "//br/pkg/stream", "//br/pkg/task", "//br/pkg/utils", + "//pkg/domain", + "//pkg/meta/model", + "//pkg/util", + "//pkg/util/engine", "@com_github_fatih_color//:color", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/logbackuppb", + "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//opt", + "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//keepalive", "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", diff --git a/br/pkg/task/operator/checksum_table.go b/br/pkg/task/operator/checksum_table.go new file mode 100644 index 0000000000000..dbf9d751bd370 --- /dev/null +++ b/br/pkg/task/operator/checksum_table.go @@ -0,0 +1,271 @@ +package operator + +import ( + "context" + "encoding/json" + "os" + 
"sync" + "sync/atomic" + + kvutil "github.com/tikv/client-go/v2/util" + "golang.org/x/sync/errgroup" + + "github.com/pingcap/errors" + backup "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/checksum" + "github.com/pingcap/tidb/br/pkg/conn" + "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/metautil" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/util" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" +) + +type checksumTableCtx struct { + cfg ChecksumWithRewriteRulesConfig + + mgr *conn.Mgr + dom *domain.Domain +} + +type tableInDB struct { + info *model.TableInfo + dbName string +} + +func RunChecksumTable(ctx context.Context, g glue.Glue, cfg ChecksumWithRewriteRulesConfig) error { + c := &checksumTableCtx{cfg: cfg} + + if err := c.init(ctx, g); err != nil { + return errors.Trace(err) + } + + curr, err := c.getTables(ctx) + if err != nil { + return errors.Trace(err) + } + + old, err := c.loadOldTableIDs(ctx) + if err != nil { + return errors.Trace(err) + } + + reqs, err := c.genRequests(ctx, old, curr) + if err != nil { + return errors.Trace(err) + } + + results, err := c.runChecksum(ctx, reqs) + if err != nil { + return errors.Trace(err) + } + + for _, result := range results { + log.Info("Checksum result", zap.String("db", result.DBName), zap.String("table", result.TableName), zap.Uint64("checksum", result.Checksum), + zap.Uint64("total_bytes", result.TotalBytes), zap.Uint64("total_kvs", result.TotalKVs)) + } + + return json.NewEncoder(os.Stdout).Encode(results) +} + +func (c *checksumTableCtx) init(ctx context.Context, g glue.Glue) error { + cfg := c.cfg + var err error + c.mgr, err = task.NewMgr(ctx, g, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg.Config), cfg.CheckRequirements, true, conn.NormalVersionChecker) + if err != nil { + return err + } + + c.dom, err = 
g.GetDomain(c.mgr.GetStorage()) + if err != nil { + return err + } + return nil +} + +func (c *checksumTableCtx) getTables(ctx context.Context) (res []tableInDB, err error) { + sch := c.dom.InfoSchema() + dbs := sch.AllSchemas() + for _, db := range dbs { + if !c.cfg.TableFilter.MatchSchema(db.Name.L) { + continue + } + + tbls, err := sch.SchemaTableInfos(ctx, db.Name) + if err != nil { + return nil, errors.Annotatef(err, "failed to load data for db %s", db.Name) + } + for _, tbl := range tbls { + if !c.cfg.TableFilter.MatchTable(db.Name.L, tbl.Name.L) { + continue + } + log.Info("Added table from cluster.", zap.String("db", db.Name.L), zap.String("table", tbl.Name.L)) + res = append(res, tableInDB{ + info: tbl, + dbName: db.Name.L, + }) + } + } + + return +} + +func (c *checksumTableCtx) loadOldTableIDs(ctx context.Context) (res []*metautil.Table, err error) { + _, strg, err := task.GetStorage(ctx, c.cfg.Storage, &c.cfg.Config) + if err != nil { + return nil, errors.Annotate(err, "failed to create storage") + } + + mPath := metautil.MetaFile + metaContent, err := strg.ReadFile(ctx, mPath) + if err != nil { + return nil, errors.Annotatef(err, "failed to open metafile %s", mPath) + } + + var backupMeta backup.BackupMeta + if err := backupMeta.Unmarshal(metaContent); err != nil { + return nil, errors.Annotate(err, "failed to parse backupmeta") + } + + metaReader := metautil.NewMetaReader(&backupMeta, strg, &c.cfg.CipherInfo) + + tblCh := make(chan *metautil.Table, 1024) + errCh := make(chan error, 1) + go func() { + if err := metaReader.ReadSchemasFiles(ctx, tblCh, metautil.SkipFiles, metautil.SkipStats); err != nil { + errCh <- errors.Annotate(err, "failed to read schema files") + } + close(tblCh) + }() + + for { + select { + case err := <-errCh: + return nil, err + case tbl, ok := <-tblCh: + if !ok { + return + } + if !c.cfg.TableFilter.MatchTable(tbl.DB.Name.L, tbl.Info.Name.L) { + continue + } + log.Info("Added table from backup data.", zap.String("db", 
tbl.DB.Name.L), zap.String("table", tbl.Info.Name.L)) + res = append(res, tbl) + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +type request struct { + copReq *checksum.Executor + tableName string + dbName string +} + +func (c *checksumTableCtx) genRequests(ctx context.Context, bkup []*metautil.Table, curr []tableInDB) (reqs []request, err error) { + phy, logi, err := c.mgr.GetPDClient().GetTS(ctx) + if err != nil { + return nil, errors.Annotate(err, "failed to get TSO for checksumming") + } + tso := oracle.ComposeTS(phy, logi) + + bkupTbls := map[string]map[string]*metautil.Table{} + for _, t := range bkup { + m, ok := bkupTbls[t.DB.Name.L] + if !ok { + m = make(map[string]*metautil.Table) + bkupTbls[t.DB.Name.L] = m + } + + m[t.Info.Name.L] = t + } + + for _, t := range curr { + rb := checksum.NewExecutorBuilder(t.info, tso) + rb.SetConcurrency(c.cfg.ChecksumConcurrency) + oldDB, ok := bkupTbls[t.dbName] + if !ok { + log.Warn("db not found, will skip", zap.String("db", t.dbName)) + continue + } + oldTable, ok := oldDB[t.info.Name.L] + if !ok { + log.Warn("table not found, will skip", zap.String("db", t.dbName), zap.String("table", t.info.Name.L)) + continue + } + + rb.SetOldTable(oldTable) + rb.SetExplicitRequestSourceType(kvutil.ExplicitTypeBR) + req, err := rb.Build() + if err != nil { + return nil, errors.Annotatef(err, "failed to build checksum builder for table %s.%s", t.dbName, t.info.Name.L) + } + reqs = append(reqs, request{ + copReq: req, + dbName: t.dbName, + tableName: t.info.Name.L, + }) + } + + return +} + +type ChecksumResult struct { + DBName string `json:"db_name"` + TableName string `json:"table_name"` + + Checksum uint64 `json:"checksum"` + TotalBytes uint64 `json:"total_bytes"` + TotalKVs uint64 `json:"total_kvs"` +} + +func (c *checksumTableCtx) runChecksum(ctx context.Context, reqs []request) ([]ChecksumResult, error) { + wkPool := util.NewWorkerPool(c.cfg.TableConcurrency, "checksum") + eg, ectx := errgroup.WithContext(ctx) + 
results := make([]ChecksumResult, 0, len(reqs)) + resultsMu := new(sync.Mutex) + + for _, req := range reqs { + req := req + wkPool.ApplyOnErrorGroup(eg, func() error { + total := req.copReq.Len() + finished := new(atomic.Int64) + resp, err := req.copReq.Execute(ectx, c.mgr.GetStorage().GetClient(), func() { + finished.Add(1) + log.Info( + "Finish one request of a table.", + zap.String("db", req.dbName), + zap.String("table", req.tableName), + zap.Int64("finished", finished.Load()), + zap.Int64("total", int64(total)), + ) + }) + if err != nil { + return err + } + res := ChecksumResult{ + DBName: req.dbName, + TableName: req.tableName, + + Checksum: resp.Checksum, + TotalBytes: resp.TotalBytes, + TotalKVs: resp.TotalKvs, + } + resultsMu.Lock() + results = append(results, res) + resultsMu.Unlock() + return nil + }) + } + + if err := eg.Wait(); err != nil { + return nil, err + } + + return results, nil +} diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index c42382abe504d..03996beed3011 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -3,15 +3,32 @@ package operator import ( + "regexp" "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/backup" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/task" "github.com/spf13/pflag" ) +const ( + flagTableConcurrency = "table-concurrency" + flagStorePatterns = "stores" + flagTTL = "ttl" + flagSafePoint = "safepoint" + flagStorage = "storage" + flagLoadCreds = "load-creds" + flagJSON = "json" + flagRecent = "recent" + flagTo = "to" + flagBase = "base" + flagYes = "yes" + flagDryRun = "dry-run" +) + type PauseGcConfig struct { task.Config @@ -23,8 +40,8 @@ type PauseGcConfig struct { } func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) { - _ = f.DurationP("ttl", "i", 2*time.Minute, "The time-to-live of the safepoint.") - _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to 
be kept.") + _ = f.DurationP(flagTTL, "i", 2*time.Minute, "The time-to-live of the safepoint.") + _ = f.Uint64P(flagSafePoint, "t", 0, "The GC safepoint to be kept.") } // ParseFromFlags fills the config via the flags. @@ -34,11 +51,11 @@ func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error { } var err error - cfg.SafePoint, err = flags.GetUint64("safepoint") + cfg.SafePoint, err = flags.GetUint64(flagSafePoint) if err != nil { return err } - cfg.TTL, err = flags.GetDuration("ttl") + cfg.TTL, err = flags.GetDuration(flagTTL) if err != nil { return err } @@ -54,8 +71,8 @@ type Base64ifyConfig struct { func DefineFlagsForBase64ifyConfig(flags *pflag.FlagSet) { storage.DefineFlags(flags) - flags.StringP("storage", "s", "", "The external storage input.") - flags.Bool("load-creds", false, "whether loading the credientials from current environment and marshal them to the base64 string. [!]") + flags.StringP(flagStorage, "s", "", "The external storage input.") + flags.Bool(flagLoadCreds, false, "whether loading the credientials from current environment and marshal them to the base64 string. 
[!]") } func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error { @@ -64,11 +81,11 @@ func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return err } - cfg.StorageURI, err = flags.GetString("storage") + cfg.StorageURI, err = flags.GetString(flagStorage) if err != nil { return err } - cfg.LoadCerd, err = flags.GetBool("load-creds") + cfg.LoadCerd, err = flags.GetBool(flagLoadCreds) if err != nil { return err } @@ -83,8 +100,8 @@ type ListMigrationConfig struct { func DefineFlagsForListMigrationConfig(flags *pflag.FlagSet) { storage.DefineFlags(flags) - flags.StringP("storage", "s", "", "the external storage input.") - flags.Bool("json", false, "output the result in json format.") + flags.StringP(flagStorage, "s", "", "the external storage input.") + flags.Bool(flagJSON, false, "output the result in json format.") } func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error { @@ -93,11 +110,11 @@ func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return err } - cfg.StorageURI, err = flags.GetString("storage") + cfg.StorageURI, err = flags.GetString(flagStorage) if err != nil { return err } - cfg.JSONOutput, err = flags.GetBool("json") + cfg.JSONOutput, err = flags.GetBool(flagJSON) if err != nil { return err } @@ -115,15 +132,6 @@ type MigrateToConfig struct { DryRun bool } -const ( - flagStorage = "storage" - flagRecent = "recent" - flagTo = "to" - flagBase = "base" - flagYes = "yes" - flagDryRun = "dry-run" -) - func DefineFlagsForMigrateToConfig(flags *pflag.FlagSet) { storage.DefineFlags(flags) flags.StringP(flagStorage, "s", "", "the external storage input.") @@ -180,3 +188,43 @@ func (cfg *MigrateToConfig) Verify() error { } return nil } + +type ForceFlushConfig struct { + task.Config + + StoresPattern *regexp.Regexp +} + +func DefineFlagsForForceFlushConfig(f *pflag.FlagSet) { + f.String(flagStorePatterns, ".*", "The regexp to match the store peer 
address to be force flushed.") +} + +func (cfg *ForceFlushConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) { + storePat, err := flags.GetString(flagStorePatterns) + if err != nil { + return err + } + cfg.StoresPattern, err = regexp.Compile(storePat) + if err != nil { + return errors.Annotatef(err, "invalid expression in --%s", flagStorePatterns) + } + + return cfg.Config.ParseFromFlags(flags) +} + +type ChecksumWithRewriteRulesConfig struct { + task.Config +} + +func DefineFlagsForChecksumTableConfig(f *pflag.FlagSet) { + f.Uint(flagTableConcurrency, backup.DefaultSchemaConcurrency, "The size of a BR thread pool used for backup table metas, "+ + "including tableInfo/checksum and stats.") +} + +func (cfg *ChecksumWithRewriteRulesConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) { + cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency) + if err != nil { + return + } + return cfg.Config.ParseFromFlags(flags) +} diff --git a/br/pkg/task/operator/force_flush.go b/br/pkg/task/operator/force_flush.go new file mode 100644 index 0000000000000..4f9f622812c5b --- /dev/null +++ b/br/pkg/task/operator/force_flush.go @@ -0,0 +1,80 @@ +package operator + +import ( + "context" + "slices" + + "github.com/pingcap/errors" + logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/util/engine" + pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +// getAllTiKVs fetches all non-tombstone stores from PD and drops the TiFlash ones. +func getAllTiKVs(ctx context.Context, p pd.Client) ([]*metapb.Store, error) { + stores, err := p.GetAllStores(ctx, opt.WithExcludeTombstone()) + if err != nil { + return nil, err + } + withoutTiFlash := slices.DeleteFunc(stores, engine.IsTiFlash) + return withoutTiFlash, err +} + +// RunForceFlush sends a FlushNow request to every TiKV store whose address matches cfg.StoresPattern. +// It returns an error if any request fails or if any task reports an unsuccessful flush. +func RunForceFlush(ctx context.Context, cfg *ForceFlushConfig) error { + pdMgr, err := dialPD(ctx, &cfg.Config) + if err != nil { + return err + } +
defer pdMgr.Close() + + stores, err := createStoreManager(pdMgr.GetPDClient(), &cfg.Config) + if err != nil { + return err + } + defer stores.Close() + + tikvs, err := getAllTiKVs(ctx, pdMgr.GetPDClient()) + if err != nil { + return err + } + eg, ectx := errgroup.WithContext(ctx) + log.Info("About to start force flushing.", zap.Stringer("stores-pattern", cfg.StoresPattern)) + for _, s := range tikvs { + s := s + if !cfg.StoresPattern.MatchString(s.Address) { + log.Info("Skipping not matched TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address)) + continue + } + log.Info("Starting force flush TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address)) + eg.Go(func() error { + var logBackupCli logbackup.LogBackupClient + err := stores.WithConn(ectx, s.GetId(), func(cc *grpc.ClientConn) { + logBackupCli = logbackup.NewLogBackupClient(cc) + }) + if err != nil { + return err + } + + resp, err := logBackupCli.FlushNow(ectx, &logbackup.FlushNowRequest{}) + if err != nil { + return errors.Annotatef(err, "failed to flush store %d", s.GetId()) + } + for _, res := range resp.Results { + if !res.Success { + return errors.Errorf("failed to flush task %s at store %d: %s", res.TaskName, s.GetId(), res.ErrorMessage) + } + log.Info("Force flushed task of TiKV store.", zap.Uint64("store", s.Id), zap.String("task", res.TaskName)) + } + return nil + }) + } + return eg.Wait() +} diff --git a/br/pkg/task/operator/list_migration.go b/br/pkg/task/operator/list_migration.go index d6c7efd57197a..1e030d7e0f3d8 100644 --- a/br/pkg/task/operator/list_migration.go +++ b/br/pkg/task/operator/list_migration.go @@ -26,8 +26,8 @@ func RunListMigrations(ctx context.Context, cfg ListMigrationConfig) error { if err != nil { return err } - ext := stream.MigerationExtension(st) - migs, err := ext.Load(ctx) + ext := stream.MigrationExtension(st) + migs, err := ext.Load(ctx, stream.MLNotFoundIsErr()) if err != nil { return err } @@ -40,12 +40,12 @@ func RunListMigrations(ctx 
context.Context, cfg ListMigrationConfig) error { console.Println(statusOK(fmt.Sprintf("Total %d Migrations.", len(migs.Layers)+1))) console.Printf("> BASE <\n") tbl := console.CreateTable() - stream.AddMigrationToTable(migs.Base, tbl) + ext.AddMigrationToTable(ctx, migs.Base, tbl) tbl.Print() for _, t := range migs.Layers { console.Printf("> %08d <\n", t.SeqNum) tbl := console.CreateTable() - stream.AddMigrationToTable(&t.Content, tbl) + ext.AddMigrationToTable(ctx, &t.Content, tbl) tbl.Print() } } diff --git a/br/pkg/task/operator/migrate_to.go b/br/pkg/task/operator/migrate_to.go index 282e82784ecb9..2a086b9868db1 100644 --- a/br/pkg/task/operator/migrate_to.go +++ b/br/pkg/task/operator/migrate_to.go @@ -5,7 +5,7 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" + backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" @@ -39,16 +39,16 @@ func (cx migrateToCtx) printErr(errs []error, msg string) { } } -func (cx migrateToCtx) askForContinue(targetMig *backuppb.Migration) bool { +func (cx migrateToCtx) askForContinue(ctx context.Context, targetMig *backup.Migration) bool { tbl := cx.console.CreateTable() - stream.AddMigrationToTable(targetMig, tbl) + cx.est.AddMigrationToTable(ctx, targetMig, tbl) cx.console.Println("The migration going to be executed will be like: ") tbl.Print() return cx.console.PromptBool("Continue? 
") } -func (cx migrateToCtx) dryRun(f func(stream.MigrationExt) stream.MergeAndMigratedTo) error { +func (cx migrateToCtx) dryRun(ctx context.Context, f func(stream.MigrationExt) stream.MergeAndMigratedTo) error { var ( est = cx.est console = cx.console @@ -60,7 +60,7 @@ func (cx migrateToCtx) dryRun(f func(stream.MigrationExt) stream.MergeAndMigrate }) tbl := console.CreateTable() - stream.AddMigrationToTable(estBase.NewBase, tbl) + cx.est.AddMigrationToTable(ctx, estBase.NewBase, tbl) console.Println("The new BASE migration will be like: ") tbl.Print() file, err := storage.SaveJSONEffectsToTmp(effects) @@ -90,7 +90,7 @@ func RunMigrateTo(ctx context.Context, cfg MigrateToConfig) error { console := glue.ConsoleOperations{ConsoleGlue: glue.StdIOGlue{}} - est := stream.MigerationExtension(st) + est := stream.MigrationExtension(st) est.Hooks = stream.NewProgressBarHooks(console) migs, err := est.Load(ctx) if err != nil { @@ -120,12 +120,14 @@ func RunMigrateTo(ctx context.Context, cfg MigrateToConfig) error { return nil } if cfg.DryRun { - run = cx.dryRun + run = func(f func(stream.MigrationExt) stream.MergeAndMigratedTo) error { + return cx.dryRun(ctx, f) + } } return run(func(est stream.MigrationExt) stream.MergeAndMigratedTo { - return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { - return cfg.Yes || cx.askForContinue(m) + return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backup.Migration) bool { + return cfg.Yes || cx.askForContinue(ctx, m) })) }) } diff --git a/br/pkg/task/operator/prepare_snap.go b/br/pkg/task/operator/prepare_snap.go index 2f846e2ac9dc2..4bf6ed5b1b8e9 100644 --- a/br/pkg/task/operator/prepare_snap.go +++ b/br/pkg/task/operator/prepare_snap.go @@ -19,12 +19,32 @@ import ( "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/utils" "github.com/tikv/client-go/v2/tikv" + pd 
"github.com/tikv/pd/client" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc/keepalive" ) +func createStoreManager(pd pd.Client, cfg *task.Config) (*utils.StoreManager, error) { + var ( + tconf *tls.Config + err error + ) + + if cfg.TLS.IsEnabled() { + tconf, err = cfg.TLS.ToTLSConfig() + if err != nil { + return nil, errors.Annotate(err, "invalid tls config") + } + } + kvMgr := utils.NewStoreManager(pd, keepalive.ClientParameters{ + Time: cfg.GRPCKeepaliveTime, + Timeout: cfg.GRPCKeepaliveTimeout, + }, tconf) + return kvMgr, nil +} + func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { var tc *tls.Config if cfg.TLS.IsEnabled() { diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 1cb3fd9e92fe7..9b575bcea6c4b 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -15,7 +15,9 @@ import ( "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/checkpoint" pconfig "github.com/pingcap/tidb/br/pkg/config" @@ -38,7 +40,9 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/engine" "github.com/spf13/cobra" @@ -276,6 +280,10 @@ type RestoreConfig struct { UseFSR bool `json:"use-fsr" toml:"use-fsr"` } +func (r *RestoreConfig) LocalEncryptionEnabled() bool { + return r.CipherInfo.CipherType != encryptionpb.EncryptionMethod_PLAINTEXT +} + // DefineRestoreFlags defines common flags for the restore tidb command. 
func DefineRestoreFlags(flags *pflag.FlagSet) { flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones") @@ -665,6 +673,12 @@ func DefaultRestoreConfig(commonConfig Config) RestoreConfig { return cfg } +func printRestoreMetrics() { + log.Info("Metric: import_file_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreImportFileSeconds))) + log.Info("Metric: upload_sst_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTForPiTRSeconds))) + log.Info("Metric: upload_sst_meta_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTMetaForPiTRSeconds))) +} + // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { etcdCLI, err := dialEtcdWithCfg(c, cfg.Config) @@ -676,7 +690,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf log.Error("failed to close the etcd client", zap.Error(err)) } }() - if err := checkTaskExists(c, cfg, etcdCLI); err != nil { + if err := checkConflictingLogBackup(c, cfg, etcdCLI); err != nil { return errors.Annotate(err, "failed to check task exists") } closeF, err := registerTaskToPD(c, etcdCLI) @@ -698,6 +712,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } defer mgr.Close() + defer printRestoreMetrics() + var restoreError error if IsStreamRestore(cmdName) { if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil { @@ -786,14 +802,15 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s // Init DB connection sessions err = client.Init(g, mgr.GetStorage()) defer client.Close() - if err != nil { return errors.Trace(err) } + u, s, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config) if err != nil { return errors.Trace(err) } + if cfg.CheckRequirements { err := 
checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient()) log.Info("Checking incompatible TiCDC changefeeds before restoring.", @@ -933,6 +950,15 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s }() } + err = client.InstallPiTRSupport(ctx, snapclient.PiTRCollDep{ + PDCli: mgr.GetPDClient(), + EtcdCli: mgr.GetDomain().GetEtcdClient(), + Storage: util.ProtoV1Clone(u), + }) + if err != nil { + return errors.Trace(err) + } + sp := utils.BRServiceSafePoint{ BackupTS: restoreTS, TTL: utils.DefaultBRGCSafePointTTL, @@ -1161,6 +1187,11 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s return errors.Trace(err) } + failpoint.InjectCall("run-snapshot-restore-about-to-finish", &err) + if err != nil { + return err + } + schedulersRemovable = true // Set task summary to success status. diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 0c22db685ce16..2ee1713f4f6c3 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -54,13 +54,16 @@ import ( "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/util/cdcutil" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/multierr" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const ( @@ -138,6 +141,18 @@ type StreamConfig struct { AdvancerCfg advancercfg.Config `json:"advancer-config" toml:"advancer-config"` } +func DefaultStreamConfig(flagsDef func(*pflag.FlagSet)) StreamConfig { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + flagsDef(fs) + DefineCommonFlags(fs) + cfg := StreamConfig{} + err := cfg.ParseFromFlags(fs) + if err != nil { + log.Panic("failed to parse backup flags to config", zap.Error(err)) + } + return 
cfg +} + func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStorage, error) { u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) if err != nil { @@ -1090,13 +1105,13 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre } if cfg.CleanUpCompactions { - est := stream.MigerationExtension(extStorage) + est := stream.MigrationExtension(extStorage) est.Hooks = stream.NewProgressBarHooks(console) newSN := math.MaxInt optPrompt := stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { console.Println("We are going to do the following: ") tbl := console.CreateTable() - stream.AddMigrationToTable(m, tbl) + est.AddMigrationToTable(ctx, m, tbl) tbl.Print() return console.PromptBool("Continue? ") }) @@ -1190,9 +1205,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } -// checkTaskExists checks whether there is a log backup task running. +// checkConflictingLogBackup checks whether there is a log backup task running. // If so, return an error. -func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error { +func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error { if err := checkConfigForStatus(cfg.PD); err != nil { return err } @@ -1203,15 +1218,37 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3. 
if err != nil { return err } - if len(tasks) > 0 { - return errors.Errorf("log backup task is running: %s, "+ - "please stop the task before restore, and after PITR operation finished, "+ - "create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) + for _, task := range tasks { + if err := checkTaskCompat(cfg, task); err != nil { + return err + } } return nil } +func checkTaskCompat(cfg *RestoreConfig, task streamhelper.Task) error { + baseErr := errors.Errorf("log backup task is running: %s, and isn't compatible with your restore. "+ + "You may check the extra information to get rid of this. If that doesn't work, you may "+ + "stop the task before restore, and after the restore operation finished, "+ + "create log-backup task again and create a full backup on this cluster.", task.Info.Name) + if len(cfg.FullBackupStorage) > 0 { + return errors.Annotate(baseErr, "you want to do point in time restore, which isn't compatible with an enabled log backup task yet") + } + if !cfg.UserFiltered() { + return errors.Annotate(baseErr, + "you want to restore a whole cluster, you may use `-f` or `restore table|database` to "+ + "specify the tables to restore to continue") + } + if cfg.LocalEncryptionEnabled() { + return errors.Annotate(baseErr, "the data you want to restore is encrypted, they cannot be copied to the log storage") + } + if task.Info.GetSecurityConfig().GetEncryption() != nil { + return errors.Annotate(baseErr, "the running log backup task is encrypted, the data copied to the log storage cannot work") + } + return nil +} + func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error { nameSet, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS) if err != nil { @@ -1332,6 +1369,7 @@ func restoreStream( checkpointTotalKVCount uint64 checkpointTotalSize uint64 currentTS uint64 + extraFields []zapcore.Field mu sync.Mutex startTime = time.Now() ) @@ -1340,18 +1378,20 @@ 
func restoreStream( summary.Log("restore log failed summary", zap.Error(err)) } else { totalDureTime := time.Since(startTime) - summary.Log("restore log success summary", zap.Duration("total-take", totalDureTime), - zap.Uint64("source-start-point", cfg.StartTS), - zap.Uint64("source-end-point", cfg.RestoreTS), - zap.Uint64("target-end-point", currentTS), - zap.String("source-start", stream.FormatDate(oracle.GetTimeFromTS(cfg.StartTS))), - zap.String("source-end", stream.FormatDate(oracle.GetTimeFromTS(cfg.RestoreTS))), - zap.String("target-end", stream.FormatDate(oracle.GetTimeFromTS(currentTS))), - zap.Uint64("total-kv-count", totalKVCount), - zap.Uint64("skipped-kv-count-by-checkpoint", checkpointTotalKVCount), - zap.String("total-size", units.HumanSize(float64(totalSize))), - zap.String("skipped-size-by-checkpoint", units.HumanSize(float64(checkpointTotalSize))), - zap.String("average-speed", units.HumanSize(float64(totalSize)/totalDureTime.Seconds())+"/s"), + summary.Log("restore log success summary", + append([]zapcore.Field{zap.Duration("total-take", totalDureTime), + zap.Uint64("source-start-point", cfg.StartTS), + zap.Uint64("source-end-point", cfg.RestoreTS), + zap.Uint64("target-end-point", currentTS), + zap.String("source-start", stream.FormatDate(oracle.GetTimeFromTS(cfg.StartTS))), + zap.String("source-end", stream.FormatDate(oracle.GetTimeFromTS(cfg.RestoreTS))), + zap.String("target-end", stream.FormatDate(oracle.GetTimeFromTS(currentTS))), + zap.Uint64("total-kv-count", totalKVCount), + zap.Uint64("skipped-kv-count-by-checkpoint", checkpointTotalKVCount), + zap.String("total-size", units.HumanSize(float64(totalSize))), + zap.String("skipped-size-by-checkpoint", units.HumanSize(float64(checkpointTotalSize))), + zap.String("average-speed (log)", units.HumanSize(float64(totalSize)/totalDureTime.Seconds())+"/s")}, + extraFields...)..., ) } }() @@ -1376,6 +1416,7 @@ func restoreStream( return errors.Annotate(err, "failed to create restore client") } 
defer client.Close(ctx) + defer client.RestoreSSTStatisticFields(&extraFields) if taskInfo != nil && taskInfo.Metadata != nil { // reuse the task's rewrite ts @@ -1452,7 +1493,8 @@ func restoreStream( if err != nil { return errors.Trace(err) } - client.BuildMigrations(migs) + client.BuildMigrations(migs.Migs) + defer cleanUpWithRetErr(&err, migs.ReadLock.Unlock) // get full backup meta storage to generate rewrite rules. fullBackupStorage, err := parseFullBackupTablesStorage(cfg) @@ -1524,7 +1566,10 @@ func restoreStream( return errors.Trace(err) } - compactionIter := client.LogFileManager.GetCompactionIter(ctx) + numberOfKVsInSST, err := client.LogFileManager.CountExtraSSTTotalKVs(ctx) + if err != nil { + return err + } se, err := g.CreateSession(mgr.GetStorage()) if err != nil { @@ -1534,7 +1579,12 @@ func restoreStream( splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx) log.Info("[Log Restore] get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys)) - pd := g.StartProgress(ctx, "Restore Files(SST + KV)", logclient.TotalEntryCount, !cfg.LogProgress) + addedSSTsIter := client.LogFileManager.GetIngestedSSTsSSTs(ctx) + compactionIter := client.LogFileManager.GetCompactionIter(ctx) + sstsIter := iter.ConcatAll(addedSSTsIter, compactionIter) + + totalWorkUnits := numberOfKVsInSST + int64(client.Stats.NumEntries) + pd := g.StartProgress(ctx, "Restore Files(SST + Log)", totalWorkUnits, !cfg.LogProgress) err = withProgress(pd, func(p glue.Progress) (pErr error) { updateStatsWithCheckpoint := func(kvCount, size uint64) { mu.Lock() @@ -1547,7 +1597,7 @@ func restoreStream( p.IncBy(int64(kvCount)) } compactedSplitIter, err := client.WrapCompactedFilesIterWithSplitHelper( - ctx, compactionIter, rewriteRules, sstCheckpointSets, + ctx, sstsIter, rewriteRules, sstCheckpointSets, updateStatsWithCheckpoint, splitSize, splitKeys, ) if err != nil { @@ -1999,6 +2049,15 @@ func checkPiTRTaskInfo( return checkInfo, nil } 
+func cleanUpWithRetErr(errOut *error, f func(ctx context.Context) error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + err := f(ctx) + if errOut != nil { + *errOut = multierr.Combine(*errOut, err) + } +} + func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error { log.Info("waiting for schema info finishes reloading") reloadStart := time.Now() diff --git a/go.mod b/go.mod index 7e9a0ac1a907d..012e598961546 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/fn v1.0.0 - github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 + github.com/pingcap/kvproto v0.0.0-20250108041715-3b77f2c65c63 github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index 0e36eb7a11528..939b987305752 100644 --- a/go.sum +++ b/go.sum @@ -676,8 +676,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= -github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250108041715-3b77f2c65c63 h1:ThJ7ddLJVk96Iai2HDeyJGuuhrcBtc3HwYKJfuKPLsI= +github.com/pingcap/kvproto v0.0.0-20250108041715-3b77f2c65c63/go.mod 
h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= From 94c29aefcd290ee82fe390b72c42b96e7ea34299 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 8 Jan 2025 17:55:08 +0800 Subject: [PATCH 2/9] added skip tiflash and utilized for integration test Signed-off-by: hillium --- br/pkg/task/operator/force_flush.go | 5 +++-- br/tests/br_test_utils.sh | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/br/pkg/task/operator/force_flush.go b/br/pkg/task/operator/force_flush.go index 4f9f622812c5b..200e4246c024f 100644 --- a/br/pkg/task/operator/force_flush.go +++ b/br/pkg/task/operator/force_flush.go @@ -46,8 +46,9 @@ func RunForceFlush(ctx context.Context, cfg *ForceFlushConfig) error { log.Info("About to start force flushing.", zap.Stringer("stores-pattern", cfg.StoresPattern)) for _, s := range tikvs { s := s - if !cfg.StoresPattern.MatchString(s.Address) { - log.Info("Skipping not matched TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address)) + if !cfg.StoresPattern.MatchString(s.Address) || engine.IsTiFlash(s) { + log.Info("Skipping TiFlash or not matched TiKV.", + zap.Uint64("store", s.GetId()), zap.String("addr", s.Address), zap.Bool("tiflash?", engine.IsTiFlash(s))) } log.Info("Starting force flush TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address)) eg.Go(func() error { diff --git a/br/tests/br_test_utils.sh b/br/tests/br_test_utils.sh index 9102415a77e14..9d2c79fe5a452 100644 --- a/br/tests/br_test_utils.sh +++ b/br/tests/br_test_utils.sh @@ -22,6 +22,8 @@ wait_log_checkpoint_advance() { sleep 10 local current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)") echo "current ts: $current_ts" + + 
run_br --skip-goleak --pd $PD_ADDR operator force-flush || echo "failed to run force flush, the case may be slower." i=0 while true; do # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty From 292280b7f538af0c1237c96b0b9db2a455696085 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 8 Jan 2025 18:09:58 +0800 Subject: [PATCH 3/9] revert irrelative changes --- br/pkg/task/common.go | 4 - br/pkg/task/operator/list_migration.go | 8 +- br/pkg/task/operator/migrate_to.go | 20 +++-- br/pkg/task/operator/prepare_snap.go | 20 ----- br/pkg/task/restore.go | 35 +------- br/pkg/task/stream.go | 107 ++++++------------------- 6 files changed, 39 insertions(+), 155 deletions(-) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index c95bee08c59d8..1813741634609 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -596,10 +596,6 @@ func (cfg *Config) normalizePDURLs() error { return nil } -func (cfg *Config) UserFiltered() bool { - return len(cfg.Schemas) != 0 || len(cfg.Tables) != 0 || len(cfg.FilterStr) != 0 -} - // ParseFromFlags parses the config from the flag set. 
func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { var err error diff --git a/br/pkg/task/operator/list_migration.go b/br/pkg/task/operator/list_migration.go index 1e030d7e0f3d8..d6c7efd57197a 100644 --- a/br/pkg/task/operator/list_migration.go +++ b/br/pkg/task/operator/list_migration.go @@ -26,8 +26,8 @@ func RunListMigrations(ctx context.Context, cfg ListMigrationConfig) error { if err != nil { return err } - ext := stream.MigrationExtension(st) - migs, err := ext.Load(ctx, stream.MLNotFoundIsErr()) + ext := stream.MigerationExtension(st) + migs, err := ext.Load(ctx) if err != nil { return err } @@ -40,12 +40,12 @@ func RunListMigrations(ctx context.Context, cfg ListMigrationConfig) error { console.Println(statusOK(fmt.Sprintf("Total %d Migrations.", len(migs.Layers)+1))) console.Printf("> BASE <\n") tbl := console.CreateTable() - ext.AddMigrationToTable(ctx, migs.Base, tbl) + stream.AddMigrationToTable(migs.Base, tbl) tbl.Print() for _, t := range migs.Layers { console.Printf("> %08d <\n", t.SeqNum) tbl := console.CreateTable() - ext.AddMigrationToTable(ctx, &t.Content, tbl) + stream.AddMigrationToTable(&t.Content, tbl) tbl.Print() } } diff --git a/br/pkg/task/operator/migrate_to.go b/br/pkg/task/operator/migrate_to.go index 2a086b9868db1..282e82784ecb9 100644 --- a/br/pkg/task/operator/migrate_to.go +++ b/br/pkg/task/operator/migrate_to.go @@ -5,7 +5,7 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" - backup "github.com/pingcap/kvproto/pkg/brpb" + backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" @@ -39,16 +39,16 @@ func (cx migrateToCtx) printErr(errs []error, msg string) { } } -func (cx migrateToCtx) askForContinue(ctx context.Context, targetMig *backup.Migration) bool { +func (cx migrateToCtx) askForContinue(targetMig *backuppb.Migration) bool { tbl := cx.console.CreateTable() - cx.est.AddMigrationToTable(ctx, 
targetMig, tbl) + stream.AddMigrationToTable(targetMig, tbl) cx.console.Println("The migration going to be executed will be like: ") tbl.Print() return cx.console.PromptBool("Continue? ") } -func (cx migrateToCtx) dryRun(ctx context.Context, f func(stream.MigrationExt) stream.MergeAndMigratedTo) error { +func (cx migrateToCtx) dryRun(f func(stream.MigrationExt) stream.MergeAndMigratedTo) error { var ( est = cx.est console = cx.console @@ -60,7 +60,7 @@ func (cx migrateToCtx) dryRun(ctx context.Context, f func(stream.MigrationExt) s }) tbl := console.CreateTable() - cx.est.AddMigrationToTable(ctx, estBase.NewBase, tbl) + stream.AddMigrationToTable(estBase.NewBase, tbl) console.Println("The new BASE migration will be like: ") tbl.Print() file, err := storage.SaveJSONEffectsToTmp(effects) @@ -90,7 +90,7 @@ func RunMigrateTo(ctx context.Context, cfg MigrateToConfig) error { console := glue.ConsoleOperations{ConsoleGlue: glue.StdIOGlue{}} - est := stream.MigrationExtension(st) + est := stream.MigerationExtension(st) est.Hooks = stream.NewProgressBarHooks(console) migs, err := est.Load(ctx) if err != nil { @@ -120,14 +120,12 @@ func RunMigrateTo(ctx context.Context, cfg MigrateToConfig) error { return nil } if cfg.DryRun { - run = func(f func(stream.MigrationExt) stream.MergeAndMigratedTo) error { - return cx.dryRun(ctx, f) - } + run = cx.dryRun } return run(func(est stream.MigrationExt) stream.MergeAndMigratedTo { - return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backup.Migration) bool { - return cfg.Yes || cx.askForContinue(ctx, m) + return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { + return cfg.Yes || cx.askForContinue(m) })) }) } diff --git a/br/pkg/task/operator/prepare_snap.go b/br/pkg/task/operator/prepare_snap.go index 4bf6ed5b1b8e9..2f846e2ac9dc2 100644 --- a/br/pkg/task/operator/prepare_snap.go +++ 
b/br/pkg/task/operator/prepare_snap.go @@ -19,32 +19,12 @@ import ( "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/utils" "github.com/tikv/client-go/v2/tikv" - pd "github.com/tikv/pd/client" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc/keepalive" ) -func createStoreManager(pd pd.Client, cfg *task.Config) (*utils.StoreManager, error) { - var ( - tconf *tls.Config - err error - ) - - if cfg.TLS.IsEnabled() { - tconf, err = cfg.TLS.ToTLSConfig() - if err != nil { - return nil, errors.Annotate(err, "invalid tls config") - } - } - kvMgr := utils.NewStoreManager(pd, keepalive.ClientParameters{ - Time: cfg.GRPCKeepaliveTime, - Timeout: cfg.GRPCKeepaliveTimeout, - }, tconf) - return kvMgr, nil -} - func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { var tc *tls.Config if cfg.TLS.IsEnabled() { diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 9b575bcea6c4b..1cb3fd9e92fe7 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -15,9 +15,7 @@ import ( "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/checkpoint" pconfig "github.com/pingcap/tidb/br/pkg/config" @@ -40,9 +38,7 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/engine" "github.com/spf13/cobra" @@ -280,10 +276,6 @@ type RestoreConfig struct { UseFSR bool `json:"use-fsr" toml:"use-fsr"` } -func (r *RestoreConfig) LocalEncryptionEnabled() bool { - return r.CipherInfo.CipherType != 
encryptionpb.EncryptionMethod_PLAINTEXT -} - // DefineRestoreFlags defines common flags for the restore tidb command. func DefineRestoreFlags(flags *pflag.FlagSet) { flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones") @@ -673,12 +665,6 @@ func DefaultRestoreConfig(commonConfig Config) RestoreConfig { return cfg } -func printRestoreMetrics() { - log.Info("Metric: import_file_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreImportFileSeconds))) - log.Info("Metric: upload_sst_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTForPiTRSeconds))) - log.Info("Metric: upload_sst_meta_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTMetaForPiTRSeconds))) -} - // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { etcdCLI, err := dialEtcdWithCfg(c, cfg.Config) @@ -690,7 +676,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf log.Error("failed to close the etcd client", zap.Error(err)) } }() - if err := checkConflictingLogBackup(c, cfg, etcdCLI); err != nil { + if err := checkTaskExists(c, cfg, etcdCLI); err != nil { return errors.Annotate(err, "failed to check task exists") } closeF, err := registerTaskToPD(c, etcdCLI) @@ -712,8 +698,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } defer mgr.Close() - defer printRestoreMetrics() - var restoreError error if IsStreamRestore(cmdName) { if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil { @@ -802,15 +786,14 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s // Init DB connection sessions err = client.Init(g, mgr.GetStorage()) defer client.Close() + if err != nil { return errors.Trace(err) } - u, s, backupMeta, err := 
ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config) if err != nil { return errors.Trace(err) } - if cfg.CheckRequirements { err := checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient()) log.Info("Checking incompatible TiCDC changefeeds before restoring.", @@ -950,15 +933,6 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s }() } - err = client.InstallPiTRSupport(ctx, snapclient.PiTRCollDep{ - PDCli: mgr.GetPDClient(), - EtcdCli: mgr.GetDomain().GetEtcdClient(), - Storage: util.ProtoV1Clone(u), - }) - if err != nil { - return errors.Trace(err) - } - sp := utils.BRServiceSafePoint{ BackupTS: restoreTS, TTL: utils.DefaultBRGCSafePointTTL, @@ -1187,11 +1161,6 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s return errors.Trace(err) } - failpoint.InjectCall("run-snapshot-restore-about-to-finish", &err) - if err != nil { - return err - } - schedulersRemovable = true // Set task summary to success status. 
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 2ee1713f4f6c3..0c22db685ce16 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -54,16 +54,13 @@ import ( "github.com/pingcap/tidb/br/pkg/streamhelper/daemon" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/util/cdcutil" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/multierr" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) const ( @@ -141,18 +138,6 @@ type StreamConfig struct { AdvancerCfg advancercfg.Config `json:"advancer-config" toml:"advancer-config"` } -func DefaultStreamConfig(flagsDef func(*pflag.FlagSet)) StreamConfig { - fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) - flagsDef(fs) - DefineCommonFlags(fs) - cfg := StreamConfig{} - err := cfg.ParseFromFlags(fs) - if err != nil { - log.Panic("failed to parse backup flags to config", zap.Error(err)) - } - return cfg -} - func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStorage, error) { u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) if err != nil { @@ -1105,13 +1090,13 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre } if cfg.CleanUpCompactions { - est := stream.MigrationExtension(extStorage) + est := stream.MigerationExtension(extStorage) est.Hooks = stream.NewProgressBarHooks(console) newSN := math.MaxInt optPrompt := stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { console.Println("We are going to do the following: ") tbl := console.CreateTable() - est.AddMigrationToTable(ctx, m, tbl) + stream.AddMigrationToTable(m, tbl) tbl.Print() return console.PromptBool("Continue? 
") }) @@ -1205,9 +1190,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre return nil } -// checkConflictingLogBackup checks whether there is a log backup task running. +// checkTaskExists checks whether there is a log backup task running. // If so, return an error. -func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error { +func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.Client) error { if err := checkConfigForStatus(cfg.PD); err != nil { return err } @@ -1218,37 +1203,15 @@ func checkConflictingLogBackup(ctx context.Context, cfg *RestoreConfig, etcdCLI if err != nil { return err } - for _, task := range tasks { - if err := checkTaskCompat(cfg, task); err != nil { - return err - } + if len(tasks) > 0 { + return errors.Errorf("log backup task is running: %s, "+ + "please stop the task before restore, and after PITR operation finished, "+ + "create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) } return nil } -func checkTaskCompat(cfg *RestoreConfig, task streamhelper.Task) error { - baseErr := errors.Errorf("log backup task is running: %s, and isn't compatible with your restore."+ - "You may check the extra information to get rid of this. 
If that doesn't work, you may "+ - "stop the task before restore, and after the restore operation finished, "+ - "create log-backup task again and create a full backup on this cluster.", task.Info.Name) - if len(cfg.FullBackupStorage) > 0 { - return errors.Annotate(baseErr, "you want to do point in time restore, which isn't compatible with an enabled log backup task yet") - } - if !cfg.UserFiltered() { - return errors.Annotate(baseErr, - "you want to restore a whole cluster, you may use `-f` or `restore table|database` to "+ - "specify the tables to restore to continue") - } - if cfg.LocalEncryptionEnabled() { - return errors.Annotate(baseErr, "the data you want to restore is encrypted, they cannot be copied to the log storage") - } - if task.Info.GetSecurityConfig().GetEncryption() != nil { - return errors.Annotate(baseErr, "the running log backup task is encrypted, the data copied to the log storage cannot work") - } - return nil -} - func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error { nameSet, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS) if err != nil { @@ -1369,7 +1332,6 @@ func restoreStream( checkpointTotalKVCount uint64 checkpointTotalSize uint64 currentTS uint64 - extraFields []zapcore.Field mu sync.Mutex startTime = time.Now() ) @@ -1378,20 +1340,18 @@ func restoreStream( summary.Log("restore log failed summary", zap.Error(err)) } else { totalDureTime := time.Since(startTime) - summary.Log("restore log success summary", - append([]zapcore.Field{zap.Duration("total-take", totalDureTime), - zap.Uint64("source-start-point", cfg.StartTS), - zap.Uint64("source-end-point", cfg.RestoreTS), - zap.Uint64("target-end-point", currentTS), - zap.String("source-start", stream.FormatDate(oracle.GetTimeFromTS(cfg.StartTS))), - zap.String("source-end", stream.FormatDate(oracle.GetTimeFromTS(cfg.RestoreTS))), - zap.String("target-end", stream.FormatDate(oracle.GetTimeFromTS(currentTS))), - 
zap.Uint64("total-kv-count", totalKVCount), - zap.Uint64("skipped-kv-count-by-checkpoint", checkpointTotalKVCount), - zap.String("total-size", units.HumanSize(float64(totalSize))), - zap.String("skipped-size-by-checkpoint", units.HumanSize(float64(checkpointTotalSize))), - zap.String("average-speed (log)", units.HumanSize(float64(totalSize)/totalDureTime.Seconds())+"/s")}, - extraFields...)..., + summary.Log("restore log success summary", zap.Duration("total-take", totalDureTime), + zap.Uint64("source-start-point", cfg.StartTS), + zap.Uint64("source-end-point", cfg.RestoreTS), + zap.Uint64("target-end-point", currentTS), + zap.String("source-start", stream.FormatDate(oracle.GetTimeFromTS(cfg.StartTS))), + zap.String("source-end", stream.FormatDate(oracle.GetTimeFromTS(cfg.RestoreTS))), + zap.String("target-end", stream.FormatDate(oracle.GetTimeFromTS(currentTS))), + zap.Uint64("total-kv-count", totalKVCount), + zap.Uint64("skipped-kv-count-by-checkpoint", checkpointTotalKVCount), + zap.String("total-size", units.HumanSize(float64(totalSize))), + zap.String("skipped-size-by-checkpoint", units.HumanSize(float64(checkpointTotalSize))), + zap.String("average-speed", units.HumanSize(float64(totalSize)/totalDureTime.Seconds())+"/s"), ) } }() @@ -1416,7 +1376,6 @@ func restoreStream( return errors.Annotate(err, "failed to create restore client") } defer client.Close(ctx) - defer client.RestoreSSTStatisticFields(&extraFields) if taskInfo != nil && taskInfo.Metadata != nil { // reuse the task's rewrite ts @@ -1493,8 +1452,7 @@ func restoreStream( if err != nil { return errors.Trace(err) } - client.BuildMigrations(migs.Migs) - defer cleanUpWithRetErr(&err, migs.ReadLock.Unlock) + client.BuildMigrations(migs) // get full backup meta storage to generate rewrite rules. 
fullBackupStorage, err := parseFullBackupTablesStorage(cfg) @@ -1566,10 +1524,7 @@ func restoreStream( return errors.Trace(err) } - numberOfKVsInSST, err := client.LogFileManager.CountExtraSSTTotalKVs(ctx) - if err != nil { - return err - } + compactionIter := client.LogFileManager.GetCompactionIter(ctx) se, err := g.CreateSession(mgr.GetStorage()) if err != nil { @@ -1579,12 +1534,7 @@ func restoreStream( splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx) log.Info("[Log Restore] get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys)) - addedSSTsIter := client.LogFileManager.GetIngestedSSTsSSTs(ctx) - compactionIter := client.LogFileManager.GetCompactionIter(ctx) - sstsIter := iter.ConcatAll(addedSSTsIter, compactionIter) - - totalWorkUnits := numberOfKVsInSST + int64(client.Stats.NumEntries) - pd := g.StartProgress(ctx, "Restore Files(SST + Log)", totalWorkUnits, !cfg.LogProgress) + pd := g.StartProgress(ctx, "Restore Files(SST + KV)", logclient.TotalEntryCount, !cfg.LogProgress) err = withProgress(pd, func(p glue.Progress) (pErr error) { updateStatsWithCheckpoint := func(kvCount, size uint64) { mu.Lock() @@ -1597,7 +1547,7 @@ func restoreStream( p.IncBy(int64(kvCount)) } compactedSplitIter, err := client.WrapCompactedFilesIterWithSplitHelper( - ctx, sstsIter, rewriteRules, sstCheckpointSets, + ctx, compactionIter, rewriteRules, sstCheckpointSets, updateStatsWithCheckpoint, splitSize, splitKeys, ) if err != nil { @@ -2049,15 +1999,6 @@ func checkPiTRTaskInfo( return checkInfo, nil } -func cleanUpWithRetErr(errOut *error, f func(ctx context.Context) error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - err := f(ctx) - if errOut != nil { - *errOut = multierr.Combine(*errOut, err) - } -} - func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) error { log.Info("waiting for schema info finishes reloading") reloadStart := time.Now() From 
6f38e25374072230056e57e417136647ca113c48 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 8 Jan 2025 18:20:14 +0800 Subject: [PATCH 4/9] make bazel_prepare --- br/pkg/task/BUILD.bazel | 3 --- 1 file changed, 3 deletions(-) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 2fde9e8981389..d6d4aaaf0291e 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -50,7 +50,6 @@ go_library( "//br/pkg/streamhelper/daemon", "//br/pkg/summary", "//br/pkg/utils", - "//br/pkg/utils/iter", "//br/pkg/version", "//pkg/config", "//pkg/ddl", @@ -59,7 +58,6 @@ go_library( "//pkg/infoschema/context", "//pkg/kv", "//pkg/meta/model", - "//pkg/metrics", "//pkg/parser/ast", "//pkg/parser/mysql", "//pkg/sessionctx/stmtctx", @@ -100,7 +98,6 @@ go_library( "@org_golang_x_sync//errgroup", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", - "@org_uber_go_zap//zapcore", ], ) From 728145e07b0009a8b3bc4fff0fe3dd07a70d9162 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 8 Jan 2025 18:43:04 +0800 Subject: [PATCH 5/9] fix build --- br/pkg/task/operator/force_flush.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/br/pkg/task/operator/force_flush.go b/br/pkg/task/operator/force_flush.go index 200e4246c024f..cdc08e6050b39 100644 --- a/br/pkg/task/operator/force_flush.go +++ b/br/pkg/task/operator/force_flush.go @@ -2,18 +2,22 @@ package operator import ( "context" + "crypto/tls" "slices" "github.com/pingcap/errors" logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/util/engine" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/opt" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) func getAllTiKVs(ctx context.Context, p pd.Client) ([]*metapb.Store, error) { @@ -25,6 +29,25 @@ func 
getAllTiKVs(ctx context.Context, p pd.Client) ([]*metapb.Store, error) { return withoutTiFlash, err } +func createStoreManager(pd pd.Client, cfg *task.Config) (*utils.StoreManager, error) { + var ( + tconf *tls.Config + err error + ) + + if cfg.TLS.IsEnabled() { + tconf, err = cfg.TLS.ToTLSConfig() + if err != nil { + return nil, errors.Annotate(err, "invalid tls config") + } + } + kvMgr := utils.NewStoreManager(pd, keepalive.ClientParameters{ + Time: cfg.GRPCKeepaliveTime, + Timeout: cfg.GRPCKeepaliveTimeout, + }, tconf) + return kvMgr, nil +} + func RunForceFlush(ctx context.Context, cfg *ForceFlushConfig) error { pdMgr, err := dialPD(ctx, &cfg.Config) if err != nil { From 476f860c09e8b068b9b5ebffc2bf28a134bfa221 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 9 Jan 2025 09:32:29 +0800 Subject: [PATCH 6/9] fix build Signed-off-by: hillium --- br/pkg/streamhelper/basic_lib_for_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 22fa031854fbe..bbc74b0d44b85 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -183,6 +183,11 @@ func (f *fakeStore) GetID() uint64 { return f.id } +func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) { + f.flush() + return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil +} + func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) { f.clientMu.Lock() defer f.clientMu.Unlock() From 15664c5f72a57189f0a5cdb15343c52d845af2ae Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 9 Jan 2025 13:57:33 +0800 Subject: [PATCH 7/9] make linter happy --- br/pkg/task/operator/checksum_table.go | 6 ++---- 
br/pkg/task/operator/force_flush.go | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/br/pkg/task/operator/checksum_table.go b/br/pkg/task/operator/checksum_table.go index dbf9d751bd370..59c52f6eb4cea 100644 --- a/br/pkg/task/operator/checksum_table.go +++ b/br/pkg/task/operator/checksum_table.go @@ -7,9 +7,6 @@ import ( "sync" "sync/atomic" - kvutil "github.com/tikv/client-go/v2/util" - "golang.org/x/sync/errgroup" - "github.com/pingcap/errors" backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" @@ -22,7 +19,9 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/util" "github.com/tikv/client-go/v2/oracle" + kvutil "github.com/tikv/client-go/v2/util" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type checksumTableCtx struct { @@ -231,7 +230,6 @@ func (c *checksumTableCtx) runChecksum(ctx context.Context, reqs []request) ([]C resultsMu := new(sync.Mutex) for _, req := range reqs { - req := req wkPool.ApplyOnErrorGroup(eg, func() error { total := req.copReq.Len() finished := new(atomic.Int64) diff --git a/br/pkg/task/operator/force_flush.go b/br/pkg/task/operator/force_flush.go index cdc08e6050b39..838c582e289a3 100644 --- a/br/pkg/task/operator/force_flush.go +++ b/br/pkg/task/operator/force_flush.go @@ -68,7 +68,6 @@ func RunForceFlush(ctx context.Context, cfg *ForceFlushConfig) error { eg, ectx := errgroup.WithContext(ctx) log.Info("About to start force flushing.", zap.Stringer("stores-pattern", cfg.StoresPattern)) for _, s := range tikvs { - s := s if !cfg.StoresPattern.MatchString(s.Address) || engine.IsTiFlash(s) { log.Info("Skipping TiFlash or not matched TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address), zap.Bool("tiflash?", engine.IsTiFlash(s))) From 40dfca5fc279c713086debe29cf9022221e969e0 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 14 Jan 2025 15:34:55 +0800 Subject: [PATCH 8/9] fix build --- br/pkg/streamhelper/basic_lib_for_test.go | 5 ----- 1 file changed, 5 
deletions(-) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 51c91f20920d2..3d0bf84ac4f78 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -188,11 +188,6 @@ func (f *fakeStore) GetID() uint64 { return f.id } -func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) { - f.flush() - return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil -} - func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) { f.clientMu.Lock() defer f.clientMu.Unlock() From 547de59149cd62625a4b5fb38aa5d3b519543a4c Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 14 Jan 2025 16:23:57 +0800 Subject: [PATCH 9/9] address comments --- br/pkg/task/operator/config.go | 3 +++ br/pkg/task/operator/force_flush.go | 2 ++ 2 files changed, 5 insertions(+) diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 03996beed3011..8ccf1ef6266b5 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -192,6 +192,9 @@ func (cfg *MigrateToConfig) Verify() error { type ForceFlushConfig struct { task.Config + // StoresPattern matches the address of TiKV. + // The address usually looks like ":20160". + // You may list the store by `pd-ctl stores`. 
StoresPattern *regexp.Regexp } diff --git a/br/pkg/task/operator/force_flush.go b/br/pkg/task/operator/force_flush.go index 838c582e289a3..6bdc3a0bae288 100644 --- a/br/pkg/task/operator/force_flush.go +++ b/br/pkg/task/operator/force_flush.go @@ -71,7 +71,9 @@ func RunForceFlush(ctx context.Context, cfg *ForceFlushConfig) error { if !cfg.StoresPattern.MatchString(s.Address) || engine.IsTiFlash(s) { log.Info("Skipping TiFlash or not matched TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address), zap.Bool("tiflash?", engine.IsTiFlash(s))) + continue } + log.Info("Starting force flush TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address)) eg.Go(func() error { var logBackupCli logbackup.LogBackupClient