diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 0677c2b77d876..530ae08dbbcab 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -188,6 +188,7 @@ go_library( "//pkg/util/set", "//pkg/util/size", "//pkg/util/slice", + "//pkg/util/sqlescape", "//pkg/util/sqlexec", "//pkg/util/sqlkiller", "//pkg/util/stringutil", @@ -368,6 +369,7 @@ go_test( "//pkg/testkit/testfailpoint", "//pkg/testkit/testsetup", "//pkg/testkit/testutil", + "//pkg/tici", "//pkg/types", "//pkg/util", "//pkg/util/chunk", diff --git a/pkg/ddl/create_table.go b/pkg/ddl/create_table.go index eb4287b26fc79..90b752166aa9c 100644 --- a/pkg/ddl/create_table.go +++ b/pkg/ddl/create_table.go @@ -218,7 +218,7 @@ func createTiCIIndexes(jobCtx *jobContext, schemaName string, tblInfo *model.Tab if !index.IsTiCIIndex() { continue } - if err := tici.CreateFulltextIndex(ctx, jobCtx.store, tblInfo, index, schemaName); err != nil { + if err := tici.CreateFulltextIndex(ctx, jobCtx.store, tblInfo, index, schemaName, nil); err != nil { return err } } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index a0e5002f060d4..34a026265961d 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4775,6 +4775,10 @@ func (e *executor) createFulltextIndex(ctx sessionctx.Context, ti ast.Ident, ind job.Type = model.ActionAddFullTextIndex indexPartSpecifications[0].Expr = nil + if err := e.captureFullTextIndexSysvarsToJob(ctx, job, indexOption); err != nil { + return errors.Trace(err) + } + args := &model.ModifyIndexArgs{ IndexArgs: []*model.IndexArg{{ IndexName: indexName, @@ -4982,6 +4986,12 @@ func (e *executor) createColumnarIndex(ctx sessionctx.Context, ti ast.Ident, ind // indexPartSpecifications[0].Expr can not be unmarshaled, so we set it to nil. 
indexPartSpecifications[0].Expr = nil + if columnarIndexType == model.ColumnarIndexTypeFulltext { + if err := e.captureFullTextIndexSysvarsToJob(ctx, job, indexOption); err != nil { + return errors.Trace(err) + } + } + // TODO: support CDCWriteSource args := &model.ModifyIndexArgs{ @@ -5005,6 +5015,116 @@ func (e *executor) createColumnarIndex(ctx sessionctx.Context, ti ast.Ident, ind return errors.Trace(err) } +func (e *executor) captureFullTextIndexSysvarsToJob(sctx sessionctx.Context, job *model.Job, indexOption *ast.IndexOption) error { + parser := model.FullTextParserTypeStandardV1 + if indexOption != nil && indexOption.ParserName.L != "" { + parser = model.GetFullTextParserTypeBySQLName(indexOption.ParserName.L) + } + + sessVars := sctx.GetSessionVars() + getVar := func(name string) (string, error) { + val, err := sessVars.GetSessionOrGlobalSystemVar(context.Background(), name) + return val, errors.Trace(err) + } + + maxTokenSize, err := getVar(vardef.InnodbFtMaxTokenSize) + if err != nil { + return err + } + minTokenSize, err := getVar(vardef.InnodbFtMinTokenSize) + if err != nil { + return err + } + ngramTokenSize, err := getVar(vardef.NgramTokenSize) + if err != nil { + return err + } + enableStopword, err := getVar(vardef.InnodbFtEnableStopword) + if err != nil { + return err + } + serverStopwordTable, err := getVar(vardef.InnodbFtServerStopwordTable) + if err != nil { + return err + } + userStopwordTable, err := getVar(vardef.InnodbFtUserStopwordTable) + if err != nil { + return err + } + + // Validate token size constraints early. 
+ if parser == model.FullTextParserTypeStandardV1 { + minVal, err := strconv.ParseInt(minTokenSize, 10, 64) + if err != nil { + return errors.Trace(err) + } + maxVal, err := strconv.ParseInt(maxTokenSize, 10, 64) + if err != nil { + return errors.Trace(err) + } + if minVal > maxVal { + return variable.ErrWrongValueForVar.GenWithStackByArgs(vardef.InnodbFtMinTokenSize, minTokenSize) + } + } + + // Validate stopword table schema if it will be used by standard parser. + if parser == model.FullTextParserTypeStandardV1 && variable.TiDBOptOn(enableStopword) { + stopwordTable := strings.TrimSpace(userStopwordTable) + stopwordTableVar := vardef.InnodbFtUserStopwordTable + if stopwordTable == "" { + stopwordTable = strings.TrimSpace(serverStopwordTable) + stopwordTableVar = vardef.InnodbFtServerStopwordTable + } + if stopwordTable != "" { + dbName, tblName, ok := splitFullTextStopwordTableName(stopwordTable) + if !ok { + return variable.ErrWrongValueForVar.GenWithStackByArgs(stopwordTableVar, stopwordTable) + } + is := e.infoCache.GetLatest() + tbl, err := is.TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tblName)) + if err != nil { + return errors.Trace(err) + } + tblInfo := tbl.Meta() + // TiDB row-store tables are treated as InnoDB-compatible. TiFlash-only tables are not. 
+ if tblInfo.IsColumnar { + return dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table must be an InnoDB table") + } + if len(tblInfo.Columns) != 1 || tblInfo.Columns[0].Name.L != "value" || tblInfo.Columns[0].FieldType.GetType() != mysql.TypeVarchar { + return dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table must contain a single VARCHAR column named value") + } + } + } + + job.AddSystemVars(vardef.InnodbFtMaxTokenSize, maxTokenSize) + job.AddSystemVars(vardef.InnodbFtMinTokenSize, minTokenSize) + job.AddSystemVars(vardef.NgramTokenSize, ngramTokenSize) + job.AddSystemVars(vardef.InnodbFtEnableStopword, enableStopword) + job.AddSystemVars(vardef.InnodbFtServerStopwordTable, serverStopwordTable) + job.AddSystemVars(vardef.InnodbFtUserStopwordTable, userStopwordTable) + return nil +} + +func splitFullTextStopwordTableName(raw string) (dbName string, tblName string, ok bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return "", "", false + } + parts := strings.Split(raw, "/") + if len(parts) != 2 { + parts = strings.Split(raw, ".") + } + if len(parts) != 2 { + return "", "", false + } + dbName = strings.TrimSpace(parts[0]) + tblName = strings.TrimSpace(parts[1]) + if dbName == "" || tblName == "" { + return "", "", false + } + return dbName, tblName, true +} + func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job { charset, collate := ctx.GetSessionVars().GetCharsetInfo() job := &model.Job{ diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 5743dcb55a57b..5ad83ec2002cf 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -86,6 +86,7 @@ import ( tidblogutil "github.com/pingcap/tidb/pkg/util/logutil" decoder "github.com/pingcap/tidb/pkg/util/rowDecoder" "github.com/pingcap/tidb/pkg/util/size" + "github.com/pingcap/tidb/pkg/util/sqlescape" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -1588,7 
+1589,7 @@ func checkAndBuildIndexInfo( return indexInfo, nil } -func onCreateFulltextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err error) { +func (w *worker) onCreateFulltextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err error) { // Handle the rolling back job. if job.IsRollingback() { ver, err = onDropIndex(jobCtx, job) @@ -1664,7 +1665,15 @@ func onCreateFulltextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err e if err != nil { return ver, errors.Trace(err) } - err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName) + parserInfo, err := w.buildTiCIFulltextParserInfo(jobCtx, job, indexInfo) + if err != nil { + if !isRetryableJobError(err, job.ErrorCount) { + return convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err) + } + return ver, errors.Trace(err) + } + + err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName, parserInfo) if err != nil { if !isRetryableJobError(err, job.ErrorCount) { return convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err) @@ -1808,7 +1817,7 @@ func (w *worker) onCreateHybridIndex(jobCtx *jobContext, job *model.Job) (ver in // so job.SnapshotVer will be captured and propagated // through dist-task metadata. 
if !job.ReorgMeta.TiCIIndexCreated { - err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName) + err = tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName, nil) if err != nil { if !isRetryableJobError(err, job.ErrorCount) { return convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err) @@ -1975,6 +1984,15 @@ func (w *worker) onCreateColumnarIndex(jobCtx *jobContext, job *model.Job) (ver if err != nil { return ver, errors.Trace(err) } + if indexInfo.FullTextInfo != nil { + parserInfo, err := w.buildTiCIFulltextParserInfo(jobCtx, job, indexInfo) + if err != nil { + return ver, errors.Trace(err) + } + if err := tici.CreateFulltextIndex(jobCtx.stepCtx, jobCtx.store, tblInfo, indexInfo, job.SchemaName, parserInfo); err != nil { + return ver, errors.Trace(err) + } + } job.SnapshotVer = currVer.Ver return ver, nil } @@ -2115,6 +2133,138 @@ func (w *worker) checkColumnarIndexProcessOnce(jobCtx *jobContext, tbl table.Tab return true, notAddedIndexCnt, addedIndexCnt, nil } +const ( + // maxFullTextStopwordCount limits how many stopwords can be read from a stopword table per FULLTEXT index creation. + // It is a safety limit to avoid excessive memory usage / oversized TiCI requests. + maxFullTextStopwordCount = 10000 + // maxFullTextStopwordBytes limits total stopword payload bytes per FULLTEXT index creation. 
+ maxFullTextStopwordBytes = 1 << 20 // 1MiB +) + +func (w *worker) buildTiCIFulltextParserInfo(jobCtx *jobContext, job *model.Job, indexInfo *model.IndexInfo) (*tici.ParserInfo, error) { + getJobSysVar := func(name, fallback string) string { + if val, ok := job.GetSystemVars(name); ok { + return val + } + return fallback + } + + if indexInfo == nil || indexInfo.FullTextInfo == nil { + return nil, errors.New("missing fulltext info") + } + + var parserType tici.ParserType + parserParams := make(map[string]string, 8) + switch indexInfo.FullTextInfo.ParserType { + case model.FullTextParserTypeStandardV1: + parserType = tici.ParserType_DEFAULT_PARSER + parserParams["parser_name"] = "standard" + parserParams[vardef.InnodbFtMinTokenSize] = getJobSysVar(vardef.InnodbFtMinTokenSize, "3") + parserParams[vardef.InnodbFtMaxTokenSize] = getJobSysVar(vardef.InnodbFtMaxTokenSize, "84") + case model.FullTextParserTypeMultilingualV1, model.FullTextParserTypeNgramV1: + // Multilingual parser is currently treated as an n-gram based tokenizer. 
+ parserType = tici.ParserType_OTHER_PARSER + if indexInfo.FullTextInfo.ParserType == model.FullTextParserTypeMultilingualV1 { + parserParams["parser_name"] = "multilingual" + } else { + parserParams["parser_name"] = "ngram" + } + parserParams[vardef.NgramTokenSize] = getJobSysVar(vardef.NgramTokenSize, "2") + default: + parserType = tici.ParserType_OTHER_PARSER + parserParams["parser_name"] = indexInfo.FullTextInfo.ParserType.SQLName() + } + + enableStopword := getJobSysVar(vardef.InnodbFtEnableStopword, vardef.On) + parserParams[vardef.InnodbFtEnableStopword] = enableStopword + parserParams[vardef.InnodbFtServerStopwordTable] = getJobSysVar(vardef.InnodbFtServerStopwordTable, "") + parserParams[vardef.InnodbFtUserStopwordTable] = getJobSysVar(vardef.InnodbFtUserStopwordTable, "") + + var stopWords []string + if indexInfo.FullTextInfo.ParserType == model.FullTextParserTypeStandardV1 && variable.TiDBOptOn(enableStopword) { + stopwordTable := strings.TrimSpace(parserParams[vardef.InnodbFtUserStopwordTable]) + if stopwordTable == "" { + stopwordTable = strings.TrimSpace(parserParams[vardef.InnodbFtServerStopwordTable]) + } + if stopwordTable != "" { + dbName, tblName, ok := splitFullTextStopwordTableName(stopwordTable) + if !ok { + return nil, errors.New("invalid stopword table name") + } + stopwords, err := w.readFullTextStopwords(jobCtx, dbName, tblName) + if err != nil { + return nil, errors.Trace(err) + } + stopWords = stopwords + } + } + + return &tici.ParserInfo{ + ParserType: parserType, + ParserParams: parserParams, + StopWords: stopWords, + }, nil +} + +func (w *worker) readFullTextStopwords(jobCtx *jobContext, dbName, tblName string) (stopwords []string, err error) { + const label = "ddl_read_fulltext_stopwords" + startTime := time.Now() + defer func() { + metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) + }() + + var sb strings.Builder + sqlescape.MustFormatSQL(&sb, "SELECT `value` 
FROM %n.%n", dbName, tblName) + + ctx := jobCtx.stepCtx + if ctx.Value(kv.RequestSourceKey) == nil { + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) + } + rs, err := w.sess.Context.GetSQLExecutor().ExecuteInternal(ctx, sb.String()) + if err != nil { + return nil, errors.Trace(err) + } + if rs == nil { + return nil, nil + } + defer terror.Call(rs.Close) + + stopwords = make([]string, 0, 64) + seen := make(map[string]struct{}, 64) + totalBytes := 0 + + req := rs.NewChunk(nil) + for { + if err := rs.Next(ctx, req); err != nil { + return nil, errors.Trace(err) + } + if req.NumRows() == 0 { + break + } + iter := chunk.NewIterator4Chunk(req) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + if row.IsNull(0) { + continue + } + word := strings.TrimSpace(row.GetString(0)) + if word == "" { + continue + } + if _, ok := seen[word]; ok { + continue + } + seen[word] = struct{}{} + stopwords = append(stopwords, word) + totalBytes += len(word) + if len(stopwords) > maxFullTextStopwordCount || totalBytes > maxFullTextStopwordBytes { + return nil, dbterror.ErrUnsupportedAddColumnarIndex.FastGen("stopword table is too large") + } + } + req = chunk.Renew(req, 1024) + } + return stopwords, nil +} + func (w *worker) onCreateIndex(jobCtx *jobContext, job *model.Job, isPK bool) (ver int64, err error) { // Handle the rolling back job. 
if job.IsRollingback() { diff --git a/pkg/ddl/index_modify_test.go b/pkg/ddl/index_modify_test.go index e41aec8abb573..109590d292d83 100644 --- a/pkg/ddl/index_modify_test.go +++ b/pkg/ddl/index_modify_test.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strconv" "strings" "sync/atomic" @@ -49,6 +50,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" + "github.com/pingcap/tidb/pkg/tici" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" @@ -1918,6 +1920,65 @@ func TestAddColumnarIndexSimple(t *testing.T) { require.Equal(t, false, tbl.Meta().Indices[2].InvertedInfo == nil) } +func TestFullTextIndexSysvarsPassedToTiCI(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, tiflashReplicaLease, mockstore.WithMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t, sw;") + + tiflash := infosync.NewMockTiFlash() + infosync.SetMockTiFlash(tiflash) + defer func() { + tiflash.Lock() + tiflash.StatusServer.Close() + tiflash.Unlock() + }() + + limit := vardef.GetDDLErrorCountLimit() + vardef.SetDDLErrorCountLimit(1) + defer func() { + vardef.SetDDLErrorCountLimit(limit) + }() + originalWT := ddl.GetWaitTimeWhenErrorOccurred() + ddl.SetWaitTimeWhenErrorOccurred(10 * time.Millisecond) + defer func() { ddl.SetWaitTimeWhenErrorOccurred(originalWT) }() + + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckColumnarIndexProcess", `return(1)`) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/tici/MockCreateTiCIIndexRequest", `return(1)`) + tici.ResetMockTiCICreateIndexRequest() + + tk.MustExec("create table sw (value varchar(20))") + tk.MustExec("insert into sw values ('a'), ('the'), ('foo'), ('foo')") + + tk.MustExec("create table t (id int, c text)") + tk.MustExec("alter table t set tiflash 
replica 2 location labels 'a','b';") + + tk.MustExec("set @@global.innodb_ft_min_token_size=1") + tk.MustExec("set @@global.innodb_ft_max_token_size=10") + tk.MustExec("set @@innodb_ft_enable_stopword=on") + tk.MustExec("set @@innodb_ft_user_stopword_table='test/sw'") + + tk.MustExec("alter table t add fulltext index fts_idx(c)") + + raw := tici.GetMockTiCICreateIndexRequest() + require.NotEmpty(t, raw) + + var req tici.CreateIndexRequest + require.NoError(t, req.Unmarshal(raw)) + require.NotNil(t, req.ParserInfo) + + parserParams := req.ParserInfo.ParserParams + require.Equal(t, "standard", parserParams["parser_name"]) + require.Equal(t, "1", parserParams["innodb_ft_min_token_size"]) + require.Equal(t, "10", parserParams["innodb_ft_max_token_size"]) + require.Equal(t, "ON", parserParams["innodb_ft_enable_stopword"]) + require.Equal(t, "test/sw", parserParams["innodb_ft_user_stopword_table"]) + + stopwords := append([]string(nil), req.ParserInfo.StopWords...) + sort.Strings(stopwords) + require.Equal(t, []string{"a", "foo", "the"}, stopwords) +} + func testAddColumnarIndexRollback(prepareSQL []string, addIdxSQL string, t *testing.T) { store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, tiflashReplicaLease, mockstore.WithMockTiFlash(2)) tk := testkit.NewTestKit(t, store) diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 5e8038f872be6..aa6cd8c7751d7 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -977,7 +977,7 @@ func (w *worker) runOneJobStep( case model.ActionAddPrimaryKey: ver, err = w.onCreateIndex(jobCtx, job, true) case model.ActionAddFullTextIndex: - ver, err = onCreateFulltextIndex(jobCtx, job) + ver, err = w.onCreateFulltextIndex(jobCtx, job) case model.ActionAddHybridIndex: ver, err = w.onCreateHybridIndex(jobCtx, job) case model.ActionAddColumnarIndex: diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index ae00b6c458aa7..73ef14094e341 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ 
-264,11 +264,11 @@ func rollingbackDropIndex(jobCtx *jobContext, job *model.Job) (ver int64, err er } } -func rollingbackAddFullTextIndex(jobCtx *jobContext, job *model.Job) (ver int64, err error) { +func rollingbackAddFullTextIndex(w *worker, jobCtx *jobContext, job *model.Job) (ver int64, err error) { if job.SchemaState == model.StateWriteReorganization { // Add full text index workers are started. need to ask them to exit. jobCtx.logger.Info("run the cancelling DDL job", zap.String("job", job.String())) - ver, err = onCreateFulltextIndex(jobCtx, job) + ver, err = w.onCreateFulltextIndex(jobCtx, job) } else { // add index's reorg workers are not running, remove the indexInfo in tableInfo. ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob) @@ -639,7 +639,7 @@ func convertJob2RollbackJob(w *worker, jobCtx *jobContext, job *model.Job) (ver case model.ActionAddPrimaryKey: ver, err = rollingbackAddIndex(jobCtx, job) case model.ActionAddFullTextIndex: - ver, err = rollingbackAddFullTextIndex(jobCtx, job) + ver, err = rollingbackAddFullTextIndex(w, jobCtx, job) case model.ActionAddHybridIndex: ver, err = rollingbackAddHybridIndex(w, jobCtx, job) case model.ActionAddColumnarIndex: diff --git a/pkg/sessionctx/vardef/sysvar.go b/pkg/sessionctx/vardef/sysvar.go index 633dd03771860..89f91448dcfc4 100644 --- a/pkg/sessionctx/vardef/sysvar.go +++ b/pkg/sessionctx/vardef/sysvar.go @@ -234,6 +234,16 @@ const ( InnodbAdaptiveHashIndex = "innodb_adaptive_hash_index" // InnodbFtEnableStopword is the name for 'innodb_ft_enable_stopword' system variable. InnodbFtEnableStopword = "innodb_ft_enable_stopword" // #nosec G101 + // InnodbFtMaxTokenSize is the name for 'innodb_ft_max_token_size' system variable. + InnodbFtMaxTokenSize = "innodb_ft_max_token_size" // #nosec G101 + // InnodbFtMinTokenSize is the name for 'innodb_ft_min_token_size' system variable. 
+ InnodbFtMinTokenSize = "innodb_ft_min_token_size" // #nosec G101 + // NgramTokenSize is the name for 'ngram_token_size' system variable. + NgramTokenSize = "ngram_token_size" // #nosec G101 + // InnodbFtServerStopwordTable is the name for 'innodb_ft_server_stopword_table' system variable. + InnodbFtServerStopwordTable = "innodb_ft_server_stopword_table" // #nosec G101 + // InnodbFtUserStopwordTable is the name for 'innodb_ft_user_stopword_table' system variable. + InnodbFtUserStopwordTable = "innodb_ft_user_stopword_table" // #nosec G101 // InnodbOptimizeFullTextOnly is the name for 'innodb_optimize_fulltext_only' system variable. InnodbOptimizeFullTextOnly = "innodb_optimize_fulltext_only" // InnodbStatusOutputLocks is the name for 'innodb_status_output_locks' system variable. diff --git a/pkg/sessionctx/variable/noop.go b/pkg/sessionctx/variable/noop.go index 9cb45ebcece35..40bc1557c7f47 100644 --- a/pkg/sessionctx/variable/noop.go +++ b/pkg/sessionctx/variable/noop.go @@ -115,7 +115,6 @@ var noopSysVars = []*SysVar{ {Scope: vardef.ScopeGlobal, Name: vardef.InnodbBufferPoolDumpAtShutdown, Value: "0"}, {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.SQLNotes, Value: "1"}, {Scope: vardef.ScopeGlobal, Name: vardef.InnodbCmpPerIndexEnabled, Value: vardef.Off, Type: vardef.TypeBool, AutoConvertNegativeBool: true}, - {Scope: vardef.ScopeGlobal, Name: "innodb_ft_server_stopword_table", Value: ""}, {Scope: vardef.ScopeNone, Name: "performance_schema_max_file_instances", Value: "7693"}, {Scope: vardef.ScopeNone, Name: "log_output", Value: "FILE"}, {Scope: vardef.ScopeGlobal, Name: "binlog_group_commit_sync_delay", Value: ""}, @@ -156,7 +155,6 @@ var noopSysVars = []*SysVar{ {Scope: vardef.ScopeGlobal, Name: "max_binlog_size", Value: "1073741824"}, {Scope: vardef.ScopeGlobal, Name: "concurrent_insert", Value: "AUTO"}, {Scope: vardef.ScopeGlobal, Name: vardef.InnodbAdaptiveHashIndex, Value: vardef.On, Type: vardef.TypeBool, AutoConvertNegativeBool: true}, - 
{Scope: vardef.ScopeGlobal, Name: vardef.InnodbFtEnableStopword, Value: vardef.On, Type: vardef.TypeBool, AutoConvertNegativeBool: true}, {Scope: vardef.ScopeGlobal, Name: "general_log_file", Value: "/usr/local/mysql/data/localhost.log"}, {Scope: vardef.ScopeGlobal, Name: "innodb_compression_level", Value: "6"}, {Scope: vardef.ScopeNone, Name: "myisam_mmap_size", Value: "18446744073709551615"}, @@ -258,7 +256,6 @@ var noopSysVars = []*SysVar{ return normalizedValue, nil }}, {Scope: vardef.ScopeNone, Name: "innodb_api_enable_binlog", Value: "0"}, - {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: "innodb_ft_user_stopword_table", Value: ""}, {Scope: vardef.ScopeNone, Name: "server_id_bits", Value: "32"}, {Scope: vardef.ScopeNone, Name: "innodb_buffer_pool_load_at_startup", Value: "1"}, { @@ -309,7 +306,6 @@ var noopSysVars = []*SysVar{ {Scope: vardef.ScopeGlobal, Name: "binlog_max_flush_queue_time", Value: "0"}, {Scope: vardef.ScopeGlobal, Name: "innodb_fill_factor", Value: ""}, {Scope: vardef.ScopeGlobal, Name: "log_syslog_facility", Value: ""}, - {Scope: vardef.ScopeNone, Name: "innodb_ft_min_token_size", Value: "3"}, {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: "transaction_write_set_extraction", Value: ""}, {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: "ndb_blob_write_batch_bytes", Value: ""}, {Scope: vardef.ScopeGlobal, Name: "automatic_sp_privileges", Value: "1"}, @@ -575,7 +571,6 @@ var noopSysVars = []*SysVar{ {Scope: vardef.ScopeNone, Name: "innodb_log_file_size", Value: "50331648"}, {Scope: vardef.ScopeGlobal, Name: "sync_relay_log_info", Value: "10000"}, {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: "optimizer_trace_limit", Value: "1"}, - {Scope: vardef.ScopeNone, Name: "innodb_ft_max_token_size", Value: "84"}, {Scope: vardef.ScopeGlobal, Name: "ndb_log_binlog_index", Value: ""}, {Scope: vardef.ScopeGlobal, Name: "innodb_api_bk_commit_interval", Value: "5"}, {Scope: vardef.ScopeNone, Name: "innodb_undo_directory", 
Value: "."}, diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index a255322a8610d..569a9aa581e76 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -1781,6 +1781,23 @@ var defaultSysVars = []*SysVar{ return nil }, }, + {Scope: vardef.ScopeGlobal, Name: vardef.InnodbFtMaxTokenSize, Value: "84", Type: vardef.TypeUnsigned, MinValue: 10, MaxValue: 84}, + {Scope: vardef.ScopeGlobal, Name: vardef.InnodbFtMinTokenSize, Value: "3", Type: vardef.TypeUnsigned, MinValue: 0, MaxValue: 16}, + {Scope: vardef.ScopeGlobal, Name: vardef.NgramTokenSize, Value: "2", Type: vardef.TypeUnsigned, MinValue: 1, MaxValue: 10}, + {Scope: vardef.ScopeGlobal, Name: vardef.InnodbFtServerStopwordTable, Value: "", Type: vardef.TypeStr, Validation: func(_ *SessionVars, normalizedValue string, originalValue string, _ vardef.ScopeFlag) (string, error) { + normalizedValue = strings.TrimSpace(normalizedValue) + if normalizedValue == "" { + return normalizedValue, nil + } + parts := strings.Split(normalizedValue, "/") + if len(parts) != 2 { + parts = strings.Split(normalizedValue, ".") + } + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs(vardef.InnodbFtServerStopwordTable, originalValue) + } + return normalizedValue, nil + }}, /* The system variables below have GLOBAL and SESSION scope */ {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.TiDBEnablePlanReplayerContinuousCapture, Value: BoolToOnOff(false), Type: vardef.TypeBool, @@ -2048,6 +2065,21 @@ var defaultSysVars = []*SysVar{ s.LockWaitTimeout = lockWaitSec * 1000 return nil }}, + {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.InnodbFtEnableStopword, Value: vardef.On, Type: vardef.TypeBool, AutoConvertNegativeBool: true}, + {Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.InnodbFtUserStopwordTable, Value: "", Type: vardef.TypeStr, Validation: func(_ 
*SessionVars, normalizedValue string, originalValue string, _ vardef.ScopeFlag) (string, error) { + normalizedValue = strings.TrimSpace(normalizedValue) + if normalizedValue == "" { + return normalizedValue, nil + } + parts := strings.Split(normalizedValue, "/") + if len(parts) != 2 { + parts = strings.Split(normalizedValue, ".") + } + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs(vardef.InnodbFtUserStopwordTable, originalValue) + } + return normalizedValue, nil + }}, { Scope: vardef.ScopeGlobal | vardef.ScopeSession, Name: vardef.GroupConcatMaxLen, diff --git a/pkg/tici/tici.pb.go b/pkg/tici/tici.pb.go index 3437d41173e46..c0c9f78a31c96 100644 --- a/pkg/tici/tici.pb.go +++ b/pkg/tici/tici.pb.go @@ -2238,7 +2238,8 @@ type CreateIndexRequest struct { TableInfo []byte `protobuf:"bytes,2,opt,name=table_info,json=tableInfo,proto3" json:"table_info,omitempty"` IndexId int64 `protobuf:"varint,3,opt,name=index_id,json=indexId,proto3" json:"index_id,omitempty"` // Keyspace ID - KeyspaceId uint32 `protobuf:"varint,4,opt,name=keyspace_id,json=keyspaceId,proto3" json:"keyspace_id,omitempty"` + KeyspaceId uint32 `protobuf:"varint,4,opt,name=keyspace_id,json=keyspaceId,proto3" json:"keyspace_id,omitempty"` + ParserInfo *ParserInfo `protobuf:"bytes,5,opt,name=parser_info,json=parserInfo,proto3" json:"parser_info,omitempty"` } func (m *CreateIndexRequest) Reset() { *m = CreateIndexRequest{} } @@ -2301,6 +2302,13 @@ func (m *CreateIndexRequest) GetKeyspaceId() uint32 { return 0 } +func (m *CreateIndexRequest) GetParserInfo() *ParserInfo { + if m != nil { + return m.ParserInfo + } + return nil +} + // CreateIndexResponse is a response to the index creation request type CreateIndexResponse struct { Status ErrorCode `protobuf:"varint,1,opt,name=status,proto3,enum=tici.ErrorCode" json:"status,omitempty"` @@ -2816,6 +2824,8 @@ type ParserInfo struct { ParserType ParserType 
`protobuf:"varint,1,opt,name=parser_type,json=parserType,proto3,enum=tici.ParserType" json:"parser_type,omitempty"` // Parser parameters ParserParams map[string]string `protobuf:"bytes,2,rep,name=parser_params,json=parserParams,proto3" json:"parser_params,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Stop words list. Only used when stopwords are enabled at index creation time. + StopWords []string `protobuf:"bytes,3,rep,name=stop_words,json=stopWords,proto3" json:"stop_words,omitempty"` } func (m *ParserInfo) Reset() { *m = ParserInfo{} } @@ -2864,6 +2874,13 @@ func (m *ParserInfo) GetParserParams() map[string]string { return nil } +func (m *ParserInfo) GetStopWords() []string { + if m != nil { + return m.StopWords + } + return nil +} + // GetIndexProgressRequest is a request to get the progress of an index build type GetIndexProgressRequest struct { // Table ID @@ -4858,6 +4875,9 @@ func (this *CreateIndexRequest) Equal(that any) bool { if this.KeyspaceId != that1.KeyspaceId { return false } + if !this.ParserInfo.Equal(that1.ParserInfo) { + return false + } return true } func (this *CreateIndexResponse) Equal(that any) bool { @@ -5135,6 +5155,14 @@ func (this *ParserInfo) Equal(that any) bool { return false } } + if len(this.StopWords) != len(that1.StopWords) { + return false + } + for i := range this.StopWords { + if this.StopWords[i] != that1.StopWords[i] { + return false + } + } return true } func (this *GetIndexProgressRequest) Equal(that any) bool { @@ -8492,6 +8520,18 @@ func (m *CreateIndexRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ParserInfo != nil { + { + size, err := m.ParserInfo.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTici(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } if m.KeyspaceId != 0 { i = encodeVarintTici(dAtA, i, uint64(m.KeyspaceId)) i-- @@ -8921,6 +8961,15 @@ func (m 
*ParserInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.StopWords) > 0 { + for iNdEx := len(m.StopWords) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.StopWords[iNdEx]) + copy(dAtA[i:], m.StopWords[iNdEx]) + i = encodeVarintTici(dAtA, i, uint64(len(m.StopWords[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } if len(m.ParserParams) > 0 { for k := range m.ParserParams { v := m.ParserParams[k] @@ -10100,6 +10149,10 @@ func (m *CreateIndexRequest) Size() (n int) { if m.KeyspaceId != 0 { n += 1 + sovTici(uint64(m.KeyspaceId)) } + if m.ParserInfo != nil { + l = m.ParserInfo.Size() + n += 1 + l + sovTici(uint64(l)) + } return n } @@ -10296,6 +10349,12 @@ func (m *ParserInfo) Size() (n int) { n += mapEntrySize + 1 + sovTici(uint64(mapEntrySize)) } } + if len(m.StopWords) > 0 { + for _, e := range m.StopWords { + l = len(e) + n += 1 + l + sovTici(uint64(l)) + } + } return n } @@ -10971,6 +11030,7 @@ func (this *CreateIndexRequest) String() string { `TableInfo:` + fmt.Sprintf("%v", this.TableInfo) + `,`, `IndexId:` + fmt.Sprintf("%v", this.IndexId) + `,`, `KeyspaceId:` + fmt.Sprintf("%v", this.KeyspaceId) + `,`, + `ParserInfo:` + strings.Replace(this.ParserInfo.String(), "ParserInfo", "ParserInfo", 1) + `,`, `}`, }, "") return s @@ -11086,6 +11146,11 @@ func (this *ParserInfo) String() string { if this == nil { return "nil" } + repeatedStringForStopWords := "[]string{" + for _, f := range this.StopWords { + repeatedStringForStopWords += fmt.Sprintf("%v", f) + "," + } + repeatedStringForStopWords += "}" keysForParserParams := make([]string, 0, len(this.ParserParams)) for k := range this.ParserParams { keysForParserParams = append(keysForParserParams, k) @@ -11099,6 +11164,7 @@ func (this *ParserInfo) String() string { s := strings.Join([]string{`&ParserInfo{`, `ParserType:` + fmt.Sprintf("%v", this.ParserType) + `,`, `ParserParams:` + mapStringForParserParams + `,`, + `StopWords:` + repeatedStringForStopWords + `,`, `}`, }, "") return s @@ -15981,6 
+16047,42 @@ func (m *CreateIndexRequest) Unmarshal(dAtA []byte) error { break } } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ParserInfo", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTici + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTici + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTici + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ParserInfo == nil { + m.ParserInfo = &ParserInfo{} + } + if err := m.ParserInfo.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTici(dAtA[iNdEx:]) @@ -17382,6 +17484,38 @@ func (m *ParserInfo) Unmarshal(dAtA []byte) error { } m.ParserParams[mapkey] = mapvalue iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StopWords", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTici + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTici + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTici + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StopWords = append(m.StopWords, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTici(dAtA[iNdEx:]) diff --git a/pkg/tici/tici.proto b/pkg/tici/tici.proto index ce76b4b42b9ec..c89c4b1f443e6 100644 --- a/pkg/tici/tici.proto +++ b/pkg/tici/tici.proto @@ -406,6 +406,8 @@ message CreateIndexRequest { int64 index_id = 3; // 
Keyspace ID uint32 keyspace_id = 4; + // Parser information for index creation. + ParserInfo parser_info = 5; } // CreateIndexResponse is a response to the index creation request @@ -500,6 +502,8 @@ message ParserInfo { ParserType parser_type = 1; // Parser parameters map parser_params = 2; + // Stop words list. Only used when stopwords are enabled at index creation time. + repeated string stop_words = 3; } // GetIndexProgressRequest is a request to get the progress of an index build diff --git a/pkg/tici/tici_manager_client.go b/pkg/tici/tici_manager_client.go index e2446ae052263..60df670ab5a3c 100644 --- a/pkg/tici/tici_manager_client.go +++ b/pkg/tici/tici_manager_client.go @@ -44,6 +44,27 @@ var ( newManagerCtxFunc = NewManagerCtx ) +var mockTiCICreateIndexRequest atomic.Value // stores []byte of CreateIndexRequest for tests. + +// GetMockTiCICreateIndexRequest returns the marshaled CreateIndexRequest bytes captured by the +// `MockCreateTiCIIndexRequest` failpoint. It returns nil if nothing was captured. +func GetMockTiCICreateIndexRequest() []byte { + v := mockTiCICreateIndexRequest.Load() + if v == nil { + return nil + } + b, ok := v.([]byte) + if !ok || len(b) == 0 { + return nil + } + return append([]byte(nil), b...) +} + +// ResetMockTiCICreateIndexRequest clears the captured request for tests. +func ResetMockTiCICreateIndexRequest() { + mockTiCICreateIndexRequest.Store([]byte{}) +} + type metaClient struct { conn *grpc.ClientConn client MetaServiceClient @@ -204,18 +225,21 @@ func (t *ManagerCtx) getKeyspaceID() uint32 { return t.keyspaceID.Load() } -// CreateFulltextIndex creates fulltext index on TiCI. 
-func (t *ManagerCtx) CreateFulltextIndex(ctx context.Context, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, schemaName string) error { +func buildCreateFulltextIndexRequest(tblInfo *model.TableInfo, indexID int64, schemaName string, keyspaceID uint32, parserInfo *ParserInfo) (*CreateIndexRequest, error) { tableInfoJSON, err := cloneAndMarshalTableInfo(tblInfo) if err != nil { - return err + return nil, err } - req := &CreateIndexRequest{ + return &CreateIndexRequest{ DatabaseName: schemaName, TableInfo: tableInfoJSON, - IndexId: indexInfo.ID, - KeyspaceId: t.getKeyspaceID(), - } + IndexId: indexID, + KeyspaceId: keyspaceID, + ParserInfo: parserInfo, + }, nil +} + +func (t *ManagerCtx) createIndex(ctx context.Context, req *CreateIndexRequest) error { t.mu.RLock() defer t.mu.RUnlock() if err := t.checkMetaClient(); err != nil { @@ -226,13 +250,22 @@ func (t *ManagerCtx) CreateFulltextIndex(ctx context.Context, tblInfo *model.Tab return dbterror.ErrInvalidDDLJob.FastGenByArgs(err) } if resp.Status != ErrorCode_SUCCESS { - logutil.BgLogger().Error("create fulltext index failed", zap.String("indexID", resp.IndexId), zap.String("errorMessage", resp.ErrorMessage)) + logutil.BgLogger().Error("create index failed", zap.String("indexID", resp.IndexId), zap.String("errorMessage", resp.ErrorMessage)) return dbterror.ErrInvalidDDLJob.FastGenByArgs(resp.ErrorMessage) } - logutil.BgLogger().Info("create fulltext index success", zap.String("indexID", resp.IndexId)) + logutil.BgLogger().Info("create index success", zap.String("indexID", resp.IndexId)) return nil } +// CreateFulltextIndex creates fulltext index on TiCI. 
+func (t *ManagerCtx) CreateFulltextIndex(ctx context.Context, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, schemaName string, parserInfo *ParserInfo) error { + req, err := buildCreateFulltextIndexRequest(tblInfo, indexInfo.ID, schemaName, t.getKeyspaceID(), parserInfo) + if err != nil { + return err + } + return t.createIndex(ctx, req) +} + // DropFullTextIndex drop fulltext index on TiCI. func (t *ManagerCtx) DropFullTextIndex(ctx context.Context, tableID, indexID int64) error { req := &DropIndexRequest{ @@ -625,7 +658,40 @@ func ModelIndexToTiCIIndexInfo(indexInfo *model.IndexInfo, tblInfo *model.TableI } // CreateFulltextIndex create fulltext index on TiCI. -func CreateFulltextIndex(ctx context.Context, store kv.Storage, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, schemaName string) error { +func CreateFulltextIndex(ctx context.Context, store kv.Storage, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, schemaName string, parserInfo *ParserInfo) error { + keyspaceID := uint32(0) + if store != nil { + keyspaceID = uint32(store.GetCodec().GetKeyspaceID()) + // Log when the KeyspaceID is the special null value, as requested by TiCI team. 
+ if keyspaceID == constants.NullKeyspaceID { + logutil.BgLogger().Debug("Setting special KeyspaceID for TiCI", zap.Uint32("KeyspaceID", keyspaceID)) + } + } + + req, err := buildCreateFulltextIndexRequest(tblInfo, indexInfo.ID, schemaName, keyspaceID, parserInfo) + if err != nil { + return err + } + + failpoint.Inject("MockCreateTiCIIndexRequest", func(val failpoint.Value) { + v, _ := val.(int) + switch v { + case 1: + data, err := req.Marshal() + if err != nil { + failpoint.Return(errors.Trace(err)) + } + mockTiCICreateIndexRequest.Store(data) + failpoint.Return(nil) + case 2: + data, err := req.Marshal() + if err != nil { + failpoint.Return(errors.Trace(err)) + } + failpoint.Return(errors.Errorf("mock tici create index request: %s", hex.EncodeToString(data))) + } + }) + failpoint.Inject("MockCreateTiCIIndexSuccess", func(val failpoint.Value) { if x := val.(bool); x { logutil.BgLogger().Info("MockCreateTiCIIndexSuccess failpoint triggered", zap.Bool("success", true)) @@ -645,14 +711,9 @@ func CreateFulltextIndex(ctx context.Context, store kv.Storage, tblInfo *model.T } defer ticiManager.Close() if store != nil { - keyspaceID := uint32(store.GetCodec().GetKeyspaceID()) - // Log when the KeyspaceID is the special null value, as per requested by TiCI team. - if keyspaceID == constants.NullKeyspaceID { - logutil.BgLogger().Debug("Setting special KeyspaceID for TiCI", zap.Uint32("KeyspaceID", keyspaceID)) - } ticiManager.SetKeyspaceID(keyspaceID) } - return ticiManager.CreateFulltextIndex(ctx, tblInfo, indexInfo, schemaName) + return ticiManager.createIndex(ctx, req) } // DropFullTextIndex drop fulltext index on TiCI. 
diff --git a/pkg/tici/tici_manager_client_test.go b/pkg/tici/tici_manager_client_test.go index 95873198f3ef7..2a187382067c8 100644 --- a/pkg/tici/tici_manager_client_test.go +++ b/pkg/tici/tici_manager_client_test.go @@ -103,21 +103,21 @@ func TestCreateFulltextIndex(t *testing.T) { On("CreateIndex", mock.Anything, mock.MatchedBy(matchKeyspace[*CreateIndexRequest](keyspaceID))). Return(&CreateIndexResponse{Status: ErrorCode_SUCCESS, IndexId: "2"}, nil). Once() - err := ctx.CreateFulltextIndex(context.Background(), tblInfo, indexInfo, schemaName) + err := ctx.CreateFulltextIndex(context.Background(), tblInfo, indexInfo, schemaName, nil) assert.NoError(t, err) mockClient. On("CreateIndex", mock.Anything, mock.MatchedBy(matchKeyspace[*CreateIndexRequest](keyspaceID))). Return(&CreateIndexResponse{Status: ErrorCode_UNKNOWN_ERROR, IndexId: "2", ErrorMessage: "fail"}, nil). Once() - err = ctx.CreateFulltextIndex(context.Background(), tblInfo, indexInfo, schemaName) + err = ctx.CreateFulltextIndex(context.Background(), tblInfo, indexInfo, schemaName, nil) require.ErrorContains(t, err, "fail") mockClient. On("CreateIndex", mock.Anything, mock.MatchedBy(matchKeyspace[*CreateIndexRequest](keyspaceID))). Return(&CreateIndexResponse{}, errors.New("rpc error")). Once() - err = ctx.CreateFulltextIndex(context.Background(), tblInfo, indexInfo, schemaName) + err = ctx.CreateFulltextIndex(context.Background(), tblInfo, indexInfo, schemaName, nil) require.ErrorContains(t, err, "rpc error") }