Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix admin check return wrong result for index containing generated columns #57152

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 51 additions & 54 deletions pkg/executor/check_table_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,61 +352,41 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
w.e.BaseExecutor.ReleaseSysSession(ctx, se)
}()

tblMeta := w.table.Meta()
tblName := TableName(w.e.dbName, tblMeta.Name.String())

var pkCols []string
var pkTypes []*types.FieldType
switch {
case w.e.table.Meta().IsCommonHandle:
pkColsInfo := w.e.table.Meta().GetPrimaryKey().Columns
case tblMeta.IsCommonHandle:
pkColsInfo := tblMeta.GetPrimaryKey().Columns
for _, colInfo := range pkColsInfo {
colStr := colInfo.Name.O
pkCols = append(pkCols, colStr)
pkTypes = append(pkTypes, &w.e.table.Meta().Columns[colInfo.Offset].FieldType)
pkCols = append(pkCols, ColumnName(colInfo.Name.O))
pkTypes = append(pkTypes, &tblMeta.Columns[colInfo.Offset].FieldType)
}
case w.e.table.Meta().PKIsHandle:
pkCols = append(pkCols, w.e.table.Meta().GetPkName().O)
case tblMeta.PKIsHandle:
pkCols = append(pkCols, ColumnName(tblMeta.GetPkName().O))
default: // support decoding _tidb_rowid.
pkCols = append(pkCols, model.ExtraHandleName.O)
pkCols = append(pkCols, ColumnName(model.ExtraHandleName.O))
}
handleColumns := strings.Join(pkCols, ",")

// CheckSum of (handle + index columns).
var md5HandleAndIndexCol strings.Builder
md5HandleAndIndexCol.WriteString("crc32(md5(concat_ws(0x2, ")
for _, col := range pkCols {
md5HandleAndIndexCol.WriteString(ColumnName(col))
md5HandleAndIndexCol.WriteString(", ")
}
for offset, col := range idxInfo.Columns {
tblCol := w.table.Meta().Columns[col.Offset]
if tblCol.IsGenerated() && !tblCol.GeneratedStored {
md5HandleAndIndexCol.WriteString(tblCol.GeneratedExprString)
indexColNames := make([]string, len(idxInfo.Columns))
for i, col := range idxInfo.Columns {
tblCol := tblMeta.Columns[col.Offset]
if tblCol.IsVirtualGenerated() && tblCol.Hidden {
indexColNames[i] = tblCol.GeneratedExprString
} else {
md5HandleAndIndexCol.WriteString(ColumnName(col.Name.O))
}
if offset != len(idxInfo.Columns)-1 {
md5HandleAndIndexCol.WriteString(", ")
indexColNames[i] = ColumnName(col.Name.O)
}
}
md5HandleAndIndexCol.WriteString(")))")
indexColumns := strings.Join(indexColNames, ",")

// Used to group by and order.
var md5Handle strings.Builder
md5Handle.WriteString("crc32(md5(concat_ws(0x2, ")
for i, col := range pkCols {
md5Handle.WriteString(ColumnName(col))
if i != len(pkCols)-1 {
md5Handle.WriteString(", ")
}
}
md5Handle.WriteString(")))")
// CheckSum of (handle + index columns).
md5HandleAndIndexCol := fmt.Sprintf("crc32(md5(concat_ws(0x2, %s, %s)))", handleColumns, indexColumns)

handleColumnField := strings.Join(pkCols, ", ")
var indexColumnField strings.Builder
for offset, col := range idxInfo.Columns {
indexColumnField.WriteString(ColumnName(col.Name.O))
if offset != len(idxInfo.Columns)-1 {
indexColumnField.WriteString(", ")
}
}
// Used to group by and order.
md5Handle := fmt.Sprintf("crc32(md5(concat_ws(0x2, %s)))", handleColumns)

tableRowCntToCheck := int64(0)

Expand Down Expand Up @@ -437,17 +417,28 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
logutil.BgLogger().Warn("compare checksum by group reaches time limit", zap.Int("times", times))
break
}
whereKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle.String(), offset, mod)
groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) div %d %% %d)", md5Handle.String(), offset, mod, bucketSize)
whereKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle, offset, mod)
groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) div %d %% %d)", md5Handle, offset, mod, bucketSize)
if !checkOnce {
whereKey = "0"
}
checkOnce = true

tblQuery := fmt.Sprintf("select /*+ read_from_storage(tikv[%s]) */ bit_xor(%s), %s, count(*) from %s use index() where %s = 0 group by %s", TableName(w.e.dbName, w.e.table.Meta().Name.String()), md5HandleAndIndexCol.String(), groupByKey, TableName(w.e.dbName, w.e.table.Meta().Name.String()), whereKey, groupByKey)
idxQuery := fmt.Sprintf("select bit_xor(%s), %s, count(*) from %s use index(`%s`) where %s = 0 group by %s", md5HandleAndIndexCol.String(), groupByKey, TableName(w.e.dbName, w.e.table.Meta().Name.String()), idxInfo.Name, whereKey, groupByKey)

logutil.BgLogger().Info("fast check table by group", zap.String("table name", w.table.Meta().Name.String()), zap.String("index name", idxInfo.Name.String()), zap.Int("times", times), zap.Int("current offset", offset), zap.Int("current mod", mod), zap.String("table sql", tblQuery), zap.String("index sql", idxQuery))
tblQuery := fmt.Sprintf(
"select /*+ read_from_storage(tikv[%s]) */ bit_xor(%s), %s, count(*) from %s use index() where %s = 0 group by %s",
tblName, md5HandleAndIndexCol, groupByKey, tblName, whereKey, groupByKey)
idxQuery := fmt.Sprintf(
"select bit_xor(%s), %s, count(*) from %s use index(`%s`) where %s = 0 group by %s",
md5HandleAndIndexCol, groupByKey, tblName, idxInfo.Name, whereKey, groupByKey)

logutil.BgLogger().Info(
"fast check table by group",
zap.String("table name", tblMeta.Name.String()),
zap.String("index name", idxInfo.Name.String()),
zap.Int("times", times),
zap.Int("current offset", offset), zap.Int("current mod", mod),
zap.String("table sql", tblQuery), zap.String("index sql", idxQuery),
)

// compute table side checksum.
tableChecksum, err := getCheckSum(w.e.contextCtx, se, tblQuery)
Expand Down Expand Up @@ -502,7 +493,9 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non

if !meetError {
if times != 1 {
logutil.BgLogger().Error("unexpected result, no error detected in this round, but an error is detected in the previous round", zap.Int("times", times), zap.Int("offset", offset), zap.Int("mod", mod))
logutil.BgLogger().Error(
"unexpected result, no error detected in this round, but an error is detected in the previous round",
zap.Int("times", times), zap.Int("offset", offset), zap.Int("mod", mod))
}
break
}
Expand All @@ -528,9 +521,13 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
}

if meetError {
groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle.String(), offset, mod)
indexSQL := fmt.Sprintf("select %s, %s, %s from %s use index(`%s`) where %s = 0 order by %s", handleColumnField, indexColumnField.String(), md5HandleAndIndexCol.String(), TableName(w.e.dbName, w.e.table.Meta().Name.String()), idxInfo.Name, groupByKey, handleColumnField)
tableSQL := fmt.Sprintf("select /*+ read_from_storage(tikv[%s]) */ %s, %s, %s from %s use index() where %s = 0 order by %s", TableName(w.e.dbName, w.e.table.Meta().Name.String()), handleColumnField, indexColumnField.String(), md5HandleAndIndexCol.String(), TableName(w.e.dbName, w.e.table.Meta().Name.String()), groupByKey, handleColumnField)
groupByKey := fmt.Sprintf("((cast(%s as signed) - %d) %% %d)", md5Handle, offset, mod)
indexSQL := fmt.Sprintf(
"select %s, %s, %s from %s use index(`%s`) where %s = 0 order by %s",
handleColumns, indexColumns, md5HandleAndIndexCol, tblName, idxInfo.Name, groupByKey, handleColumns)
tableSQL := fmt.Sprintf(
"select /*+ read_from_storage(tikv[%s]) */ %s, %s, %s from %s use index() where %s = 0 order by %s",
tblName, handleColumns, indexColumns, md5HandleAndIndexCol, tblName, groupByKey, handleColumns)

idxRow, err := queryToRow(se, indexSQL)
if err != nil {
Expand Down Expand Up @@ -562,7 +559,7 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
getValueFromRow := func(row chunk.Row) ([]types.Datum, error) {
valueDatum := make([]types.Datum, 0)
for i, t := range idxInfo.Columns {
valueDatum = append(valueDatum, row.GetDatum(i+len(pkCols), &w.table.Meta().Columns[t.Offset].FieldType))
valueDatum = append(valueDatum, row.GetDatum(i+len(pkCols), &tblMeta.Columns[t.Offset].FieldType))
}
return valueDatum, nil
}
Expand Down Expand Up @@ -590,7 +587,7 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask, _ func(workerpool.Non
}
return k
},
Tbl: w.table.Meta(),
Tbl: tblMeta,
Idx: idxInfo,
EnableRedactLog: w.sctx.GetSessionVars().EnableRedactLog,
Storage: w.sctx.GetStore(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/test/admintest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 23,
shard_count = 24,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
42 changes: 42 additions & 0 deletions pkg/executor/test/admintest/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2098,3 +2098,45 @@ func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) {
}
}
}

func TestAdminCheckGeneratedColumns(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("DROP TABLE IF EXISTS t")
tk.MustExec("CREATE TABLE t(pk int PRIMARY KEY CLUSTERED, val int, gen int GENERATED ALWAYS AS (val * pk) VIRTUAL, KEY idx_gen(gen))")
tk.MustExec("INSERT INTO t(pk, val) VALUES (2, 5)")
tk.MustExec("ADMIN CHECK TABLE t")

// Make some corrupted index. Build the index information.
sctx := mock.NewContext()
sctx.Store = store
ctx := sctx.GetTableCtx()
is := domain.InfoSchema()
dbName := pmodel.NewCIStr("test")
tblName := pmodel.NewCIStr("t")
tbl, err := is.TableByName(context.Background(), dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
tk.Session().GetSessionVars().IndexLookupSize = 3
tk.Session().GetSessionVars().MaxChunkSize = 3

// Simulate inconsistent index column
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
txn, err := store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(ctx, txn, types.MakeDatums(10), kv.IntHandle(2))
require.NoError(t, err)
_, err = indexOpr.Create(ctx, txn, types.MakeDatums(5), kv.IntHandle(2), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)

for _, enabled := range []bool{false, true} {
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
}
}