68 changes: 68 additions & 0 deletions br/pkg/restore/ingestrec/ingest_recorder.go
@@ -247,3 +247,71 @@ func (i *IngestRecorder) IterateForeignKeys(f func(*ForeignKeyRecord) error) err
}
return nil
}

// RecorderState is a serializable snapshot of ingest recorder data.
type RecorderState struct {
Items map[int64]map[int64]IndexState `json:"items,omitempty"`
}

// IndexState is a minimal representation of an ingested index.
type IndexState struct {
IsPrimary bool `json:"is_primary,omitempty"`
}

// ExportState returns a snapshot of the ingest recorder state.
func (i *IngestRecorder) ExportState() *RecorderState {
if i == nil || len(i.items) == 0 {
return nil
}
state := &RecorderState{
Items: make(map[int64]map[int64]IndexState, len(i.items)),
}
for tableID, indexes := range i.items {
if len(indexes) == 0 {
continue
}
tableIndexes := make(map[int64]IndexState, len(indexes))
for indexID, info := range indexes {
if info == nil {
continue
}
tableIndexes[indexID] = IndexState{IsPrimary: info.IsPrimary}
}
if len(tableIndexes) > 0 {
state.Items[tableID] = tableIndexes
}
}
if len(state.Items) == 0 {
return nil
}
return state
}

// MergeState merges a snapshot into the ingest recorder.
func (i *IngestRecorder) MergeState(state *RecorderState) {
if i == nil || state == nil || len(state.Items) == 0 {
return
}
if i.items == nil {
i.items = make(map[int64]map[int64]*IngestIndexInfo)
}
for tableID, indexes := range state.Items {
if len(indexes) == 0 {
continue
}
tableIndexes, exists := i.items[tableID]
if !exists {
tableIndexes = make(map[int64]*IngestIndexInfo, len(indexes))
i.items[tableID] = tableIndexes
}
for indexID, info := range indexes {
if _, ok := tableIndexes[indexID]; ok {
continue
}
tableIndexes[indexID] = &IngestIndexInfo{
IsPrimary: info.IsPrimary,
Updated: false,
}
}
}
}
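
Taken together, ExportState and MergeState give the recorder a JSON-serializable checkpoint. A minimal round-trip sketch, assuming only encoding/json on top of the APIs added above (roundTripIngestState is an illustrative helper, not part of the PR):

import (
	"encoding/json"

	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
)

// roundTripIngestState snapshots rec, encodes the snapshot to JSON, decodes
// it back, and merges it into the recorder again (a no-op for indexes that
// are already present, per MergeState's skip-if-exists check).
func roundTripIngestState(rec *ingestrec.IngestRecorder) error {
	state := rec.ExportState()
	if state == nil {
		return nil // ExportState returns nil when nothing has been recorded
	}
	data, err := json.Marshal(state)
	if err != nil {
		return err
	}
	// A snapshot of table 10 with primary index 1 serializes as:
	// {"items":{"10":{"1":{"is_primary":true}}}}
	var restored ingestrec.RecorderState
	if err := json.Unmarshal(data, &restored); err != nil {
		return err
	}
	rec.MergeState(&restored)
	return nil
}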
53 changes: 22 additions & 31 deletions br/pkg/restore/log_client/client.go
@@ -1008,14 +1008,6 @@
Opts *storeapi.Options
}

type GetIDMapConfig struct {
// required
LoadSavedIDMap bool

// optional
TableMappingManager *stream.TableMappingManager
}

// GetBaseIDMapAndMerge gets the id map in one of the following ways:
// 1. from a previously saved id map, when the same task already built and saved an id map but failed later
// 2. from a previous, different task. A PiTR job might be split into multiple runs/tasks, and each task only restores
@@ -1025,49 +1017,48 @@
hasFullBackupStorageConfig,
loadSavedIDMap bool,
logCheckpointMetaManager checkpoint.LogMetaManagerT,
tableMappingManager *stream.TableMappingManager,
) error {
) (*SegmentedPiTRState, error) {

[CI: Bazel Crossbuild (ubuntu) — undefined: SegmentedPiTRState (br/pkg/restore/log_client/client.go:1020)]
var (
err error
dbMaps []*backuppb.PitrDBMap
dbReplaces map[stream.UpstreamID]*stream.DBReplace
err error
state *SegmentedPiTRState

[CI: Bazel Crossbuild (ubuntu) — undefined: SegmentedPiTRState (br/pkg/restore/log_client/client.go:1023)]
dbMaps []*backuppb.PitrDBMap
)

// this is a retry; the id map was saved last time, so load it from external storage
if loadSavedIDMap {
log.Info("try to load previously saved pitr id maps")
dbMaps, err = rc.loadSchemasMap(ctx, rc.restoreTS, logCheckpointMetaManager)
state, err = rc.loadSegmentedPiTRState(ctx, rc.restoreTS, logCheckpointMetaManager, true)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
if state != nil {
dbMaps = state.DbMaps
}
}

// a new task, but without full snapshot restore, tries to load the
// schemas map whose `restore-ts` is the task's `start-ts`.
if len(dbMaps) <= 0 && !hasFullBackupStorageConfig {
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
dbMaps, err = rc.loadSchemasMap(ctx, rc.startTS, logCheckpointMetaManager)
state, err = rc.loadSegmentedPiTRState(ctx, rc.startTS, logCheckpointMetaManager, false)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
err := rc.validateNoTiFlashReplica()
if err != nil {
return errors.Trace(err)
if state != nil {
dbMaps = state.DbMaps
}
if len(dbMaps) > 0 {
if err := rc.validateNoTiFlashReplica(); err != nil {
return nil, errors.Trace(err)
}
}
}

if len(dbMaps) <= 0 && !hasFullBackupStorageConfig {
log.Error("no id maps found")
return errors.New("no base id map found from saved id or last restored PiTR")
}
dbReplaces = stream.FromDBMapProto(dbMaps)

stream.LogDBReplaceMap("base db replace info", dbReplaces)
if len(dbReplaces) != 0 {
tableMappingManager.SetFromPiTRIDMap()
tableMappingManager.MergeBaseDBReplace(dbReplaces)
return nil, errors.New("no base id map found from saved id or last restored PiTR")
}
return nil
return state, nil
}

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo {
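
The CI annotations above show that SegmentedPiTRState is referenced here but not declared anywhere in this diff. A hedged reconstruction of its shape, inferred purely from the call sites and the tests below (field names and types as used there; the real declaration, including any JSON tags, may differ):

// Inferred from usage in this PR; not the PR's actual declaration.
type SegmentedPiTRState struct {
	DbMaps              []*backuppb.PitrDBMap              // base schema id maps (see GetBaseIDMapAndMerge)
	TiFlashItems        map[int64]model.TiFlashReplicaInfo // saved TiFlash replica info, keyed by table ID
	IngestRecorderState *ingestrec.RecorderState           // snapshot of ingested indexes
}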
@@ -1979,14 +1970,14 @@

func (rc *LogClient) SaveIdMapWithFailPoints(
ctx context.Context,
manager *stream.TableMappingManager,
state *SegmentedPiTRState,

[CI: Bazel Crossbuild (ubuntu) — undefined: SegmentedPiTRState (br/pkg/restore/log_client/client.go:1973)]
logCheckpointMetaManager checkpoint.LogMetaManagerT,
) error {
failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed before id maps saved"))
})

if err := rc.saveIDMap(ctx, manager, logCheckpointMetaManager); err != nil {
if err := rc.SaveSegmentedPiTRState(ctx, state, logCheckpointMetaManager); err != nil {
return errors.Trace(err)
}
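
A test-side sketch of driving the failpoint above (imports: pingcap/failpoint and testify's require). The failpoint path is an assumption following the usual <package import path>/<name> convention, and ctx, client, state, and logCheckpointMetaManager are set up as in the tests below:

fpName := "github.com/pingcap/tidb/br/pkg/restore/log_client/failed-before-id-maps-saved"
require.NoError(t, failpoint.Enable(fpName, "return(true)"))
defer func() {
	require.NoError(t, failpoint.Disable(fpName))
}()
err := client.SaveIdMapWithFailPoints(ctx, state, logCheckpointMetaManager)
require.ErrorContains(t, err, "failed before id maps saved")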

55 changes: 45 additions & 10 deletions br/pkg/restore/log_client/client_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"math"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@@ -1346,9 +1347,8 @@ func TestInitSchemasReplaceForDDL(t *testing.T) {
require.NoError(t, err)
err = stg.WriteFile(ctx, logclient.PitrIDMapsFilename(123, 1), []byte("123"))
require.NoError(t, err)
err = client.GetBaseIDMapAndMerge(ctx, false, false, nil, stream.NewTableMappingManager())
require.Error(t, err)
require.Contains(t, err.Error(), "proto: wrong")
_, err = client.GetBaseIDMapAndMerge(ctx, false, false, nil)
requireInvalidProtoError(t, err)
err = stg.DeleteFile(ctx, logclient.PitrIDMapsFilename(123, 1))
require.NoError(t, err)
}
@@ -1358,9 +1358,8 @@
client.SetStorage(ctx, backend, nil)
err := stg.WriteFile(ctx, logclient.PitrIDMapsFilename(123, 2), []byte("123"))
require.NoError(t, err)
err = client.GetBaseIDMapAndMerge(ctx, false, true, nil, stream.NewTableMappingManager())
require.Error(t, err)
require.Contains(t, err.Error(), "proto: wrong")
_, err = client.GetBaseIDMapAndMerge(ctx, false, true, nil)
requireInvalidProtoError(t, err)
err = stg.DeleteFile(ctx, logclient.PitrIDMapsFilename(123, 2))
require.NoError(t, err)
}
@@ -1373,12 +1372,20 @@
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
client := logclient.TEST_NewLogClient(123, 1, 2, 1, s.Mock.Domain, se)
err = client.GetBaseIDMapAndMerge(ctx, false, true, nil, stream.NewTableMappingManager())
_, err = client.GetBaseIDMapAndMerge(ctx, false, true, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "no base id map found from saved id or last restored PiTR")
}
}

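// requireInvalidProtoError asserts that decoding a corrupted id-map payload
// failed. The exact message varies with how the payload is truncated and
// which proto code path reports it ("proto: wrong wireType ..." vs. a bare
// "unexpected EOF"), so the helper accepts either substring.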
func requireInvalidProtoError(t *testing.T, err error) {
t.Helper()
require.Error(t, err)
errMsg := err.Error()
require.True(t, strings.Contains(errMsg, "proto") || strings.Contains(errMsg, "EOF"),
"unexpected error: %s", errMsg)
}

func downstreamID(upstreamID int64) int64 {
return upstreamID + 10000000
}
@@ -1446,8 +1453,30 @@ func TestPITRIDMap(t *testing.T) {
baseTableMappingManager := &stream.TableMappingManager{
DBReplaceMap: getDBMap(),
}
err = client.TEST_saveIDMap(ctx, baseTableMappingManager, nil)
tiflashItems := map[int64]model.TiFlashReplicaInfo{
1: {Count: 1, Available: true},
2: {Count: 2, LocationLabels: []string{"zone", "rack"}, AvailablePartitionIDs: []int64{3, 4}},
}
ingestState := &ingestrec.RecorderState{
Items: map[int64]map[int64]ingestrec.IndexState{
10: {
1: {IsPrimary: true},
2: {IsPrimary: false},
},
},
}
state := &logclient.SegmentedPiTRState{
DbMaps: baseTableMappingManager.ToProto(),
TiFlashItems: tiflashItems,
IngestRecorderState: ingestState,
}
err = client.TEST_saveIDMap(ctx, state, nil)
require.NoError(t, err)
loadedState, err := client.TEST_loadSegmentedPiTRState(ctx, 2, nil)
require.NoError(t, err)
require.NotNil(t, loadedState)
require.Equal(t, tiflashItems, loadedState.TiFlashItems)
require.Equal(t, ingestState, loadedState.IngestRecorderState)
newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1, nil)
require.NoError(t, err)
require.Nil(t, newSchemaReplaces)
@@ -1496,7 +1525,10 @@ func TestPITRIDMapOnStorage(t *testing.T) {
baseTableMappingManager := &stream.TableMappingManager{
DBReplaceMap: getDBMap(),
}
err = client.TEST_saveIDMap(ctx, baseTableMappingManager, nil)
state := &logclient.SegmentedPiTRState{
DbMaps: baseTableMappingManager.ToProto(),
}
err = client.TEST_saveIDMap(ctx, state, nil)
require.NoError(t, err)
newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1, nil)
require.NoError(t, err)
@@ -1552,7 +1584,10 @@ func TestPITRIDMapOnCheckpointStorage(t *testing.T) {
baseTableMappingManager := &stream.TableMappingManager{
DBReplaceMap: getDBMap(),
}
err = client.TEST_saveIDMap(ctx, baseTableMappingManager, logCheckpointMetaManager)
state := &logclient.SegmentedPiTRState{
DbMaps: baseTableMappingManager.ToProto(),
}
err = client.TEST_saveIDMap(ctx, state, logCheckpointMetaManager)
require.NoError(t, err)
newSchemaReplaces, err := client.TEST_initSchemasMap(ctx, 1, logCheckpointMetaManager)
require.NoError(t, err)
22 changes: 18 additions & 4 deletions br/pkg/restore/log_client/export_test.go
@@ -22,7 +22,6 @@ import (
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/br/pkg/checkpoint"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils/iter"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
@@ -66,18 +65,33 @@ func (m *PhysicalWithMigrations) Physical() *backuppb.DataFileGroup {

func (rc *LogClient) TEST_saveIDMap(
ctx context.Context,
m *stream.TableMappingManager,
state *SegmentedPiTRState,
logCheckpointMetaManager checkpoint.LogMetaManagerT,
) error {
return rc.SaveIdMapWithFailPoints(ctx, m, logCheckpointMetaManager)
return rc.SaveIdMapWithFailPoints(ctx, state, logCheckpointMetaManager)
}

func (rc *LogClient) TEST_initSchemasMap(
ctx context.Context,
restoreTS uint64,
logCheckpointMetaManager checkpoint.LogMetaManagerT,
) ([]*backuppb.PitrDBMap, error) {
return rc.loadSchemasMap(ctx, restoreTS, logCheckpointMetaManager)
state, err := rc.loadSegmentedPiTRState(ctx, restoreTS, logCheckpointMetaManager, true)
if err != nil {
return nil, err
}
if state == nil {
return nil, nil
}
return state.DbMaps, nil
}

func (rc *LogClient) TEST_loadSegmentedPiTRState(
ctx context.Context,
restoreTS uint64,
logCheckpointMetaManager checkpoint.LogMetaManagerT,
) (*SegmentedPiTRState, error) {
return rc.loadSegmentedPiTRState(ctx, restoreTS, logCheckpointMetaManager, true)
}

// readStreamMetaByTS is used for streaming task. collect all meta file by TS, it is for test usage.