Skip to content

Commit

Permalink
lightning: test a simple case that golang outputs same SST (#57061)
Browse files Browse the repository at this point in the history
ref #57105
  • Loading branch information
lance6716 authored Nov 6, 2024
1 parent e3f9303 commit 0e8c8da
Show file tree
Hide file tree
Showing 11 changed files with 664 additions and 30 deletions.
13 changes: 13 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4576,6 +4576,19 @@ def go_deps():
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/labstack/gommon/com_github_labstack_gommon-v0.4.0.zip",
],
)
go_repository(
name = "com_github_lance6716_pebble",
build_file_proto_mode = "disable_global",
importpath = "github.com/lance6716/pebble",
sha256 = "fad807913eaf5ff14651929475dda5e5c6f6838dbaeef11b7eddefe771f01f1e",
strip_prefix = "github.com/lance6716/[email protected]",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip",
"http://ats.apps.svc/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip",
"https://cache.hawkingrei.com/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip",
],
)
go_repository(
name = "com_github_lasiar_canonicalheader",
build_file_proto_mode = "disable_global",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
github.com/kisielk/errcheck v1.7.0
github.com/klauspost/compress v1.17.9
github.com/ks3sdklib/aws-sdk-go v1.2.9
github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183
github.com/lestrrat-go/jwx/v2 v2.0.21
github.com/mgechev/revive v1.4.0
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,8 @@ github.com/ks3sdklib/aws-sdk-go v1.2.9 h1:Eg0fM56r4Gjp9PiK1Bg9agJUxCAWCk236qq9DI
github.com/ks3sdklib/aws-sdk-go v1.2.9/go.mod h1:xBNbOrxSnd36AQpZ8o99mGGu+blblUd9rI0MKGmeufo=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183 h1:CrFmpCAT5PGMgmQadTa2lXZrjuvpknONB1/pyxiyDsM=
github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183/go.mod h1:ZxnWA3Ab0ufDIyppyzL16j6HFNpdXeiU/1cE4Wlv/lQ=
github.com/lestrrat-go/blackmagic v1.0.2 h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k=
github.com/lestrrat-go/blackmagic v1.0.2/go.mod h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU=
github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE=
Expand Down
14 changes: 7 additions & 7 deletions pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func getDupDetectClient(
ctx context.Context,
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
importClientFactory importClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
Expand All @@ -317,7 +317,7 @@ func getDupDetectClient(
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", region.Region.Id)
}
importClient, err := importClientFactory.Create(ctx, leader.GetStoreId())
importClient, err := importClientFactory.create(ctx, leader.GetStoreId())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func NewRemoteDupKVStream(
ctx context.Context,
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
importClientFactory importClientFactory,
resourceGroupName string,
taskType string,
minCommitTS uint64,
Expand Down Expand Up @@ -902,7 +902,7 @@ func (m *dupeDetector) processRemoteDupTaskOnce(
ctx context.Context,
task dupTask,
logger log.Logger,
importClientFactory ImportClientFactory,
importClientFactory importClientFactory,
regionPool *util.WorkerPool,
remainKeyRanges *pendingKeyRanges,
algorithm config.DuplicateResolutionAlgorithm,
Expand Down Expand Up @@ -980,7 +980,7 @@ func (m *dupeDetector) processRemoteDupTask(
ctx context.Context,
task dupTask,
logger log.Logger,
importClientFactory ImportClientFactory,
importClientFactory importClientFactory,
regionPool *util.WorkerPool,
algorithm config.DuplicateResolutionAlgorithm,
) error {
Expand Down Expand Up @@ -1026,7 +1026,7 @@ func (m *dupeDetector) processRemoteDupTask(
// records all duplicate row info into errorMgr.
func (m *dupeDetector) collectDuplicateRowsFromTiKV(
ctx context.Context,
importClientFactory ImportClientFactory,
importClientFactory importClientFactory,
algorithm config.DuplicateResolutionAlgorithm,
) error {
tasks, err := m.buildDupTasks()
Expand Down Expand Up @@ -1070,7 +1070,7 @@ type DupeController struct {
dupeConcurrency int
duplicateDB *pebble.DB
keyAdapter common.KeyAdapter
importClientFactory ImportClientFactory
importClientFactory importClientFactory
resourceGroupName string
taskType string
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ var (
MaxWriteAndIngestRetryTimes = 30
)

// ImportClientFactory is factory to create new import client for specific store.
type ImportClientFactory interface {
Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error)
Close()
// importClientFactory is factory to create new import client for specific store.
type importClientFactory interface {
create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error)
close()
}

type importClientFactoryImpl struct {
Expand Down Expand Up @@ -209,17 +209,17 @@ func (f *importClientFactoryImpl) getGrpcConn(ctx context.Context, storeID uint6
})
}

// Create creates a new import client for specific store.
func (f *importClientFactoryImpl) Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
// create creates a new import client for specific store.
func (f *importClientFactoryImpl) create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
conn, err := f.getGrpcConn(ctx, storeID)
if err != nil {
return nil, err
}
return sst.NewImportSSTClient(conn), nil
}

// Close closes the factory.
func (f *importClientFactoryImpl) Close() {
// close closes the factory.
func (f *importClientFactoryImpl) close() {
f.conns.Close()
}

Expand Down Expand Up @@ -511,7 +511,7 @@ type Backend struct {
engineMgr *engineManager

supportMultiIngest bool
importClientFactory ImportClientFactory
importClientFactory importClientFactory

metrics *metric.Common
writeLimiter StoreWriteLimiter
Expand Down Expand Up @@ -557,7 +557,7 @@ func NewBackend(
return
}
if importClientFactory != nil {
importClientFactory.Close()
importClientFactory.close()
}
if pdHTTPCli != nil {
pdHTTPCli.Close()
Expand Down Expand Up @@ -692,7 +692,7 @@ func (local *Backend) TotalMemoryConsume() int64 {
return local.engineMgr.totalMemoryConsume()
}

func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, importClientFactory ImportClientFactory) (bool, error) {
func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, factory importClientFactory) (bool, error) {
stores, err := pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -720,7 +720,7 @@ func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, importClientF
return false, ctx.Err()
}
}
client, err1 := importClientFactory.Create(ctx, s.Id)
client, err1 := factory.create(ctx, s.Id)
if err1 != nil {
err = err1
log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
Expand Down Expand Up @@ -782,7 +782,7 @@ func (local *Backend) tikvSideCheckFreeSpace(ctx context.Context) {
// Close the local backend.
func (local *Backend) Close() {
local.engineMgr.close()
local.importClientFactory.Close()
local.importClientFactory.close()

_ = local.tikvCli.Close()
local.pdHTTPCli.Close()
Expand Down Expand Up @@ -820,7 +820,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
}

func (local *Backend) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) {
return local.importClientFactory.Create(ctx, storeID)
return local.importClientFactory.create(ctx, storeID)
}

func splitRangeBySizeProps(fullRange common.Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []common.Range {
Expand Down
4 changes: 2 additions & 2 deletions pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ type mockImportClientFactory struct {
apiInvokeRecorder map[string][]uint64
}

func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst.ImportSSTClient, error) {
func (f *mockImportClientFactory) create(_ context.Context, storeID uint64) (sst.ImportSSTClient, error) {
for _, store := range f.stores {
if store.Id == storeID {
return f.createClientFn(store), nil
Expand All @@ -806,7 +806,7 @@ func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst
return nil, fmt.Errorf("store %d not found", storeID)
}

func (f *mockImportClientFactory) Close() {}
func (f *mockImportClientFactory) close() {}

func TestMultiIngest(t *testing.T) {
allStores := []*metapb.Store{
Expand Down
6 changes: 3 additions & 3 deletions pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
},
}
for _, peer := range region.GetPeers() {
cli, err := clientFactory.Create(ctx, peer.StoreId)
cli, err := clientFactory.create(ctx, peer.StoreId)
if err != nil {
return annotateErr(err, peer, "when create client")
}
Expand Down Expand Up @@ -664,7 +664,7 @@ func (local *Backend) checkWriteStall(
) (bool, *sst.IngestResponse, error) {
clientFactory := local.importClientFactory
for _, peer := range region.Region.GetPeers() {
cli, err := clientFactory.Create(ctx, peer.StoreId)
cli, err := clientFactory.create(ctx, peer.StoreId)
if err != nil {
return false, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -744,7 +744,7 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe
"region id %d has no leader", j.region.Region.Id)
}

cli, err := clientFactory.Create(ctx, leader.StoreId)
cli, err := clientFactory.create(ctx, leader.StoreId)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
32 changes: 28 additions & 4 deletions pkg/lightning/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "tikv",
srcs = ["tikv.go"],
srcs = [
"local_sst_writer.go",
"prop_collector.go",
"tikv.go",
],
importpath = "github.com/pingcap/tidb/pkg/lightning/tikv",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -13,7 +17,14 @@ go_library(
"//pkg/lightning/config",
"//pkg/lightning/log",
"//pkg/meta/model",
"//pkg/util/codec",
"//pkg/util/intest",
"@com_github_coreos_go_semver//semver",
"@com_github_lance6716_pebble//:pebble",
"@com_github_lance6716_pebble//bloom",
"@com_github_lance6716_pebble//objstorage/objstorageprovider",
"@com_github_lance6716_pebble//sstable",
"@com_github_lance6716_pebble//vfs",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/debugpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
Expand All @@ -32,15 +43,28 @@ go_library(
go_test(
name = "tikv_test",
timeout = "short",
srcs = ["tikv_test.go"],
srcs = [
"local_sst_writer_test.go",
"tikv_test.go",
],
embed = [":tikv"],
flaky = True,
shard_count = 4,
shard_count = 7,
deps = [
":tikv",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_coreos_go_semver//semver",
"@com_github_google_uuid//:uuid",
"@com_github_lance6716_pebble//sstable",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
],
)
Loading

0 comments on commit 0e8c8da

Please sign in to comment.