From 0e8c8dadce1517dae6ea67ae799c74ee80f37099 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 6 Nov 2024 18:09:03 +0800 Subject: [PATCH] lightning: test a simple case that golang outputs same SST (#57061) ref pingcap/tidb#57105 --- DEPS.bzl | 13 ++ go.mod | 1 + go.sum | 2 + pkg/lightning/backend/local/duplicate.go | 14 +- pkg/lightning/backend/local/local.go | 28 +-- pkg/lightning/backend/local/local_test.go | 4 +- pkg/lightning/backend/local/region_job.go | 6 +- pkg/lightning/tikv/BUILD.bazel | 32 ++- pkg/lightning/tikv/local_sst_writer.go | 115 +++++++++ pkg/lightning/tikv/local_sst_writer_test.go | 247 ++++++++++++++++++++ pkg/lightning/tikv/prop_collector.go | 232 ++++++++++++++++++ 11 files changed, 664 insertions(+), 30 deletions(-) create mode 100644 pkg/lightning/tikv/local_sst_writer.go create mode 100644 pkg/lightning/tikv/local_sst_writer_test.go create mode 100644 pkg/lightning/tikv/prop_collector.go diff --git a/DEPS.bzl b/DEPS.bzl index 9efd4d92159ab..74f9c36ae3bd0 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4576,6 +4576,19 @@ def go_deps(): "https://storage.googleapis.com/pingcapmirror/gomod/github.com/labstack/gommon/com_github_labstack_gommon-v0.4.0.zip", ], ) + go_repository( + name = "com_github_lance6716_pebble", + build_file_proto_mode = "disable_global", + importpath = "github.com/lance6716/pebble", + sha256 = "fad807913eaf5ff14651929475dda5e5c6f6838dbaeef11b7eddefe771f01f1e", + strip_prefix = "github.com/lance6716/pebble@v0.0.0-20241104073946-6f55c09bd183", + urls = [ + "http://bazel-cache.pingcap.net:8080/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip", + "http://ats.apps.svc/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip", + "https://cache.hawkingrei.com/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/lance6716/pebble/com_github_lance6716_pebble-v0.0.0-20241104073946-6f55c09bd183.zip", + ], + ) go_repository( name = "com_github_lasiar_canonicalheader", build_file_proto_mode = "disable_global", diff --git a/go.mod b/go.mod index cd7e4c39b9a39..110335258504d 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( github.com/kisielk/errcheck v1.7.0 github.com/klauspost/compress v1.17.9 github.com/ks3sdklib/aws-sdk-go v1.2.9 + github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183 github.com/lestrrat-go/jwx/v2 v2.0.21 github.com/mgechev/revive v1.4.0 github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7 diff --git a/go.sum b/go.sum index 119d5b3d8a3ca..631799d088f9b 100644 --- a/go.sum +++ b/go.sum @@ -558,6 +558,8 @@ github.com/ks3sdklib/aws-sdk-go v1.2.9 h1:Eg0fM56r4Gjp9PiK1Bg9agJUxCAWCk236qq9DI github.com/ks3sdklib/aws-sdk-go v1.2.9/go.mod h1:xBNbOrxSnd36AQpZ8o99mGGu+blblUd9rI0MKGmeufo= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183 h1:CrFmpCAT5PGMgmQadTa2lXZrjuvpknONB1/pyxiyDsM= +github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183/go.mod h1:ZxnWA3Ab0ufDIyppyzL16j6HFNpdXeiU/1cE4Wlv/lQ= github.com/lestrrat-go/blackmagic v1.0.2 h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k= github.com/lestrrat-go/blackmagic v1.0.2/go.mod h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= diff --git a/pkg/lightning/backend/local/duplicate.go b/pkg/lightning/backend/local/duplicate.go index 658e63b7b6b27..c77a77f1b8653 100644 --- a/pkg/lightning/backend/local/duplicate.go +++ b/pkg/lightning/backend/local/duplicate.go @@ -307,7 +307,7 @@ func getDupDetectClient( ctx context.Context, region *split.RegionInfo, keyRange tidbkv.KeyRange, - importClientFactory ImportClientFactory, + importClientFactory importClientFactory, resourceGroupName string, taskType string, minCommitTS uint64, @@ -317,7 +317,7 @@ func getDupDetectClient( return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, "region id %d has no leader", region.Region.Id) } - importClient, err := importClientFactory.Create(ctx, leader.GetStoreId()) + importClient, err := importClientFactory.create(ctx, leader.GetStoreId()) if err != nil { return nil, errors.Trace(err) } @@ -348,7 +348,7 @@ func NewRemoteDupKVStream( ctx context.Context, region *split.RegionInfo, keyRange tidbkv.KeyRange, - importClientFactory ImportClientFactory, + importClientFactory importClientFactory, resourceGroupName string, taskType string, minCommitTS uint64, @@ -902,7 +902,7 @@ func (m *dupeDetector) processRemoteDupTaskOnce( ctx context.Context, task dupTask, logger log.Logger, - importClientFactory ImportClientFactory, + importClientFactory importClientFactory, regionPool *util.WorkerPool, remainKeyRanges *pendingKeyRanges, algorithm config.DuplicateResolutionAlgorithm, @@ -980,7 +980,7 @@ func (m *dupeDetector) processRemoteDupTask( ctx context.Context, task dupTask, logger log.Logger, - importClientFactory ImportClientFactory, + importClientFactory importClientFactory, regionPool *util.WorkerPool, algorithm config.DuplicateResolutionAlgorithm, ) error { @@ -1026,7 +1026,7 @@ func (m *dupeDetector) processRemoteDupTask( // records all duplicate row info into errorMgr. func (m *dupeDetector) collectDuplicateRowsFromTiKV( ctx context.Context, - importClientFactory ImportClientFactory, + importClientFactory importClientFactory, algorithm config.DuplicateResolutionAlgorithm, ) error { tasks, err := m.buildDupTasks() @@ -1070,7 +1070,7 @@ type DupeController struct { dupeConcurrency int duplicateDB *pebble.DB keyAdapter common.KeyAdapter - importClientFactory ImportClientFactory + importClientFactory importClientFactory resourceGroupName string taskType string } diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 8ff151aebc821..3f935b654cc7b 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -114,10 +114,10 @@ var ( MaxWriteAndIngestRetryTimes = 30 ) -// ImportClientFactory is factory to create new import client for specific store. -type ImportClientFactory interface { - Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) - Close() +// importClientFactory is factory to create new import client for specific store. +type importClientFactory interface { + create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) + close() } type importClientFactoryImpl struct { @@ -209,8 +209,8 @@ func (f *importClientFactoryImpl) getGrpcConn(ctx context.Context, storeID uint6 }) } -// Create creates a new import client for specific store. -func (f *importClientFactoryImpl) Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { +// create creates a new import client for specific store. +func (f *importClientFactoryImpl) create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { conn, err := f.getGrpcConn(ctx, storeID) if err != nil { return nil, err @@ -218,8 +218,8 @@ func (f *importClientFactoryImpl) Create(ctx context.Context, storeID uint64) (s return sst.NewImportSSTClient(conn), nil } -// Close closes the factory. -func (f *importClientFactoryImpl) Close() { +// close closes the factory. +func (f *importClientFactoryImpl) close() { f.conns.Close() } @@ -511,7 +511,7 @@ type Backend struct { engineMgr *engineManager supportMultiIngest bool - importClientFactory ImportClientFactory + importClientFactory importClientFactory metrics *metric.Common writeLimiter StoreWriteLimiter @@ -557,7 +557,7 @@ func NewBackend( return } if importClientFactory != nil { - importClientFactory.Close() + importClientFactory.close() } if pdHTTPCli != nil { pdHTTPCli.Close() @@ -692,7 +692,7 @@ func (local *Backend) TotalMemoryConsume() int64 { return local.engineMgr.totalMemoryConsume() } -func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, importClientFactory ImportClientFactory) (bool, error) { +func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, factory importClientFactory) (bool, error) { stores, err := pdCli.GetAllStores(ctx, pd.WithExcludeTombstone()) if err != nil { return false, errors.Trace(err) @@ -720,7 +720,7 @@ func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, importClientF return false, ctx.Err() } } - client, err1 := importClientFactory.Create(ctx, s.Id) + client, err1 := factory.create(ctx, s.Id) if err1 != nil { err = err1 log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address)) @@ -782,7 +782,7 @@ func (local *Backend) tikvSideCheckFreeSpace(ctx context.Context) { // Close the local backend. func (local *Backend) Close() { local.engineMgr.close() - local.importClientFactory.Close() + local.importClientFactory.close() _ = local.tikvCli.Close() local.pdHTTPCli.Close() @@ -820,7 +820,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig } func (local *Backend) getImportClient(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error) { - return local.importClientFactory.Create(ctx, storeID) + return local.importClientFactory.create(ctx, storeID) } func splitRangeBySizeProps(fullRange common.Range, sizeProps *sizeProperties, sizeLimit int64, keysLimit int64) []common.Range { diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index f3165293df939..35dd73dd36363 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -797,7 +797,7 @@ type mockImportClientFactory struct { apiInvokeRecorder map[string][]uint64 } -func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst.ImportSSTClient, error) { +func (f *mockImportClientFactory) create(_ context.Context, storeID uint64) (sst.ImportSSTClient, error) { for _, store := range f.stores { if store.Id == storeID { return f.createClientFn(store), nil @@ -806,7 +806,7 @@ func (f *mockImportClientFactory) Create(_ context.Context, storeID uint64) (sst return nil, fmt.Errorf("store %d not found", storeID) } -func (f *mockImportClientFactory) Close() {} +func (f *mockImportClientFactory) close() {} func TestMultiIngest(t *testing.T) { allStores := []*metapb.Store{ diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 6be89f1feeb70..14f1c9a8191e5 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -408,7 +408,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { }, } for _, peer := range region.GetPeers() { - cli, err := clientFactory.Create(ctx, peer.StoreId) + cli, err := clientFactory.create(ctx, peer.StoreId) if err != nil { return annotateErr(err, peer, "when create client") } @@ -664,7 +664,7 @@ func (local *Backend) checkWriteStall( ) (bool, *sst.IngestResponse, error) { clientFactory := local.importClientFactory for _, peer := range region.Region.GetPeers() { - cli, err := clientFactory.Create(ctx, peer.StoreId) + cli, err := clientFactory.create(ctx, peer.StoreId) if err != nil { return false, nil, errors.Trace(err) } @@ -744,7 +744,7 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe "region id %d has no leader", j.region.Region.Id) } - cli, err := clientFactory.Create(ctx, leader.StoreId) + cli, err := clientFactory.create(ctx, leader.StoreId) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/lightning/tikv/BUILD.bazel b/pkg/lightning/tikv/BUILD.bazel index 1d36be8bd7072..cae38c320e978 100644 --- a/pkg/lightning/tikv/BUILD.bazel +++ b/pkg/lightning/tikv/BUILD.bazel @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "tikv", - srcs = ["tikv.go"], + srcs = [ + "local_sst_writer.go", + "prop_collector.go", + "tikv.go", + ], importpath = "github.com/pingcap/tidb/pkg/lightning/tikv", visibility = ["//visibility:public"], deps = [ @@ -13,7 +17,14 @@ go_library( "//pkg/lightning/config", "//pkg/lightning/log", "//pkg/meta/model", + "//pkg/util/codec", + "//pkg/util/intest", "@com_github_coreos_go_semver//semver", + "@com_github_lance6716_pebble//:pebble", + "@com_github_lance6716_pebble//bloom", + "@com_github_lance6716_pebble//objstorage/objstorageprovider", + "@com_github_lance6716_pebble//sstable", + "@com_github_lance6716_pebble//vfs", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/debugpb", "@com_github_pingcap_kvproto//pkg/import_sstpb", @@ -32,15 +43,28 @@ go_library( go_test( name = "tikv_test", timeout = "short", - srcs = ["tikv_test.go"], + srcs = [ + "local_sst_writer_test.go", + "tikv_test.go", + ], + embed = [":tikv"], flaky = True, - shard_count = 4, + shard_count = 7, deps = [ - ":tikv", + "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//sstable", + "@com_github_cockroachdb_pebble//vfs", "@com_github_coreos_go_semver//semver", + "@com_github_google_uuid//:uuid", + "@com_github_lance6716_pebble//sstable", + "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//http", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//credentials/insecure", ], ) diff --git a/pkg/lightning/tikv/local_sst_writer.go b/pkg/lightning/tikv/local_sst_writer.go new file mode 100644 index 0000000000000..73ee591ca17ca --- /dev/null +++ b/pkg/lightning/tikv/local_sst_writer.go @@ -0,0 +1,115 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "encoding/binary" + + rocks "github.com/lance6716/pebble" + rocksbloom "github.com/lance6716/pebble/bloom" + "github.com/lance6716/pebble/objstorage/objstorageprovider" + rockssst "github.com/lance6716/pebble/sstable" + "github.com/lance6716/pebble/vfs" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/intest" +) + +// writeCFWriter generates SST files for TiKV's write column family. +type writeCFWriter struct { + sstWriter *rockssst.Writer + ts uint64 +} + +// newWriteCFWriter creates a new writeCFWriter. Currently `identity` is acquired +// from a real SST file generated by TiKV. +func newWriteCFWriter( + sstPath string, + ts uint64, + identity *rockssst.Identity, +) (*writeCFWriter, error) { + f, err := vfs.Default.Create(sstPath) + if err != nil { + return nil, errors.Trace(err) + } + writable := objstorageprovider.NewFileWritable(f) + writer := rockssst.NewWriter(writable, rockssst.WriterOptions{ + // TODO(lance6716): should read TiKV config to know compression algorithm. + Compression: rocks.ZstdCompression, + // TODO(lance6716): should check the behaviour is the exactly same. + FilterPolicy: rocksbloom.FilterPolicy(10), + MergerName: "nullptr", + TablePropertyCollectors: []func() rockssst.TablePropertyCollector{ + func() rockssst.TablePropertyCollector { + return newMVCCPropCollector(ts) + }, + func() rockssst.TablePropertyCollector { + return newRangePropertiesCollector() + }, + // titan is only triggered when SST compaction at TiKV side. + func() rockssst.TablePropertyCollector { + return mockCollector{name: "BlobFileSizeCollector"} + }, + }, + }, identity) + return &writeCFWriter{sstWriter: writer, ts: ts}, nil +} + +// set mimic TiKV's TxnSstWriter logic to encode key and value and write to SST. +func (w *writeCFWriter) set(key, value []byte) error { + intest.Assert(isShortValue(value), "not implemented, need to write to default CF") + + // key layout in this case: + // z{mem-comparable encoded key}{bit-wise reversed TS} + actualKey := make([]byte, 0, 1+codec.EncodedBytesLength(len(key))+8) + // keys::data_key will add the 'z' prefix [1] at `TxnSstWriter.put` [2]. + // + // [1] https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/keys/src/lib.rs#L206 + // [2] https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/sst_importer/src/sst_writer.rs#L92 + actualKey = append(actualKey, 'z') + // Key::from_raw [3] will encode the key as bytes at `TxnSstWriter.write` [4], + // which is the caller of `TxnSstWriter.put` [2]. + // + // [3] https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/txn_types/src/types.rs#L55 + // [4] https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/sst_importer/src/sst_writer.rs#L74 + actualKey = codec.EncodeBytes(actualKey, key) + // Key::append_ts [5] will append the bit-wise reverted ts at + // `TxnSstWriter.write` [4]. + // + // [5] https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/txn_types/src/types.rs#L118 + actualKey = binary.BigEndian.AppendUint64(actualKey, ^w.ts) + + // value layout in this case: + // P{varint-encoded TS}v{value length}{value} + actualValue := make([]byte, 0, 1+binary.MaxVarintLen64+1+1+len(value)) + // below logic can be found at `WriteRef.to_bytes` [6]. + // + // [6] https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/txn_types/src/write.rs#L362 + actualValue = append(actualValue, 'P') + actualValue = binary.AppendUvarint(actualValue, w.ts) + actualValue = append(actualValue, 'v') + actualValue = append(actualValue, byte(len(value))) + actualValue = append(actualValue, value...) + + return errors.Trace(w.sstWriter.Set(actualKey, actualValue)) +} + +func (w *writeCFWriter) close() error { + return errors.Trace(w.sstWriter.Close()) +} + +func isShortValue(val []byte) bool { + return len(val) <= 255 +} diff --git a/pkg/lightning/tikv/local_sst_writer_test.go b/pkg/lightning/tikv/local_sst_writer_test.go new file mode 100644 index 0000000000000..522d541e1cb4d --- /dev/null +++ b/pkg/lightning/tikv/local_sst_writer_test.go @@ -0,0 +1,247 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "context" + "encoding/json" + "strings" + "testing" + + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/vfs" + "github.com/google/uuid" + rockssst "github.com/lance6716/pebble/sstable" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +var ( + sortedKVs = [][2][]byte{ + {[]byte("a"), []byte("1")}, + } + ts uint64 = 1 +) + +// write2ImportService4Test writes these sorted key-value pairs to the TiKV +// cluster. SST files are generated by TiKV and saved in import directory if no +// error happens. +func write2ImportService4Test( + ctx context.Context, + pdAddrs []string, + sortedKVs [][2][]byte, + ts uint64, +) ([]*import_sstpb.SSTMeta, error) { + pdClient, err := pd.NewClient(pdAddrs, pd.SecurityOption{}) + if err != nil { + return nil, errors.Trace(err) + } + defer pdClient.Close() + + r0, err := pdClient.GetRegion(ctx, sortedKVs[0][0]) + if err != nil { + return nil, errors.Trace(err) + } + r1, err := pdClient.GetRegion(ctx, sortedKVs[len(sortedKVs)-1][0]) + if err != nil { + return nil, errors.Trace(err) + } + if r0.Meta.Id != r1.Meta.Id { + return nil, errors.Errorf( + "only support write to the same region, "+ + "first key: %X, last key: %X, "+ + "first region id: %d, last region id: %d", + sortedKVs[0][0], sortedKVs[len(sortedKVs)-1][0], + r0.Meta.Id, r1.Meta.Id, + ) + } + + store, err := pdClient.GetStore(ctx, r0.Leader.GetStoreId()) + if err != nil { + return nil, errors.Trace(err) + } + conn, err := grpc.DialContext( + ctx, store.GetAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + return nil, errors.Trace(err) + } + defer conn.Close() + + ingestClient := import_sstpb.NewImportSSTClient(conn) + writeStream, err := ingestClient.Write(ctx) + if err != nil { + return nil, errors.Trace(err) + } + u := uuid.New() + writeMeta := &import_sstpb.SSTMeta{ + Uuid: u[:], + RegionId: r0.Meta.Id, + RegionEpoch: r0.Meta.RegionEpoch, + Range: &import_sstpb.Range{ + Start: sortedKVs[0][0], + End: sortedKVs[len(sortedKVs)-1][0], + }, + } + rpcCtx := kvrpcpb.Context{ + RegionId: r0.Meta.Id, + RegionEpoch: r0.Meta.RegionEpoch, + Peer: r0.Leader, + } + err = writeStream.Send(&import_sstpb.WriteRequest{ + Chunk: &import_sstpb.WriteRequest_Meta{Meta: writeMeta}, + Context: &rpcCtx, + }) + if err != nil { + return nil, errors.Trace(err) + } + + batch := &import_sstpb.WriteBatch{ + CommitTs: ts, + Pairs: make([]*import_sstpb.Pair, 0, len(sortedKVs)), + } + for _, kv := range sortedKVs { + batch.Pairs = append(batch.Pairs, &import_sstpb.Pair{ + Key: kv[0], + Value: kv[1], + }) + } + err = writeStream.Send(&import_sstpb.WriteRequest{ + Chunk: &import_sstpb.WriteRequest_Batch{Batch: batch}, + Context: &rpcCtx, + }) + if err != nil { + return nil, errors.Trace(err) + } + resp, err := writeStream.CloseAndRecv() + if err != nil { + return nil, errors.Trace(err) + } + if resp.GetError() != nil { + return nil, errors.Errorf("write failed: %s", resp.GetError()) + } + return resp.Metas, nil +} + +func TestGRPCWriteToTiKV(t *testing.T) { + t.Skip(`This is a manual test. You can use tiup playground and run this test. +After the test is finished, find the SST files in the import directory of the TiKV node.`) + + ctx := context.Background() + pdAddrs := []string{"127.0.0.1:2379"} + + metas, err := write2ImportService4Test(ctx, pdAddrs, sortedKVs, ts) + require.NoError(t, err) + for _, meta := range metas { + t.Logf("meta UUID: %v", uuid.UUID(meta.Uuid).String()) + } +} + +func TestPebbleWriteSST(t *testing.T) { + sstPath := "/tmp/test-write.sst" + writer, err := newWriteCFWriter(sstPath, ts, &rockssst.Identity{ + DB: "SST Writer", + Host: "lance6716-nuc10i7fnh", + Session: "DS38NDUWK5HLG8SSL5M7", + OriginalFileNumber: 1, + }) + require.NoError(t, err) + + for _, kv := range sortedKVs { + err = writer.set(kv[0], kv[1]) + require.NoError(t, err) + } + + err = writer.close() + require.NoError(t, err) + + f, err := vfs.Default.Open(sstPath) + require.NoError(t, err) + readable, err := rockssst.NewSimpleReadable(f) + require.NoError(t, err) + reader, err := rockssst.NewReader(readable, rockssst.ReaderOptions{}) + require.NoError(t, err) + defer reader.Close() + + layout, err := reader.Layout() + require.NoError(t, err) + + infos := layout.BlockInfos(reader) + expected := ` +[ + {"Offset":0,"Length":42,"Name":"data","Compression":0,"Checksum":2258416982}, + {"Offset":121,"Length":39,"Name":"index","Compression":0,"Checksum":3727189474}, + {"Offset":165,"Length":1253,"Name":"properties","Compression":0,"Checksum":561778464}, + {"Offset":1423,"Length":79,"Name":"meta-index","Compression":0,"Checksum":955781521}, + {"Offset":1507,"Length":53,"Name":"footer","Compression":0,"Checksum":0} +]` + var expectedInfos []*rockssst.BlockInfo + err = json.Unmarshal([]byte(expected), &expectedInfos) + require.NoError(t, err) + require.Equal(t, expectedInfos, infos) +} + +func TestDebugReadSST(t *testing.T) { + t.Skip("this is a manual test") + + sstPath := "/tmp/test-write.sst" + t.Logf("read sst: %s", sstPath) + f, err := vfs.Default.Open(sstPath) + require.NoError(t, err) + readable, err := sstable.NewSimpleReadable(f) + require.NoError(t, err) + reader, err := sstable.NewReader(readable, sstable.ReaderOptions{}) + require.NoError(t, err) + defer reader.Close() + + layout, err := reader.Layout() + require.NoError(t, err) + + content := &strings.Builder{} + layout.Describe(content, true, reader, nil) + + t.Logf("layout:\n %s", content.String()) + t.Logf("properties:\n %s", reader.Properties.String()) + + iter, err := reader.NewIter(nil, nil) + require.NoError(t, err) + defer iter.Close() + + k, v := iter.First() + if k == nil { + return + } + getValue := func(v pebble.LazyValue) []byte { + realV, _, err2 := v.Value(nil) + require.NoError(t, err2) + return realV + } + t.Logf("key: %X\nvalue: %X", k.UserKey, getValue(v)) + for { + k, v = iter.Next() + if k == nil { + break + } + t.Logf("key: %X\nvalue: %X", k.UserKey, getValue(v)) + } +} diff --git a/pkg/lightning/tikv/prop_collector.go b/pkg/lightning/tikv/prop_collector.go new file mode 100644 index 0000000000000..bed4be968a11b --- /dev/null +++ b/pkg/lightning/tikv/prop_collector.go @@ -0,0 +1,232 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "bytes" + "encoding/binary" + "slices" + + rockssst "github.com/lance6716/pebble/sstable" +) + +type mockCollector struct { + name string +} + +var _ rockssst.TablePropertyCollector = mockCollector{} + +// Add implements the TablePropertyCollector interface. +func (mockCollector) Add(rockssst.InternalKey, []byte) error { + return nil +} + +// Finish implements the TablePropertyCollector interface. +func (mockCollector) Finish(map[string]string) error { + return nil +} + +// Name implements the TablePropertyCollector interface. +func (m mockCollector) Name() string { + return m.name +} + +type indexHandleKV struct { + key []byte + size uint64 + offset uint64 +} + +// mvccPropCollector is a specialized version of TiKV's `MvccPropertiesCollector`. +type mvccPropCollector struct { + props struct { + minTS uint64 + maxTS uint64 + numRows uint64 + numPuts uint64 + numDeletes uint64 + numVersions uint64 + maxRowVersions uint64 + ttl struct { + maxExpireTS *uint64 + minExpireTS *uint64 + } + } + lastRow []byte + curIndexSize uint64 + curIndexOffset uint64 + rowIndexHandles []indexHandleKV +} + +var _ rockssst.TablePropertyCollector = (*mvccPropCollector)(nil) + +func newMVCCPropCollector(ts uint64) *mvccPropCollector { + ret := &mvccPropCollector{} + ret.props.minTS = ts + ret.props.maxTS = ts + ret.props.maxRowVersions = 1 + return ret +} + +// Add implements the TablePropertyCollector interface. It mimics +// https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/engine_rocks/src/properties.rs#L407. +func (m *mvccPropCollector) Add(key rockssst.InternalKey, _ []byte) error { + m.props.numVersions++ + m.props.numRows++ + m.props.numPuts++ + + m.curIndexSize++ + m.curIndexOffset++ + m.lastRow = key.UserKey[:len(key.UserKey)-8] + if m.curIndexOffset == 1 || m.curIndexSize >= 10000 { + m.rowIndexHandles = append(m.rowIndexHandles, indexHandleKV{ + key: m.lastRow, + size: m.curIndexSize, + offset: m.curIndexOffset, + }) + m.curIndexSize = 0 + } + + return nil +} + +// Finish implements the TablePropertyCollector interface. It mimics +// https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/engine_rocks/src/properties.rs#L505. +func (m *mvccPropCollector) Finish(userProps map[string]string) error { + if m.curIndexSize > 0 { + m.rowIndexHandles = append(m.rowIndexHandles, indexHandleKV{ + key: m.lastRow, + size: m.curIndexSize, + offset: m.curIndexOffset, + }) + } + + userProps["tikv.min_ts"] = string(binary.BigEndian.AppendUint64(nil, m.props.minTS)) + userProps["tikv.max_ts"] = string(binary.BigEndian.AppendUint64(nil, m.props.maxTS)) + userProps["tikv.num_rows"] = string(binary.BigEndian.AppendUint64(nil, m.props.numRows)) + userProps["tikv.num_puts"] = string(binary.BigEndian.AppendUint64(nil, m.props.numPuts)) + userProps["tikv.num_deletes"] = string(binary.BigEndian.AppendUint64(nil, m.props.numDeletes)) + userProps["tikv.num_versions"] = string(binary.BigEndian.AppendUint64(nil, m.props.numVersions)) + userProps["tikv.max_row_versions"] = string(binary.BigEndian.AppendUint64(nil, m.props.maxRowVersions)) + if m.props.ttl.maxExpireTS != nil { + userProps["tikv.max_expire_ts"] = string(binary.BigEndian.AppendUint64(nil, *m.props.ttl.maxExpireTS)) + } + if m.props.ttl.minExpireTS != nil { + userProps["tikv.min_expire_ts"] = string(binary.BigEndian.AppendUint64(nil, *m.props.ttl.minExpireTS)) + } + userProps["tikv.num_errors"] = string(binary.BigEndian.AppendUint64(nil, 0)) + + slices.SortFunc(m.rowIndexHandles, func(i, j indexHandleKV) int { + return bytes.Compare(i.key, j.key) + }) + buf := make([]byte, 0, 1024) + for _, handle := range m.rowIndexHandles { + buf = binary.BigEndian.AppendUint64(buf, uint64(len(handle.key))) + buf = append(buf, handle.key...) + buf = binary.BigEndian.AppendUint64(buf, handle.size) + buf = binary.BigEndian.AppendUint64(buf, handle.offset) + } + userProps["tikv.rows_index"] = string(buf) + return nil +} + +// Name implements the TablePropertyCollector interface. +func (*mvccPropCollector) Name() string { + return "tikv.mvcc-properties-collector" +} + +type rangeOffsets struct { + size uint64 + keyCount uint64 +} + +type rangeProperty struct { + key []byte + rangeOffsets +} + +type rangeProperties []rangeProperty + +// encode encodes the range properties into a byte slice. +func (r rangeProperties) encode() []byte { + b := make([]byte, 0, 1024) + for _, p := range r { + b = binary.BigEndian.AppendUint64(b, uint64(len(p.key))) + b = append(b, p.key...) + b = binary.BigEndian.AppendUint64(b, p.size) + b = binary.BigEndian.AppendUint64(b, p.keyCount) + } + return b +} + +// rangePropertiesCollector is a specialized version of TiKV's `RangePropertiesCollector`. +type rangePropertiesCollector struct { + props rangeProperties + lastOffsets rangeOffsets + lastKey []byte + currentOffsets rangeOffsets + propSizeIdxDistance uint64 + propKeysIdxDistance uint64 +} + +func newRangePropertiesCollector() *rangePropertiesCollector { + return &rangePropertiesCollector{ + props: make([]rangeProperty, 0, 1024), + propSizeIdxDistance: 4 * 1024 * 1024, + propKeysIdxDistance: 40 * 1024, + } +} + +func (c *rangePropertiesCollector) sizeInLastRange() uint64 { + return c.currentOffsets.size - c.lastOffsets.size +} + +func (c *rangePropertiesCollector) keysInLastRange() uint64 { + return c.currentOffsets.keyCount - c.lastOffsets.keyCount +} + +func (c *rangePropertiesCollector) insertNewPoint(key []byte) { + c.lastOffsets = c.currentOffsets + c.props = append(c.props, rangeProperty{key: append([]byte{}, key...), rangeOffsets: c.currentOffsets}) +} + +// Add implements the TablePropertyCollector interface. It mimics +// https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/engine_rocks/src/properties.rs#L329. +func (c *rangePropertiesCollector) Add(key rockssst.InternalKey, value []byte) error { + c.currentOffsets.size += uint64(len(value)) + uint64(len(key.UserKey)) + c.currentOffsets.keyCount++ + if len(c.lastKey) == 0 || c.sizeInLastRange() >= c.propSizeIdxDistance || + c.keysInLastRange() >= c.propKeysIdxDistance { + c.insertNewPoint(key.UserKey) + } + c.lastKey = append(c.lastKey[:0], key.UserKey...) + return nil +} + +// Finish implements the TablePropertyCollector interface. It mimics +// https://github.com/tikv/tikv/blob/7793f1d5dc40206fe406ca001be1e0d7f1b83a8f/components/engine_rocks/src/properties.rs#L349. +func (c *rangePropertiesCollector) Finish(userProps map[string]string) error { + if c.sizeInLastRange() > 0 || c.keysInLastRange() > 0 { + c.insertNewPoint(c.lastKey) + } + + userProps["tikv.range_index"] = string(c.props.encode()) + return nil +} + +// Name implements the TablePropertyCollector interface. +func (*rangePropertiesCollector) Name() string { + return "tikv.range-properties-collector" +}