Skip to content

Commit 552b369

Browse files
authored
br: batch restore compacted sst (#64068)
close #64308
1 parent 7793eae commit 552b369

File tree

11 files changed

+745
-38
lines changed

11 files changed

+745
-38
lines changed

DEPS.bzl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6374,13 +6374,13 @@ def go_deps():
63746374
name = "com_github_pingcap_kvproto",
63756375
build_file_proto_mode = "disable_global",
63766376
importpath = "github.com/pingcap/kvproto",
6377-
sha256 = "944c84a0347ba1d154c473941a8820324226757f79be328acc5eb02c78d804c1",
6378-
strip_prefix = "github.com/pingcap/[email protected]20251023055424-e9d10f5dcd23",
6377+
sha256 = "4c632c03ee044e45c5e59cdb86f8c5ec387d28673cc947d62e6f8d2996de4d80",
6378+
strip_prefix = "github.com/pingcap/[email protected]20251104104744-291054671541",
63796379
urls = [
6380-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251023055424-e9d10f5dcd23.zip",
6381-
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251023055424-e9d10f5dcd23.zip",
6382-
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251023055424-e9d10f5dcd23.zip",
6383-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251023055424-e9d10f5dcd23.zip",
6380+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251104104744-291054671541.zip",
6381+
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251104104744-291054671541.zip",
6382+
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251104104744-291054671541.zip",
6383+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20251104104744-291054671541.zip",
63846384
],
63856385
)
63866386
go_repository(

br/pkg/restore/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ go_test(
6060
],
6161
embed = [":restore"],
6262
flaky = True,
63-
shard_count = 17,
63+
shard_count = 19,
6464
deps = [
6565
"//br/pkg/conn",
6666
"//br/pkg/mock",
@@ -83,6 +83,7 @@ go_test(
8383
"@com_github_pingcap_kvproto//pkg/import_sstpb",
8484
"@com_github_pingcap_kvproto//pkg/metapb",
8585
"@com_github_stretchr_testify//require",
86+
"@com_github_tikv_pd_client//opt",
8687
"@org_golang_google_grpc//:grpc",
8788
],
8889
)

br/pkg/restore/internal/import_client/import_client.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ type ImporterClient interface {
5656
req *import_sstpb.DownloadRequest,
5757
) (*import_sstpb.DownloadResponse, error)
5858

59+
BatchDownloadSST(
60+
ctx context.Context,
61+
storeID uint64,
62+
req *import_sstpb.DownloadRequest,
63+
) (*import_sstpb.DownloadResponse, error)
64+
5965
MultiIngest(
6066
ctx context.Context,
6167
storeID uint64,
@@ -75,6 +81,8 @@ type ImporterClient interface {
7581

7682
CloseGrpcClient() error
7783

84+
CheckBatchDownloadSupport(ctx context.Context, stores []uint64) (bool, error)
85+
7886
CheckMultiIngestSupport(ctx context.Context, stores []uint64) error
7987

8088
AddForcePartitionRange(ctx context.Context, storeID uint64, req *import_sstpb.AddPartitionRangeRequest) error
@@ -143,6 +151,18 @@ func (ic *importClient) DownloadSST(
143151
return client.Download(ctx, req)
144152
}
145153

154+
func (ic *importClient) BatchDownloadSST(
155+
ctx context.Context,
156+
storeID uint64,
157+
req *import_sstpb.DownloadRequest,
158+
) (*import_sstpb.DownloadResponse, error) {
159+
client, err := ic.GetImportClient(ctx, storeID)
160+
if err != nil {
161+
return nil, errors.Trace(err)
162+
}
163+
return client.BatchDownload(ctx, req)
164+
}
165+
146166
func (ic *importClient) SetDownloadSpeedLimit(
147167
ctx context.Context,
148168
storeID uint64,
@@ -248,6 +268,21 @@ func (ic *importClient) CloseGrpcClient() error {
248268
return nil
249269
}
250270

271+
func (ic *importClient) CheckBatchDownloadSupport(ctx context.Context, stores []uint64) (bool, error) {
272+
for _, storeID := range stores {
273+
_, err := ic.BatchDownloadSST(ctx, storeID, &import_sstpb.DownloadRequest{})
274+
if err != nil {
275+
if s, ok := status.FromError(err); ok {
276+
if s.Code() == codes.Unimplemented {
277+
return false, nil
278+
}
279+
}
280+
return false, errors.Annotatef(err, "failed to check batch download support. (store id %d)", storeID)
281+
}
282+
}
283+
return true, nil
284+
}
285+
251286
func (ic *importClient) CheckMultiIngestSupport(ctx context.Context, stores []uint64) error {
252287
for _, storeID := range stores {
253288
_, err := ic.MultiIngest(ctx, storeID, &import_sstpb.MultiIngestRequest{})

br/pkg/restore/log_client/client.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func (s *SstRestoreManager) Close(ctx context.Context) {
154154

155155
func NewSstRestoreManager(
156156
ctx context.Context,
157+
metaClient split.SplitClient,
157158
snapFileImporter *snapclient.SnapFileImporter,
158159
concurrencyPerStore uint,
159160
storeCount uint,
@@ -176,7 +177,13 @@ func NewSstRestoreManager(
176177
return nil, errors.Trace(err)
177178
}
178179
}
179-
s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, checkpointRunner)
180+
if snapFileImporter.GetMergeSst() {
181+
log.Info("create batch sst restorer to restore SST files")
182+
s.restorer = restore.NewBatchSstRestorer(ctx, snapFileImporter, metaClient, sstWorkerPool, checkpointRunner)
183+
} else {
184+
log.Info("create simple sst restorer to restore SST files")
185+
s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, checkpointRunner)
186+
}
180187
return s, nil
181188
}
182189

@@ -549,18 +556,22 @@ func (rc *LogClient) InitClients(
549556
createCallBacks = append(createCallBacks, func(importer *snapclient.SnapFileImporter) error {
550557
return importer.CheckMultiIngestSupport(ctx, stores)
551558
})
559+
createCallBacks = append(createCallBacks, func(importer *snapclient.SnapFileImporter) error {
560+
return importer.CheckBatchDownloadSupport(ctx, stores)
561+
})
552562

553563
opt := snapclient.NewSnapFileImporterOptions(
554564
rc.cipher, metaClient, importCli, backend,
555565
snapclient.RewriteModeKeyspace, stores, concurrencyPerStore, createCallBacks, closeCallBacks,
556566
)
557567
snapFileImporter, err := snapclient.NewSnapFileImporter(
558-
ctx, rc.dom.Store().GetCodec().GetAPIVersion(), snapclient.TiDBCompcated, opt)
568+
ctx, rc.dom.Store().GetCodec().GetAPIVersion(), snapclient.TiDBCompacted, opt)
559569
if err != nil {
560570
return errors.Trace(err)
561571
}
562572
rc.sstRestoreManager, err = NewSstRestoreManager(
563573
ctx,
574+
metaClient,
564575
snapFileImporter,
565576
concurrencyPerStore,
566577
uint(len(stores)),

br/pkg/restore/misc.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,19 @@ import (
2222
"encoding/binary"
2323
"fmt"
2424
"path"
25+
"slices"
2526
"strconv"
2627
"strings"
2728

2829
"github.com/gogo/protobuf/proto"
2930
"github.com/pingcap/errors"
3031
"github.com/pingcap/failpoint"
32+
backuppb "github.com/pingcap/kvproto/pkg/brpb"
3133
"github.com/pingcap/log"
3234
berrors "github.com/pingcap/tidb/br/pkg/errors"
3335
"github.com/pingcap/tidb/br/pkg/logutil"
36+
"github.com/pingcap/tidb/br/pkg/restore/split"
37+
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
3438
"github.com/pingcap/tidb/br/pkg/storage"
3539
"github.com/pingcap/tidb/br/pkg/utils"
3640
"github.com/pingcap/tidb/pkg/domain"
@@ -389,3 +393,158 @@ func HasRestoreIDColumn(dom *domain.Domain) bool {
389393
}
390394
return false
391395
}
396+
397+
type regionScanner struct {
398+
regionClient split.SplitClient
399+
regionCache []*split.RegionInfo
400+
cacheSize int
401+
}
402+
403+
func NewRegionScanner(regionClient split.SplitClient, cacheSize int) *regionScanner {
404+
return &regionScanner{
405+
regionClient: regionClient,
406+
cacheSize: cacheSize,
407+
}
408+
}
409+
410+
func (scanner *regionScanner) locateRegionFromRemote(ctx context.Context, key []byte) (*split.RegionInfo, error) {
411+
regionInfos, err := split.ScanRegionsWithRetry(ctx, scanner.regionClient, key, []byte(""), scanner.cacheSize)
412+
if err != nil {
413+
return nil, errors.Trace(err)
414+
}
415+
scanner.regionCache = regionInfos
416+
return scanner.regionCache[0], nil
417+
}
418+
419+
func (scanner *regionScanner) locateRegionFromCache(ctx context.Context, key []byte) (*split.RegionInfo, error) {
420+
if len(scanner.regionCache) == 0 {
421+
return scanner.locateRegionFromRemote(ctx, key)
422+
}
423+
if bytes.Compare(key, scanner.regionCache[len(scanner.regionCache)-1].Region.EndKey) >= 0 {
424+
return scanner.locateRegionFromRemote(ctx, key)
425+
}
426+
i, ok := slices.BinarySearchFunc(scanner.regionCache, key, func(regionInfo *split.RegionInfo, k []byte) int {
427+
startCmpRet := bytes.Compare(regionInfo.Region.StartKey, k)
428+
if startCmpRet <= 0 && (len(regionInfo.Region.EndKey) == 0 || bytes.Compare(regionInfo.Region.EndKey, k) > 0) {
429+
return 0
430+
}
431+
return startCmpRet
432+
})
433+
if !ok {
434+
return scanner.locateRegionFromRemote(ctx, key)
435+
}
436+
scanner.regionCache = scanner.regionCache[i:]
437+
return scanner.regionCache[0], nil
438+
}
439+
440+
func (scanner *regionScanner) IsKeyRangeInOneRegion(ctx context.Context, startKey, endKey []byte) (bool, error) {
441+
regionInfo, err := scanner.locateRegionFromCache(ctx, startKey)
442+
if err != nil {
443+
return false, errors.Trace(err)
444+
}
445+
return len(regionInfo.Region.EndKey) == 0 || bytes.Compare(endKey, regionInfo.Region.EndKey) < 0, nil
446+
}
447+
448+
type BackupFileSetWithKeyRange struct {
449+
backupFileSet BackupFileSet
450+
startKey []byte
451+
endKey []byte
452+
}
453+
454+
func GroupOverlappedBackupFileSetsIter(ctx context.Context, regionClient split.SplitClient, backupFileSets []BackupFileSet, fn func(BatchBackupFileSet)) error {
455+
backupFileSetWithKeyRanges := make([]*BackupFileSetWithKeyRange, 0, len(backupFileSets))
456+
for _, backupFileSet := range backupFileSets {
457+
startKey, endKey, err := getKeyRangeForBackupFileSet(backupFileSet)
458+
if err != nil {
459+
return errors.Trace(err)
460+
}
461+
backupFileSetWithKeyRanges = append(backupFileSetWithKeyRanges, &BackupFileSetWithKeyRange{
462+
backupFileSet: backupFileSet,
463+
startKey: startKey,
464+
endKey: endKey,
465+
})
466+
}
467+
slices.SortFunc(backupFileSetWithKeyRanges, func(a, b *BackupFileSetWithKeyRange) int {
468+
startKeyCmp := bytes.Compare(a.startKey, b.startKey)
469+
if startKeyCmp == 0 {
470+
return bytes.Compare(a.endKey, b.endKey)
471+
}
472+
return startKeyCmp
473+
})
474+
regionScanner := NewRegionScanner(regionClient, 64)
475+
var thisBackupFileSet *BackupFileSet = nil
476+
thisBatchBackupFileSet := make([]BackupFileSet, 0)
477+
lastEndKey := []byte{}
478+
for _, file := range backupFileSetWithKeyRanges {
479+
if bytes.Compare(lastEndKey, file.startKey) < 0 {
480+
// the next file is not overlapped with this backup file set anymore, so add the set
481+
// into the batch set.
482+
if thisBackupFileSet != nil {
483+
thisBatchBackupFileSet = append(thisBatchBackupFileSet, *thisBackupFileSet)
484+
thisBackupFileSet = nil
485+
}
486+
// create new this backup file set
487+
thisBackupFileSet = &BackupFileSet{
488+
TableID: file.backupFileSet.TableID,
489+
SSTFiles: make([]*backuppb.File, 0),
490+
RewriteRules: file.backupFileSet.RewriteRules,
491+
}
492+
thisBackupFileSet.SSTFiles = append(thisBackupFileSet.SSTFiles, file.backupFileSet.SSTFiles...)
493+
// check whether [lastEndKey, file.startKey] is in the one region
494+
inOneRegion, err := regionScanner.IsKeyRangeInOneRegion(ctx, lastEndKey, file.startKey)
495+
if err != nil {
496+
return errors.Trace(err)
497+
}
498+
if !inOneRegion && len(thisBatchBackupFileSet) > 0 {
499+
// not in the same region, so this batch backup file set can be output
500+
fn(thisBatchBackupFileSet)
501+
thisBatchBackupFileSet = make([]BackupFileSet, 0)
502+
}
503+
lastEndKey = file.endKey
504+
} else {
505+
// the next file is overlapped with this backup file set, so add the file
506+
// into the set.
507+
thisBackupFileSet.SSTFiles = append(thisBackupFileSet.SSTFiles, file.backupFileSet.SSTFiles...)
508+
if thisBackupFileSet.TableID != file.backupFileSet.TableID || !thisBackupFileSet.RewriteRules.Equal(file.backupFileSet.RewriteRules) {
509+
log.Error("the overlapped SST must have the same table id and rewrite rules",
510+
zap.Int64("set table id", thisBackupFileSet.TableID),
511+
zap.Int64("file table id", file.backupFileSet.TableID),
512+
zap.Reflect("set rewrite rule", thisBackupFileSet.RewriteRules),
513+
zap.Reflect("file rewrite rule", file.backupFileSet.RewriteRules),
514+
)
515+
return errors.Errorf("the overlapped SST must have the same table id(%d<>%d) and rewrite rules",
516+
thisBackupFileSet.TableID, file.backupFileSet.TableID)
517+
}
518+
// update lastEndKey if file.endKey is larger
519+
if bytes.Compare(lastEndKey, file.endKey) < 0 {
520+
lastEndKey = file.endKey
521+
}
522+
}
523+
}
524+
// add the set into the batch set.
525+
if thisBackupFileSet != nil {
526+
thisBatchBackupFileSet = append(thisBatchBackupFileSet, *thisBackupFileSet)
527+
}
528+
// output the last batch backup file set
529+
if len(thisBatchBackupFileSet) > 0 {
530+
fn(thisBatchBackupFileSet)
531+
}
532+
return nil
533+
}
534+
535+
func getKeyRangeForBackupFileSet(backupFileSet BackupFileSet) ([]byte, []byte, error) {
536+
var startKey, endKey []byte
537+
for _, f := range backupFileSet.SSTFiles {
538+
start, end, err := restoreutils.GetRewriteRawKeys(f, backupFileSet.RewriteRules)
539+
if err != nil {
540+
return nil, nil, errors.Trace(err)
541+
}
542+
if len(startKey) == 0 || bytes.Compare(start, startKey) < 0 {
543+
startKey = start
544+
}
545+
if len(endKey) == 0 || bytes.Compare(endKey, end) < 0 {
546+
endKey = end
547+
}
548+
}
549+
return startKey, endKey, nil
550+
}

0 commit comments

Comments
 (0)