Skip to content

Commit 0592b61

Browse files
committed
statistics: refactor stats meta handling to use DeltaUpdate for multi-table support (pingcap#58657)
ref pingcap#57869
1 parent 722347b commit 0592b61

File tree

3 files changed

+95
-32
lines changed

3 files changed

+95
-32
lines changed

pkg/statistics/handle/ddl/subscriber.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -413,9 +413,7 @@ func updateGlobalTableStats4DropPartition(
413413
ctx,
414414
sctx,
415415
startTS,
416-
variable.TableDelta{Count: count, Delta: delta},
417-
globalTableInfo.ID,
418-
isLocked,
416+
storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked),
419417
))
420418
}
421419

@@ -597,9 +595,7 @@ func updateGlobalTableStats4TruncatePartition(
597595
ctx,
598596
sctx,
599597
startTS,
600-
variable.TableDelta{Count: count, Delta: delta},
601-
globalTableInfo.ID,
602-
isLocked,
598+
storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked),
603599
)
604600
if err != nil {
605601
fields := truncatePartitionsLogFields(

pkg/statistics/handle/storage/update.go

+75-21
Original file line numberDiff line numberDiff line change
@@ -56,36 +56,90 @@ func UpdateStatsVersion(ctx context.Context, sctx sessionctx.Context) error {
5656
return nil
5757
}
5858

59-
// UpdateStatsMeta update the stats meta stat for this Table.
59+
// DeltaUpdate is the delta update for stats meta.
60+
type DeltaUpdate struct {
61+
Delta variable.TableDelta
62+
TableID int64
63+
IsLocked bool
64+
}
65+
66+
// NewDeltaUpdate creates a new DeltaUpdate.
67+
func NewDeltaUpdate(tableID int64, delta variable.TableDelta, isLocked bool) *DeltaUpdate {
68+
return &DeltaUpdate{
69+
Delta: delta,
70+
TableID: tableID,
71+
IsLocked: isLocked,
72+
}
73+
}
74+
75+
// UpdateStatsMeta updates the stats meta for multiple tables.
76+
// It uses the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax to fill the missing records.
6077
func UpdateStatsMeta(
6178
ctx context.Context,
6279
sctx sessionctx.Context,
6380
startTS uint64,
64-
delta variable.TableDelta,
65-
id int64,
66-
isLocked bool,
81+
updates ...*DeltaUpdate,
6782
) (err error) {
68-
if isLocked {
69-
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_table_locked.
70-
// Note: For locked tables, it is possible that the record gets deleted. So it can be negative.
71-
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_table_locked (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
72-
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)",
73-
startTS, id, delta.Count, delta.Delta)
74-
} else {
75-
if delta.Delta < 0 {
76-
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
77-
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+
78-
"update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)",
79-
startTS, id, delta.Count, -delta.Delta, -delta.Delta)
83+
if len(updates) == 0 {
84+
return nil
85+
}
86+
87+
// Separate locked and unlocked updates
88+
var lockedValues, unlockedPosValues, unlockedNegValues []string
89+
var cacheInvalidateIDs []int64
90+
91+
for _, update := range updates {
92+
if update.IsLocked {
93+
lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)",
94+
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
8095
} else {
81-
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
82-
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
83-
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS,
84-
id, delta.Count, delta.Delta)
96+
if update.Delta.Delta < 0 {
97+
unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)",
98+
startTS, update.TableID, update.Delta.Count, -update.Delta.Delta))
99+
} else {
100+
unlockedPosValues = append(unlockedPosValues, fmt.Sprintf("(%d, %d, %d, %d)",
101+
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
102+
}
103+
cacheInvalidateIDs = append(cacheInvalidateIDs, update.TableID)
85104
}
105+
}
106+
107+
// Execute locked updates
108+
if len(lockedValues) > 0 {
109+
sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+
110+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
111+
"count = count + values(count)", strings.Join(lockedValues, ","))
112+
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
113+
return err
114+
}
115+
}
116+
117+
// Execute unlocked updates with positive delta
118+
if len(unlockedPosValues) > 0 {
119+
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
120+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
121+
"count = count + values(count)", strings.Join(unlockedPosValues, ","))
122+
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
123+
return err
124+
}
125+
}
126+
127+
// Execute unlocked updates with negative delta
128+
if len(unlockedNegValues) > 0 {
129+
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
130+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
131+
"count = if(count > values(count), count - values(count), 0)", strings.Join(unlockedNegValues, ","))
132+
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
133+
return err
134+
}
135+
}
136+
137+
// Invalidate cache for all unlocked tables
138+
for _, id := range cacheInvalidateIDs {
86139
cache.TableRowStatsCache.Invalidate(id)
87140
}
88-
return err
141+
142+
return nil
89143
}
90144

91145
// DumpTableStatColSizeToKV dumps the column size stats to storage.

pkg/statistics/handle/usage/session_stats_collect.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
179179
}
180180
tableOrPartitionLocked := isTableLocked || isPartitionLocked
181181
isLocked = tableOrPartitionLocked
182-
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta,
183-
physicalTableID, tableOrPartitionLocked); err != nil {
182+
if err = storage.UpdateStatsMeta(
183+
utilstats.StatsCtx,
184+
sctx,
185+
statsVersion,
186+
storage.NewDeltaUpdate(physicalTableID, delta, tableOrPartitionLocked),
187+
); err != nil {
184188
return err
185189
}
186190
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
@@ -197,7 +201,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
197201
// To sum up, we only need to update the global-stats when the table and the partition are not locked.
198202
if !isTableLocked && !isPartitionLocked {
199203
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
200-
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, tableID, isTableLocked); err != nil {
204+
if err = storage.UpdateStatsMeta(
205+
utilstats.StatsCtx,
206+
sctx,
207+
statsVersion,
208+
storage.NewDeltaUpdate(tableID, delta, isTableLocked),
209+
); err != nil {
201210
return err
202211
}
203212
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
@@ -210,8 +219,12 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
210219
isTableLocked = true
211220
}
212221
isLocked = isTableLocked
213-
if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta,
214-
physicalTableID, isTableLocked); err != nil {
222+
if err = storage.UpdateStatsMeta(
223+
utilstats.StatsCtx,
224+
sctx,
225+
statsVersion,
226+
storage.NewDeltaUpdate(physicalTableID, delta, isTableLocked),
227+
); err != nil {
215228
return err
216229
}
217230
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()

0 commit comments

Comments
 (0)