Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if len(tasks) == 0 {
return nil
}
tableAndPartitionIDs := make([]int64, 0, len(tasks))
for _, task := range tasks {
tableID := getTableIDFromTask(task)
tableAndPartitionIDs = append(tableAndPartitionIDs, tableID.TableID)
if tableID.IsPartitionTable() {
tableAndPartitionIDs = append(tableAndPartitionIDs, tableID.PartitionID)
}
}

// Get the min number of goroutines for parallel execution.
buildStatsConcurrency, err := getBuildStatsConcurrency(e.Ctx())
Expand Down Expand Up @@ -186,7 +194,7 @@ TASKLOOP:
if err != nil {
sessionVars.StmtCtx.AppendWarning(err)
}
return statsHandle.Update(ctx, infoSchema)
return statsHandle.Update(ctx, infoSchema, tableAndPartitionIDs...)
}

func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
Expand Down
12 changes: 9 additions & 3 deletions pkg/statistics/handle/cache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func benchCopyAndUpdate(b *testing.B, c types.StatsCache) {
defer wg.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
c.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{t1},
})
}()
}
wg.Wait()
Expand All @@ -51,7 +53,9 @@ func benchPutGet(b *testing.B, c types.StatsCache) {
defer wg.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
c.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{t1},
})
}(i)
}
for i := 0; i < b.N; i++ {
Expand All @@ -73,7 +77,9 @@ func benchGet(b *testing.B, c types.StatsCache) {
defer w.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
c.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{t1},
})
}(i)
}
w.Wait()
Expand Down
48 changes: 36 additions & 12 deletions pkg/statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package cache

import (
"context"
"slices"
"strconv"
"sync/atomic"
"time"

Expand Down Expand Up @@ -64,19 +66,35 @@ func NewStatsCacheImplForTest() (types.StatsCache, error) {
}

// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, tableAndPartitionIDs ...int64) error {
start := time.Now()
lastVersion := s.GetNextCheckVersionWithOffset()
var (
rows []chunk.Row
err error
skipMoveForwardStatsCache bool
rows []chunk.Row
err error
)
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
rows, _, err = util.ExecRows(
sctx,
"SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? order by version",
lastVersion,
)
query := "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? "
args := []any{lastVersion}

if len(tableAndPartitionIDs) > 0 {
// When updating specific tables, we skip incrementing the max stats version to avoid missing
// delta updates for other tables. The max version only advances when doing a full update.
skipMoveForwardStatsCache = true
// Sort and deduplicate the table IDs to remove duplicates
slices.Sort(tableAndPartitionIDs)
tableAndPartitionIDs = slices.Compact(tableAndPartitionIDs)
// Convert table IDs to strings since the SQL executor only accepts string arrays for IN clauses
tableStringIDs := make([]string, 0, len(tableAndPartitionIDs))
for _, tableID := range tableAndPartitionIDs {
tableStringIDs = append(tableStringIDs, strconv.FormatInt(tableID, 10))
}
query += "and table_id in (%?) "
args = append(args, tableStringIDs)
}
query += "order by version"
rows, _, err = util.ExecRows(sctx, query, args...)
return err
}); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -150,7 +168,13 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) e
tables = append(tables, tbl)
}

s.UpdateStatsCache(tables, deletedTableIDs)
s.UpdateStatsCache(types.CacheUpdate{
Updated: tables,
Deleted: deletedTableIDs,
Options: types.UpdateOptions{
SkipMoveForward: skipMoveForwardStatsCache,
},
})
dur := time.Since(start)
tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())
return nil
Expand Down Expand Up @@ -191,12 +215,12 @@ func (s *StatsCacheImpl) replace(newCache *StatsCache) {
}

// UpdateStatsCache updates the cache with the new cache.
func (s *StatsCacheImpl) UpdateStatsCache(tables []*statistics.Table, deletedIDs []int64) {
func (s *StatsCacheImpl) UpdateStatsCache(cacheUpdate types.CacheUpdate) {
if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
s.Load().Update(tables, deletedIDs)
s.Load().Update(cacheUpdate.Updated, cacheUpdate.Deleted, cacheUpdate.Options.SkipMoveForward)
} else {
// TODO: remove this branch because we will always enable quota.
newCache := s.Load().CopyAndUpdate(tables, deletedIDs)
newCache := s.Load().CopyAndUpdate(cacheUpdate.Updated, cacheUpdate.Deleted)
s.replace(newCache)
}
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int
}

// Update updates the new statistics table cache.
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64) {
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64, skipMoveForwardStatsCache bool) {
for _, tbl := range tables {
id := tbl.PhysicalID
metrics.UpdateCounter.Inc()
Expand All @@ -174,10 +174,12 @@ func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64) {
sc.c.Del(id)
}

// update the maxTblStatsVer
for _, t := range tables {
if oldVersion := sc.maxTblStatsVer.Load(); t.Version > oldVersion {
sc.maxTblStatsVer.CompareAndSwap(oldVersion, t.Version)
if !skipMoveForwardStatsCache {
// update the maxTblStatsVer
for _, t := range tables {
if oldVersion := sc.maxTblStatsVer.Load(); t.Version > oldVersion {
sc.maxTblStatsVer.CompareAndSwap(oldVersion, t.Version)
}
}
}
}
4 changes: 3 additions & 1 deletion pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (h *Handle) getPartitionStats(tblInfo *model.TableInfo, pid int64, returnPs
tbl = statistics.PseudoTable(tblInfo, false, true)
tbl.PhysicalID = pid
if tblInfo.GetPartitionInfo() == nil || h.Len() < 64 {
h.UpdateStatsCache([]*statistics.Table{tbl}, nil)
h.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{tbl},
})
}
return tbl
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/handletest/statstest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/config",
"//pkg/parser/model",
Expand Down
32 changes: 32 additions & 0 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@ import (
"github.com/stretchr/testify/require"
)

// TestStatsCacheProcess verifies how the stats cache version reacts to the two
// kinds of refresh: ANALYZE must load the analyzed table's stats without
// advancing the cache's max version, while a full Update (no table IDs) must
// advance it.
func TestStatsCacheProcess(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	for _, stmt := range []string{
		"use test",
		"create table t (c1 int, c2 int)",
		"insert into t values(1, 2)",
	} {
		tk.MustExec(stmt)
	}
	analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "c1", "c2")

	ctx := context.Background()
	schema := dom.InfoSchema()
	table, err := schema.TableByName(ctx, model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)
	meta := table.Meta()

	// Before ANALYZE the table only has pseudo stats with a zero version.
	stats := dom.StatsHandle().GetTableStats(meta)
	require.True(t, stats.Pseudo)
	require.Zero(t, stats.Version)
	versionBefore := dom.StatsHandle().MaxTableStatsVersion()

	// ANALYZE loads real stats for the table but leaves the cache version alone.
	tk.MustExec("analyze table t")
	stats = dom.StatsHandle().GetTableStats(meta)
	require.False(t, stats.Pseudo)
	require.NotZero(t, stats.Version)
	require.Equal(t, versionBefore, dom.StatsHandle().MaxTableStatsVersion())
	nextCheckVersion := dom.StatsHandle().GetNextCheckVersionWithOffset()
	require.Equal(t, versionBefore, nextCheckVersion, "analyze should not move forward the stats cache version")

	// A full update after new modifications are dumped must advance the version.
	tk.MustExec("insert into t values(2, 3)")
	require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true))
	require.NoError(t, dom.StatsHandle().Update(ctx, schema))
	versionAfter := dom.StatsHandle().MaxTableStatsVersion()
	require.NotEqual(t, versionBefore, versionAfter, "update with no table should move forward the stats cache version")
}

func TestStatsCache(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
Expand Down
12 changes: 9 additions & 3 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,9 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsHandle statstypes.
if loadNeeded && !analyzed {
fakeCol := statistics.EmptyColumn(tblInfo.ID, tblInfo.PKIsHandle, colInfo)
statsTbl.SetCol(col.ID, fakeCol)
statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{statsTbl},
})
}
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
return nil
Expand Down Expand Up @@ -720,7 +722,9 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsHandle statstypes.
}
}
statsTbl.SetCol(col.ID, colHist)
statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{statsTbl},
})
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
if col.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
Expand Down Expand Up @@ -782,7 +786,9 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, is infoschema.InfoSchema
tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, idxHist.LastUpdateVersion)
}
tbl.SetIdx(idx.ID, idxHist)
statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{tbl},
})
if idx.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for index should already be loaded as sync but not found.",
zap.Int64("table_id", idx.TableID),
Expand Down
4 changes: 3 additions & 1 deletion pkg/statistics/handle/storage/stats_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ func (s *statsReadWriter) ReloadExtendedStatistics() error {
}
tables = append(tables, t)
}
s.statsHandler.UpdateStatsCache(tables, nil)
s.statsHandler.UpdateStatsCache(statstypes.CacheUpdate{
Updated: tables,
})
return nil
}, util.FlagWrapTxn)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/statistics/handle/storage/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ func removeExtendedStatsItem(statsCache types.StatsCache,
}
newTbl := tbl.Copy()
delete(newTbl.ExtendedStats.Stats, statsName)
statsCache.UpdateStatsCache([]*statistics.Table{newTbl}, nil)
statsCache.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{newTbl},
})
}

var changeGlobalStatsTables = []string{
Expand Down
4 changes: 3 additions & 1 deletion pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableIt
tbl.StatsVer = statistics.Version0
}
}
s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
s.statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{tbl},
})
return true
}
22 changes: 19 additions & 3 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ type StatsAnalyze interface {
Close()
}

// CacheUpdate encapsulates changes to be made to the stats cache.
// It is the single argument accepted by StatsCache.UpdateStatsCache.
type CacheUpdate struct {
// Updated lists the table statistics to put into the cache.
Updated []*statistics.Table
// Deleted lists the table IDs whose entries are removed from the cache.
Deleted []int64
// Options adjusts how the update is applied; the zero value keeps the default behavior.
Options UpdateOptions
}

// UpdateOptions contains configuration for cache updates.
type UpdateOptions struct {
// SkipMoveForward controls whether to skip updating the cache's max version number.
// When true, the cache max version number stays unchanged even after updates.
// This improves performance when analyzing a small number of tables by avoiding
// unnecessary full cache reloads that would normally be triggered by version changes.
//
// StatsCacheImpl.Update sets it when called with explicit table/partition IDs.
//
// NOTE(review): StatsCacheImpl.UpdateStatsCache forwards this flag only to
// StatsCache.Update (the EnableStatsCacheMemQuota branch); the CopyAndUpdate
// branch is called without it, so the flag appears to have no effect when the
// memory quota is disabled — confirm.
SkipMoveForward bool
}

// StatsCache is used to manage all table statistics in memory.
type StatsCache interface {
// Close closes this cache.
Expand All @@ -204,7 +220,7 @@ type StatsCache interface {
Clear()

// Update reads stats meta from store and updates the stats map.
Update(ctx context.Context, is infoschema.InfoSchema) error
Update(ctx context.Context, is infoschema.InfoSchema, tableAndPartitionIDs ...int64) error

// MemConsumed returns its memory usage.
MemConsumed() (size int64)
Expand All @@ -215,8 +231,8 @@ type StatsCache interface {
// Put puts this table stats into the cache.
Put(tableID int64, t *statistics.Table)

// UpdateStatsCache updates the cache.
UpdateStatsCache(addedTables []*statistics.Table, deletedTableIDs []int64)
// UpdateStatsCache applies a batch of changes to the cache
UpdateStatsCache(update CacheUpdate)

// GetNextCheckVersionWithOffset returns the last version with offset.
// It is used to fetch updated statistics from the stats meta table.
Expand Down