Skip to content

Commit

Permalink
statistics: merge global stats even if some partition stats are missi…
Browse files Browse the repository at this point in the history
…ng (#41176) (#49471)

ref #38999
  • Loading branch information
ti-chi-bot authored Sep 3, 2024
1 parent 6338796 commit fa1fa8e
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 30 deletions.
4 changes: 4 additions & 0 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
}
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
if hg == nil {
// All partitions have no stats so global stats are not created.
continue
}
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID,
globalStats.Count,
Expand Down
1 change: 1 addition & 0 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2877,6 +2877,7 @@ func TestAnalyzePartitionStaticToDynamic(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("set @@session.tidb_analyze_version = 2")
tk.MustExec("set @@session.tidb_stats_load_sync_wait = 20000") // to stabilise test
tk.MustExec("set @@session.tidb_skip_missing_partition_stats = 0")
createTable := `CREATE TABLE t (a int, b int, c varchar(10), d int, primary key(a), index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (10),
Expand Down
14 changes: 14 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,20 @@ func TestSetVar(t *testing.T) {
require.Equal(t, uint64(2), tk.Session().GetSessionVars().CDCWriteSource)
tk.MustExec("set @@session.tidb_cdc_write_source = 0")
require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)

// test tidb_skip_missing_partition_stats
// global scope
tk.MustQuery("select @@global.tidb_skip_missing_partition_stats").Check(testkit.Rows("1")) // default value
tk.MustExec("set global tidb_skip_missing_partition_stats = 0")
tk.MustQuery("select @@global.tidb_skip_missing_partition_stats").Check(testkit.Rows("0"))
tk.MustExec("set global tidb_skip_missing_partition_stats = 1")
tk.MustQuery("select @@global.tidb_skip_missing_partition_stats").Check(testkit.Rows("1"))
// session scope
tk.MustQuery("select @@session.tidb_skip_missing_partition_stats").Check(testkit.Rows("1")) // default value
tk.MustExec("set session tidb_skip_missing_partition_stats = 0")
tk.MustQuery("select @@session.tidb_skip_missing_partition_stats").Check(testkit.Rows("0"))
tk.MustExec("set session tidb_skip_missing_partition_stats = 1")
tk.MustQuery("select @@session.tidb_skip_missing_partition_stats").Check(testkit.Rows("1"))
}

func TestGetSetNoopVars(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,11 @@ type SessionVars struct {

// OptimizerFixControl control some details of the optimizer behavior through the tidb_opt_fix_control variable.
OptimizerFixControl map[uint64]string

// SkipMissingPartitionStats controls how to handle missing partition stats when merging partition stats to global stats.
// When set to true, skip missing partition stats and continue to merge other partition stats to global stats.
// When set to false, give up merging partition stats to global stats.
SkipMissingPartitionStats bool
}

var (
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2306,6 +2306,10 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBSkipMissingPartitionStats, Value: BoolToOnOff(DefTiDBSkipMissingPartitionStats), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.SkipMissingPartitionStats = TiDBOptOn(val)
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,10 @@ const (
PasswordReuseHistory = "password_history"
// PasswordReuseTime limit how long passwords can be reused.
PasswordReuseTime = "password_reuse_interval"
// TiDBSkipMissingPartitionStats controls how to handle missing partition stats when merging partition stats to global stats.
// When set to true, skip missing partition stats and continue to merge other partition stats to global stats.
// When set to false, give up merging partition stats to global stats.
TiDBSkipMissingPartitionStats = "tidb_skip_missing_partition_stats"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1171,6 +1175,7 @@ const (
DefTiDBTTLJobScheduleWindowEndTime = "23:59 +0000"
DefTiDBTTLScanWorkerCount = 4
DefTiDBTTLDeleteWorkerCount = 4
DefTiDBSkipMissingPartitionStats = true
)

// Process global variables.
Expand Down Expand Up @@ -1246,6 +1251,7 @@ var (
PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime)
IsSandBoxModeEnabled = atomic.NewBool(false)
MaxPreparedStmtCountValue = atomic.NewInt64(DefMaxPreparedStmtCount)
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
)

var (
Expand Down
18 changes: 18 additions & 0 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

// HandleDDLEvent begins to process a ddl task.
Expand Down Expand Up @@ -165,8 +167,16 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
if err != nil {
return err
}
if len(newColGlobalStats.MissingPartitionStats) > 0 {
logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", tblInfo.Name.L),
zap.String("item", "columns"), zap.Strings("missing", newColGlobalStats.MissingPartitionStats))
}
for i := 0; i < newColGlobalStats.Num; i++ {
hg, cms, topN := newColGlobalStats.Hg[i], newColGlobalStats.Cms[i], newColGlobalStats.TopN[i]
if hg == nil {
// All partitions have no stats so global stats are not created.
continue
}
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount, 0, hg, cms, topN, 2, 1, false)
if err != nil {
Expand Down Expand Up @@ -195,8 +205,16 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
if err != nil {
return err
}
if len(newIndexGlobalStats.MissingPartitionStats) > 0 {
logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", tblInfo.Name.L),
zap.String("item", "index "+idx.Name.L), zap.Strings("missing", newIndexGlobalStats.MissingPartitionStats))
}
for i := 0; i < newIndexGlobalStats.Num; i++ {
hg, cms, topN := newIndexGlobalStats.Hg[i], newIndexGlobalStats.Cms[i], newIndexGlobalStats.TopN[i]
if hg == nil {
// All partitions have no stats so global stats are not created.
continue
}
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false)
if err != nil {
Expand Down
97 changes: 67 additions & 30 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,14 @@ func (h *Handle) UpdateSessionVar() error {
// In the column statistics, the variable `num` is equal to the number of columns in the partition table.
// In the index statistics, the variable `num` is always equal to one.
type GlobalStats struct {
Num int
Count int64
ModifyCount int64
Hg []*statistics.Histogram
Cms []*statistics.CMSketch
TopN []*statistics.TopN
Fms []*statistics.FMSketch
Hg []*statistics.Histogram
Cms []*statistics.CMSketch
TopN []*statistics.TopN
Fms []*statistics.FMSketch
MissingPartitionStats []string
Num int
Count int64
ModifyCount int64
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
Expand All @@ -682,7 +683,24 @@ func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
return
}
globalTableInfo := globalTable.Meta()
return h.mergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, tablePartitionStats)
globalStats, err = h.mergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, tablePartitionStats)
if err != nil {
return
}
if len(globalStats.MissingPartitionStats) > 0 {
var item string
if isIndex == 0 {
item = "columns"
} else {
item = "index"
if len(histIDs) > 0 {
item += " " + globalTableInfo.FindIndexNameByID(histIDs[0])
}
}
logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L),
zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats))
}
return
}

func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) {
Expand All @@ -706,10 +724,6 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
isIndex int, histIDs []int64,
allPartitionStats map[int64]*statistics.Table) (globalStats *GlobalStats, err error) {
partitionNum := len(globalTableInfo.Partition.Definitions)
partitionIDs := make([]int64, 0, partitionNum)
for i := 0; i < partitionNum; i++ {
partitionIDs = append(partitionIDs, globalTableInfo.Partition.Definitions[i].ID)
}

// initialized the globalStats
globalStats = new(GlobalStats)
Expand Down Expand Up @@ -744,6 +758,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
allFms[i] = make([]*statistics.FMSketch, 0, partitionNum)
}

skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats
for _, def := range globalTableInfo.Partition.Definitions {
partitionID := def.ID
h.mu.Lock()
Expand All @@ -760,8 +775,14 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
}
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats
if allPartitionStats == nil || partitionStats == nil || !ok {
partitionStats, err = h.loadTablePartitionStats(tableInfo, &def)
if err != nil {
var err1 error
partitionStats, err1 = h.loadTablePartitionStats(tableInfo, &def)
if err1 != nil {
if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err) {
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L))
continue
}
err = err1
return
}
if allPartitionStats == nil {
Expand All @@ -771,45 +792,61 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
}
for i := 0; i < globalStats.Num; i++ {
_, hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1)
skipPartition := false
if !analyzed {
var errMsg string
var missingPart string
if isIndex == 0 {
errMsg = fmt.Sprintf("table `%s` partition `%s` column `%s`", tableInfo.Name.L, def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
errMsg = fmt.Sprintf("table `%s` partition `%s` index `%s`", tableInfo.Name.L, def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
}
err = types.ErrPartitionStatsMissing.GenWithStackByArgs(errMsg)
return
if !skipMissingPartitionStats {
err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
skipPartition = true
}
// partition stats is not empty but column stats(hist, topn) is missing
if partitionStats.Count > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
var errMsg string
var missingPart string
if isIndex == 0 {
errMsg = fmt.Sprintf("table `%s` partition `%s` column `%s`", tableInfo.Name.L, def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
errMsg = fmt.Sprintf("table `%s` partition `%s` index `%s`", tableInfo.Name.L, def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
}
err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(errMsg)
return
if !skipMissingPartitionStats {
err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topn")
skipPartition = true
}
if i == 0 {
// In a partition, we will only update globalStats.Count once
globalStats.Count += partitionStats.Count
globalStats.ModifyCount += partitionStats.ModifyCount
}
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
allTopN[i] = append(allTopN[i], topN)
allFms[i] = append(allFms[i], fms)
if !skipPartition {
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
allTopN[i] = append(allTopN[i], topN)
allFms[i] = append(allFms[i], fms)
}
}
}

// After collect all of the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
if len(allHg[i]) == 0 {
// If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0`
// correctly. It can avoid unexpected behaviors such as nil pointer panic.
continue
}
// Merge CMSketch
globalStats.Cms[i] = allCms[i][0].Copy()
for j := 1; j < partitionNum; j++ {
for j := 1; j < len(allCms[i]); j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
if err != nil {
return
Expand Down Expand Up @@ -839,7 +876,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,

// Update NDV of global-level stats
globalStats.Fms[i] = allFms[i][0].Copy()
for j := 1; j < partitionNum; j++ {
for j := 1; j < len(allFms[i]); j++ {
if globalStats.Fms[i] == nil {
globalStats.Fms[i] = allFms[i][j].Copy()
} else {
Expand Down
25 changes: 25 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3557,3 +3557,28 @@ insert into t1 values
require.Len(t, rows, 1)
require.Equal(t, "finished", rows[0][7])
}

func TestSkipMissingPartitionStats(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("set @@tidb_skip_missing_partition_stats = 1")
tk.MustExec("create table t (a int, b int, c int, index idx_b(b)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200), partition p2 values less than (300))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (101,101,101), (102,102,102), (201,201,201), (202,202,202)")
h := dom.StatsHandle()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table t partition p0, p1")
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tblInfo := tbl.Meta()
globalStats := h.GetTableStats(tblInfo)
require.Equal(t, 6, int(globalStats.Count))
require.Equal(t, 2, int(globalStats.ModifyCount))
for _, col := range globalStats.Columns {
require.True(t, col.IsStatsInitialized())
}
for _, idx := range globalStats.Indices {
require.True(t, idx.IsStatsInitialized())
}
}

0 comments on commit fa1fa8e

Please sign in to comment.