2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
@@ -188,6 +188,7 @@ go_library(
"//pkg/util/set",
"//pkg/util/size",
"//pkg/util/slice",
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/stringutil",
@@ -368,6 +369,7 @@ go_test(
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
"//pkg/tici",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
2 changes: 1 addition & 1 deletion pkg/ddl/create_table.go
@@ -218,7 +218,7 @@ func createTiCIIndexes(jobCtx *jobContext, schemaName string, tblInfo *model.Tab
if !index.IsTiCIIndex() {
continue
}
if err := tici.CreateFulltextIndex(ctx, jobCtx.store, tblInfo, index, schemaName); err != nil {
if err := tici.CreateFulltextIndex(ctx, jobCtx.store, tblInfo, index, schemaName, nil); err != nil {
return err
}
}
120 changes: 120 additions & 0 deletions pkg/ddl/executor.go
@@ -4775,6 +4775,10 @@ func (e *executor) createFulltextIndex(ctx sessionctx.Context, ti ast.Ident, ind
job.Type = model.ActionAddFullTextIndex
indexPartSpecifications[0].Expr = nil

if err := e.captureFullTextIndexSysvarsToJob(ctx, job, indexOption); err != nil {
return errors.Trace(err)
}

args := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
IndexName: indexName,
@@ -4982,6 +4986,12 @@ func (e *executor) createColumnarIndex(ctx sessionctx.Context, ti ast.Ident, ind
// indexPartSpecifications[0].Expr cannot be unmarshaled, so we set it to nil.
indexPartSpecifications[0].Expr = nil

if columnarIndexType == model.ColumnarIndexTypeFulltext {
if err := e.captureFullTextIndexSysvarsToJob(ctx, job, indexOption); err != nil {
return errors.Trace(err)
}
}

// TODO: support CDCWriteSource

args := &model.ModifyIndexArgs{
@@ -5005,6 +5015,116 @@ func (e *executor) createColumnarIndex(ctx sessionctx.Context, ti ast.Ident, ind
return errors.Trace(err)
}

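// captureFullTextIndexSysvarsToJob snapshots the full-text related system variables of the
// current session into the DDL job and validates them up front (token size range and stopword
// table schema), so the DDL worker later builds the TiCI parser info from a consistent set of values.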
func (e *executor) captureFullTextIndexSysvarsToJob(sctx sessionctx.Context, job *model.Job, indexOption *ast.IndexOption) error {
parser := model.FullTextParserTypeStandardV1
if indexOption != nil && indexOption.ParserName.L != "" {
parser = model.GetFullTextParserTypeBySQLName(indexOption.ParserName.L)
}

sessVars := sctx.GetSessionVars()
getVar := func(name string) (string, error) {
val, err := sessVars.GetSessionOrGlobalSystemVar(context.Background(), name)
return val, errors.Trace(err)
}

maxTokenSize, err := getVar(vardef.InnodbFtMaxTokenSize)
if err != nil {
return err
}
minTokenSize, err := getVar(vardef.InnodbFtMinTokenSize)
if err != nil {
return err
}
ngramTokenSize, err := getVar(vardef.NgramTokenSize)
if err != nil {
return err
}
enableStopword, err := getVar(vardef.InnodbFtEnableStopword)
if err != nil {
return err
}
serverStopwordTable, err := getVar(vardef.InnodbFtServerStopwordTable)
if err != nil {
return err
}
userStopwordTable, err := getVar(vardef.InnodbFtUserStopwordTable)
if err != nil {
return err
}

// Validate token size constraints early.
if parser == model.FullTextParserTypeStandardV1 {
minVal, err := strconv.ParseInt(minTokenSize, 10, 64)
if err != nil {
return errors.Trace(err)
}
maxVal, err := strconv.ParseInt(maxTokenSize, 10, 64)
if err != nil {
return errors.Trace(err)
}
if minVal > maxVal {
return variable.ErrWrongValueForVar.GenWithStackByArgs(vardef.InnodbFtMinTokenSize, minTokenSize)
}
}

// Validate the stopword table schema if it will be used by the standard parser.
if parser == model.FullTextParserTypeStandardV1 && variable.TiDBOptOn(enableStopword) {
stopwordTable := strings.TrimSpace(userStopwordTable)
stopwordTableVar := vardef.InnodbFtUserStopwordTable
if stopwordTable == "" {
stopwordTable = strings.TrimSpace(serverStopwordTable)
stopwordTableVar = vardef.InnodbFtServerStopwordTable
}
if stopwordTable != "" {
dbName, tblName, ok := splitFullTextStopwordTableName(stopwordTable)
if !ok {
return variable.ErrWrongValueForVar.GenWithStackByArgs(stopwordTableVar, stopwordTable)
}
is := e.infoCache.GetLatest()
tbl, err := is.TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tblName))
if err != nil {
return errors.Trace(err)
}
tblInfo := tbl.Meta()
// TiDB row-store tables are treated as InnoDB-compatible. TiFlash-only tables are not.
if tblInfo.IsColumnar {
return dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table must be an InnoDB table")
}
if len(tblInfo.Columns) != 1 || tblInfo.Columns[0].Name.L != "value" || tblInfo.Columns[0].FieldType.GetType() != mysql.TypeVarchar {
return dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table must contain a single VARCHAR column named value")
}
}
}

job.AddSystemVars(vardef.InnodbFtMaxTokenSize, maxTokenSize)
job.AddSystemVars(vardef.InnodbFtMinTokenSize, minTokenSize)
job.AddSystemVars(vardef.NgramTokenSize, ngramTokenSize)
job.AddSystemVars(vardef.InnodbFtEnableStopword, enableStopword)
job.AddSystemVars(vardef.InnodbFtServerStopwordTable, serverStopwordTable)
job.AddSystemVars(vardef.InnodbFtUserStopwordTable, userStopwordTable)
return nil
}

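// splitFullTextStopwordTableName splits a stopword table name given as `db/table` or
// `db.table` into its schema and table parts.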
func splitFullTextStopwordTableName(raw string) (dbName string, tblName string, ok bool) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", "", false
}
parts := strings.Split(raw, "/")
if len(parts) != 2 {
parts = strings.Split(raw, ".")
}
if len(parts) != 2 {
return "", "", false
}
dbName = strings.TrimSpace(parts[0])
tblName = strings.TrimSpace(parts[1])
if dbName == "" || tblName == "" {
return "", "", false
}
return dbName, tblName, true
}

func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
156 changes: 153 additions & 3 deletions pkg/ddl/index.go
@@ -86,6 +86,7 @@ import (
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
@@ -1588,7 +1589,7 @@ func checkAndBuildIndexInfo(
return indexInfo, nil
}

func onCreateFulltextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
func (w *worker) onCreateFulltextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropIndex(jobCtx, job)
@@ -1664,7 +1665,15 @@ func onCreateFulltextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err e
if err != nil {
return ver, errors.Trace(err)
}
err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName)
parserInfo, err := w.buildTiCIFulltextParserInfo(jobCtx, job, indexInfo)
if err != nil {
if !isRetryableJobError(err, job.ErrorCount) {
return convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
}
return ver, errors.Trace(err)
}

err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName, parserInfo)
if err != nil {
if !isRetryableJobError(err, job.ErrorCount) {
return convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
@@ -1808,7 +1817,7 @@ func (w *worker) onCreateHybridIndex(jobCtx *jobContext, job *model.Job) (ver in
// so job.SnapshotVer will be captured and propagated
// through dist-task metadata.
if !job.ReorgMeta.TiCIIndexCreated {
err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName)
err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName, nil)
if err != nil {
if !isRetryableJobError(err, job.ErrorCount) {
return convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
@@ -1975,6 +1984,15 @@ func (w *worker) onCreateColumnarIndex(jobCtx *jobContext, job *model.Job) (ver
if err != nil {
return ver, errors.Trace(err)
}
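// For FULLTEXT columnar indexes, register the index in TiCI with the parser info
// captured in the job before recording the snapshot version.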
if indexInfo.FullTextInfo != nil {
parserInfo, err := w.buildTiCIFulltextParserInfo(jobCtx, job, indexInfo)
if err != nil {
return ver, errors.Trace(err)
}
if err := tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName, parserInfo); err != nil {
return ver, errors.Trace(err)
}
}
job.SnapshotVer = currVer.Ver
return ver, nil
}
@@ -2115,6 +2133,138 @@ func (w *worker) checkColumnarIndexProcessOnce(jobCtx *jobContext, tbl table.Tab
return true, notAddedIndexCnt, addedIndexCnt, nil
}

const (
// maxFullTextStopwordCount limits how many stopwords can be read from a stopword table per FULLTEXT index creation.
// It is a safety limit to avoid excessive memory usage / oversized TiCI requests.
maxFullTextStopwordCount = 10000
// maxFullTextStopwordBytes limits total stopword payload bytes per FULLTEXT index creation.
maxFullTextStopwordBytes = 1 << 20 // 1MiB
)

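// buildTiCIFulltextParserInfo builds the TiCI parser information (parser type, parser
// parameters, and the optional stopword list) for the index from the system variables
// captured in the job, falling back to MySQL-compatible defaults when a value was not captured.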
func (w *worker) buildTiCIFulltextParserInfo(jobCtx *jobContext, job *model.Job, indexInfo *model.IndexInfo) (*tici.ParserInfo, error) {
getJobSysVar := func(name, fallback string) string {
if val, ok := job.GetSystemVars(name); ok {
return val
}
return fallback
}

if indexInfo == nil || indexInfo.FullTextInfo == nil {
return nil, errors.New("missing fulltext info")
}

var parserType tici.ParserType
parserParams := make(map[string]string, 8)
switch indexInfo.FullTextInfo.ParserType {
case model.FullTextParserTypeStandardV1:
parserType = tici.ParserType_DEFAULT_PARSER
parserParams["parser_name"] = "standard"
parserParams[vardef.InnodbFtMinTokenSize] = getJobSysVar(vardef.InnodbFtMinTokenSize, "3")
parserParams[vardef.InnodbFtMaxTokenSize] = getJobSysVar(vardef.InnodbFtMaxTokenSize, "84")
case model.FullTextParserTypeMultilingualV1, model.FullTextParserTypeNgramV1:
// The multilingual parser is currently treated as an n-gram based tokenizer.
parserType = tici.ParserType_OTHER_PARSER
if indexInfo.FullTextInfo.ParserType == model.FullTextParserTypeMultilingualV1 {
parserParams["parser_name"] = "multilingual"
} else {
parserParams["parser_name"] = "ngram"
}
parserParams[vardef.NgramTokenSize] = getJobSysVar(vardef.NgramTokenSize, "2")
default:
parserType = tici.ParserType_OTHER_PARSER
parserParams["parser_name"] = indexInfo.FullTextInfo.ParserType.SQLName()
}

enableStopword := getJobSysVar(vardef.InnodbFtEnableStopword, vardef.On)
parserParams[vardef.InnodbFtEnableStopword] = enableStopword
parserParams[vardef.InnodbFtServerStopwordTable] = getJobSysVar(vardef.InnodbFtServerStopwordTable, "")
parserParams[vardef.InnodbFtUserStopwordTable] = getJobSysVar(vardef.InnodbFtUserStopwordTable, "")

var stopWords []string
if indexInfo.FullTextInfo.ParserType == model.FullTextParserTypeStandardV1 && variable.TiDBOptOn(enableStopword) {
stopwordTable := strings.TrimSpace(parserParams[vardef.InnodbFtUserStopwordTable])
if stopwordTable == "" {
stopwordTable = strings.TrimSpace(parserParams[vardef.InnodbFtServerStopwordTable])
}
if stopwordTable != "" {
dbName, tblName, ok := splitFullTextStopwordTableName(stopwordTable)
if !ok {
return nil, errors.New("invalid stopword table name")
}
stopwords, err := w.readFullTextStopwords(jobCtx, dbName, tblName)
if err != nil {
return nil, errors.Trace(err)
}
stopWords = stopwords
}
}

return &tici.ParserInfo{
ParserType: parserType,
ParserParams: parserParams,
StopWords: stopWords,
}, nil
}

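// readFullTextStopwords reads the distinct, non-empty values of the `value` column of the
// stopword table, enforcing maxFullTextStopwordCount and maxFullTextStopwordBytes so the
// resulting TiCI request stays bounded.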
func (w *worker) readFullTextStopwords(jobCtx *jobContext, dbName, tblName string) (stopwords []string, err error) {
const label = "ddl_read_fulltext_stopwords"
startTime := time.Now()
defer func() {
metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

var sb strings.Builder
sqlescape.MustFormatSQL(&sb, "SELECT `value` FROM %n.%n", dbName, tblName)

ctx := jobCtx.stepCtx
if ctx.Value(kv.RequestSourceKey) == nil {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
}
rs, err := w.sess.Context.GetSQLExecutor().ExecuteInternal(ctx, sb.String())
if err != nil {
return nil, errors.Trace(err)
}
if rs == nil {
return nil, nil
}
defer terror.Call(rs.Close)

stopwords = make([]string, 0, 64)
seen := make(map[string]struct{}, 64)
totalBytes := 0

req := rs.NewChunk(nil)
for {
if err := rs.Next(ctx, req); err != nil {
return nil, errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
iter := chunk.NewIterator4Chunk(req)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
if row.IsNull(0) {
continue
}
word := strings.TrimSpace(row.GetString(0))
if word == "" {
continue
}
if _, ok := seen[word]; ok {
continue
}
seen[word] = struct{}{}
stopwords = append(stopwords, word)
totalBytes += len(word)
if len(stopwords) > maxFullTextStopwordCount || totalBytes > maxFullTextStopwordBytes {
return nil, dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table is too large")
}
}
req = chunk.Renew(req, 1024)
}
return stopwords, nil
}

func (w *worker) onCreateIndex(jobCtx *jobContext, job *model.Job, isPK bool) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {