From 36bef0a5fa3f462769fa1c8478ce861f5f7ed0d0 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Wed, 6 Nov 2024 11:06:40 +0800 Subject: [PATCH 1/3] Fix check on generated columns --- pkg/executor/check_table_index.go | 105 +++++++++++----------- pkg/executor/test/admintest/admin_test.go | 40 +++++++++ 2 files changed, 91 insertions(+), 54 deletions(-) diff --git a/pkg/executor/check_table_index.go b/pkg/executor/check_table_index.go index 6b7d176739939..6f0187377c22b 100644 --- a/pkg/executor/check_table_index.go +++ b/pkg/executor/check_table_index.go @@ -352,61 +352,41 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non w.e.BaseExecutor.ReleaseSysSession(ctx, se) }() + tblMeta := w.table.Meta() + tblName := TableName(w.e.dbName, tblMeta.Name.String()) + var pkCols []string var pkTypes []*types.FieldType switch { - case w.e.table.Meta().IsCommonHandle: - pkColsInfo := w.e.table.Meta().GetPrimaryKey().Columns + case tblMeta.IsCommonHandle: + pkColsInfo := tblMeta.GetPrimaryKey().Columns for _, colInfo := range pkColsInfo { - colStr := colInfo.Name.O - pkCols = append(pkCols, colStr) - pkTypes = append(pkTypes, &w.e.table.Meta().Columns[colInfo.Offset].FieldType) + pkCols = append(pkCols, ColumnName(colInfo.Name.O)) + pkTypes = append(pkTypes, &tblMeta.Columns[colInfo.Offset].FieldType) } - case w.e.table.Meta().PKIsHandle: - pkCols = append(pkCols, w.e.table.Meta().GetPkName().O) + case tblMeta.PKIsHandle: + pkCols = append(pkCols, ColumnName(tblMeta.GetPkName().O)) default: // support decoding _tidb_rowid. - pkCols = append(pkCols, model.ExtraHandleName.O) + pkCols = append(pkCols, ColumnName(model.ExtraHandleName.O)) } + handleColumns := strings.Join(pkCols, ",") - // CheckSum of (handle + index columns). - var md5HandleAndIndexCol strings.Builder - md5HandleAndIndexCol.WriteString("crc32(md5(concat_ws(0x2, ") - for _, col := range pkCols { - md5HandleAndIndexCol.WriteString(ColumnName(col)) - md5HandleAndIndexCol.WriteString(", ") - } - for offset, col := range idxInfo.Columns { - tblCol := w.table.Meta().Columns[col.Offset] - if tblCol.IsGenerated() && !tblCol.GeneratedStored { - md5HandleAndIndexCol.WriteString(tblCol.GeneratedExprString) + indexColNames := make([]string, len(idxInfo.Columns)) + for i, col := range idxInfo.Columns { + tblCol := tblMeta.Columns[col.Offset] + if tblCol.IsVirtualGenerated() && tblCol.Hidden { + indexColNames[i] = tblCol.GeneratedExprString } else { - md5HandleAndIndexCol.WriteString(ColumnName(col.Name.O)) - } - if offset != len(idxInfo.Columns)-1 { - md5HandleAndIndexCol.WriteString(", ") + indexColNames[i] = ColumnName(col.Name.O) } } - md5HandleAndIndexCol.WriteString(")))") + indexColumns := strings.Join(indexColNames, ",") - // Used to group by and order. - var md5Handle strings.Builder - md5Handle.WriteString("crc32(md5(concat_ws(0x2, ") - for i, col := range pkCols { - md5Handle.WriteString(ColumnName(col)) - if i != len(pkCols)-1 { - md5Handle.WriteString(", ") - } - } - md5Handle.WriteString(")))") + // CheckSum of (handle + index columns). + md5HandleAndIndexCol := fmt.Sprintf("crc32(md5(concat_ws(0x2, %s, %s)))", handleColumns, indexColumns) - handleColumnField := strings.Join(pkCols, ", ") - var indexColumnField strings.Builder - for offset, col := range idxInfo.Columns { - indexColumnField.WriteString(ColumnName(col.Name.O)) - if offset != len(idxInfo.Columns)-1 { - indexColumnField.WriteString(", ") - } - } + // Used to group by and order. + md5Handle := fmt.Sprintf("crc32(md5(concat_ws(0x2, %s)))", handleColumns) tableRowCntToCheck := int64(0) @@ -437,17 +417,28 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non logutil.BgLogger().Warn("compare checksum by group reaches time limit", zap.Int("times", times)) break } - whereKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle.String(), offset, mod) - groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) div %d %% %d)", md5Handle.String(), offset, mod, bucketSize) + whereKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle, offset, mod) + groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) div %d %% %d)", md5Handle, offset, mod, bucketSize) if !checkOnce { whereKey = "0" } checkOnce = true - tblQuery := fmt.Sprintf("select /*+ read_from_storage(tikv[%s]) */ bit_xor(%s), %s, count(*) from %s use index() where %s = 0 group by %s", TableName(w.e.dbName, w.e.table.Meta().Name.String()), md5HandleAndIndexCol.String(), groupByKey, TableName(w.e.dbName, w.e.table.Meta().Name.String()), whereKey, groupByKey) - idxQuery := fmt.Sprintf("select bit_xor(%s), %s, count(*) from %s use index(`%s`) where %s = 0 group by %s", md5HandleAndIndexCol.String(), groupByKey, TableName(w.e.dbName, w.e.table.Meta().Name.String()), idxInfo.Name, whereKey, groupByKey) - - logutil.BgLogger().Info("fast check table by group", zap.String("table name", w.table.Meta().Name.String()), zap.String("index name", idxInfo.Name.String()), zap.Int("times", times), zap.Int("current offset", offset), zap.Int("current mod", mod), zap.String("table sql", tblQuery), zap.String("index sql", idxQuery)) + tblQuery := fmt.Sprintf( + "select /*+ read_from_storage(tikv[%s]) */ bit_xor(%s), %s, count(*) from %s use index() where %s = 0 group by %s", + tblName, md5HandleAndIndexCol, groupByKey, tblName, whereKey, groupByKey) + idxQuery := fmt.Sprintf( + "select bit_xor(%s), %s, count(*) from %s use index(`%s`) where %s = 0 group by %s", + md5HandleAndIndexCol, groupByKey, tblName, idxInfo.Name, whereKey, groupByKey) + + logutil.BgLogger().Info( + "fast check table by group", + zap.String("table name", tblMeta.Name.String()), + zap.String("index name", idxInfo.Name.String()), + zap.Int("times", times), + zap.Int("current offset", offset), zap.Int("current mod", mod), + zap.String("table sql", tblQuery), zap.String("index sql", idxQuery), + ) // compute table side checksum. tableChecksum, err := getCheckSum(w.e.contextCtx, se, tblQuery) @@ -502,7 +493,9 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non if !meetError { if times != 1 { - logutil.BgLogger().Error("unexpected result, no error detected in this round, but an error is detected in the previous round", zap.Int("times", times), zap.Int("offset", offset), zap.Int("mod", mod)) + logutil.BgLogger().Error( + "unexpected result, no error detected in this round, but an error is detected in the previous round", + zap.Int("times", times), zap.Int("offset", offset), zap.Int("mod", mod)) } break } @@ -528,9 +521,13 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non } if meetError { - groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle.String(), offset, mod) - indexSQL := fmt.Sprintf("select %s, %s, %s from %s use index(`%s`) where %s = 0 order by %s", handleColumnField, indexColumnField.String(), md5HandleAndIndexCol.String(), TableName(w.e.dbName, w.e.table.Meta().Name.String()), idxInfo.Name, groupByKey, handleColumnField) - tableSQL := fmt.Sprintf("select /*+ read_from_storage(tikv[%s]) */ %s, %s, %s from %s use index() where %s = 0 order by %s", TableName(w.e.dbName, w.e.table.Meta().Name.String()), handleColumnField, indexColumnField.String(), md5HandleAndIndexCol.String(), TableName(w.e.dbName, w.e.table.Meta().Name.String()), groupByKey, handleColumnField) + groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle, offset, mod) + indexSQL := fmt.Sprintf( + "select %s, %s, %s from %s use index(`%s`) where %s = 0 order by %s", + handleColumns, indexColumns, md5HandleAndIndexCol, tblName, idxInfo.Name, groupByKey, handleColumns) + tableSQL := fmt.Sprintf( + "select /*+ read_from_storage(tikv[%s]) */ %s, %s, %s from %s use index() where %s = 0 order by %s", + tblName, handleColumns, indexColumns, md5HandleAndIndexCol, tblName, groupByKey, handleColumns) idxRow, err := queryToRow(se, indexSQL) if err != nil { @@ -562,7 +559,7 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non getValueFromRow := func(row chunk.Row) ([]types.Datum, error) { valueDatum := make([]types.Datum, 0) for i, t := range idxInfo.Columns { - valueDatum = append(valueDatum, row.GetDatum(i+len(pkCols), &w.table.Meta().Columns[t.Offset].FieldType)) + valueDatum = append(valueDatum, row.GetDatum(i+len(pkCols), &tblMeta.Columns[t.Offset].FieldType)) } return valueDatum, nil } @@ -590,7 +587,7 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non } return k }, - Tbl: w.table.Meta(), + Tbl: tblMeta, Idx: idxInfo, EnableRedactLog: w.sctx.GetSessionVars().EnableRedactLog, Storage: w.sctx.GetStore(), diff --git a/pkg/executor/test/admintest/admin_test.go b/pkg/executor/test/admintest/admin_test.go index 5abeac81b11a0..15be6a382cc28 100644 --- a/pkg/executor/test/admintest/admin_test.go +++ b/pkg/executor/test/admintest/admin_test.go @@ -2098,3 +2098,43 @@ func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) { } } } + +func TestAdminCheckGeneratedColumns(t *testing.T) { + store, domain := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("DROP TABLE IF EXISTS t") + tk.MustExec("CREATE TABLE t(pk int PRIMARY KEY CLUSTERED, gen int GENERATED ALWAYS AS (pk * pk) VIRTUAL, KEY idx_gen(gen))") + tk.MustExec("INSERT INTO t(pk) VALUES (2)") + tk.MustExec("ADMIN CHECK TABLE t") + + // Make some corrupted index. Build the index information. + sctx := mock.NewContext() + sctx.Store = store + ctx := sctx.GetTableCtx() + is := domain.InfoSchema() + dbName := pmodel.NewCIStr("test") + tblName := pmodel.NewCIStr("t") + tbl, err := is.TableByName(context.Background(), dbName, tblName) + require.NoError(t, err) + tblInfo := tbl.Meta() + idxInfo := tblInfo.Indices[0] + tk.Session().GetSessionVars().IndexLookupSize = 3 + tk.Session().GetSessionVars().MaxChunkSize = 3 + + indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo) + + txn, err := store.Begin() + require.NoError(t, err) + err = indexOpr.Delete(ctx, txn, types.MakeDatums(4), kv.IntHandle(2)) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + + for _, enabled := range []bool{false, true} { + tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled)) + err = tk.ExecToErr("admin check table t") + require.Error(t, err) + } +} From 10f5c7f09e2ebdd9634e350f413e2fb82f3a0eb5 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Wed, 6 Nov 2024 11:15:52 +0800 Subject: [PATCH 2/3] Update test --- pkg/executor/test/admintest/admin_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/executor/test/admintest/admin_test.go b/pkg/executor/test/admintest/admin_test.go index 15be6a382cc28..b97f3527dbe85 100644 --- a/pkg/executor/test/admintest/admin_test.go +++ b/pkg/executor/test/admintest/admin_test.go @@ -2105,8 +2105,8 @@ func TestAdminCheckGeneratedColumns(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("DROP TABLE IF EXISTS t") - tk.MustExec("CREATE TABLE t(pk int PRIMARY KEY CLUSTERED, gen int GENERATED ALWAYS AS (pk * pk) VIRTUAL, KEY idx_gen(gen))") - tk.MustExec("INSERT INTO t(pk) VALUES (2)") + tk.MustExec("CREATE TABLE t(pk int PRIMARY KEY CLUSTERED, val int, gen int GENERATED ALWAYS AS (val * pk) VIRTUAL, KEY idx_gen(gen))") + tk.MustExec("INSERT INTO t(pk, val) VALUES (2, 5)") tk.MustExec("ADMIN CHECK TABLE t") // Make some corrupted index. Build the index information. @@ -2123,11 +2123,13 @@ func TestAdminCheckGeneratedColumns(t *testing.T) { tk.Session().GetSessionVars().IndexLookupSize = 3 tk.Session().GetSessionVars().MaxChunkSize = 3 + // Simulate inconsistent index column indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo) - txn, err := store.Begin() require.NoError(t, err) - err = indexOpr.Delete(ctx, txn, types.MakeDatums(4), kv.IntHandle(2)) + err = indexOpr.Delete(ctx, txn, types.MakeDatums(10), kv.IntHandle(2)) + require.NoError(t, err) + _, err = indexOpr.Create(ctx, txn, types.MakeDatums(5), kv.IntHandle(2), nil) require.NoError(t, err) err = txn.Commit(context.Background()) require.NoError(t, err) From 7fe6cfec152ce6e102cb672f6de559f954005644 Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Wed, 6 Nov 2024 13:00:01 +0800 Subject: [PATCH 3/3] Fix bazel --- pkg/executor/test/admintest/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/test/admintest/BUILD.bazel b/pkg/executor/test/admintest/BUILD.bazel index f7a58178265aa..f0794f6281482 100644 --- a/pkg/executor/test/admintest/BUILD.bazel +++ b/pkg/executor/test/admintest/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 23, + shard_count = 24, deps = [ "//pkg/config", "//pkg/ddl",