Skip to content

Commit b71f0c0

Browse files
authored
statistics: handle the prune mode correctly in the refresher (#57096)
ref #55906
1 parent f238540 commit b71f0c0

File tree

6 files changed

+143
-35
lines changed

6 files changed

+143
-35
lines changed

pkg/statistics/handle/autoanalyze/autoanalyze.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
333333
sa.refresher.ProcessDMLChangesForTest()
334334
sa.refresher.RequeueFailedJobsForTest()
335335
}
336-
analyzed := sa.refresher.AnalyzeHighestPriorityTables()
336+
analyzed := sa.refresher.AnalyzeHighestPriorityTables(sctx)
337337
// During the test, we need to wait for the auto analyze job to be finished.
338338
if intest.InTest {
339339
sa.refresher.WaitAutoAnalyzeFinishedForTest()

pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -167,21 +167,37 @@ func TestRefreshLastAnalysisDuration(t *testing.T) {
167167
require.Len(t, runningJobs, 2)
168168
}
169169

170-
func TestProcessDMLChanges(t *testing.T) {
170+
func testProcessDMLChanges(t *testing.T, partitioned bool) {
171171
store, dom := testkit.CreateMockStoreAndDomain(t)
172172
handle := dom.StatsHandle()
173173
tk := testkit.NewTestKit(t, store)
174-
tk.MustExec("use test")
175-
tk.MustExec("create table t1 (a int)")
176-
tk.MustExec("create table t2 (a int)")
174+
ctx := context.Background()
175+
if partitioned {
176+
tk.MustExec("use test")
177+
tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))")
178+
tk.MustExec("create table t2 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))")
179+
// Because we don't handle the DDL events in unit tests by default,
180+
// we need to use this way to make sure the stats record for the global table is created.
181+
// Insert some rows into the tables.
182+
tk.MustExec("insert into t1 values (11)")
183+
tk.MustExec("insert into t2 values (12)")
184+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
185+
// Analyze the tables.
186+
tk.MustExec("analyze table t1")
187+
tk.MustExec("analyze table t2")
188+
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
189+
} else {
190+
tk.MustExec("use test")
191+
tk.MustExec("create table t1 (a int)")
192+
tk.MustExec("create table t2 (a int)")
193+
}
177194
tk.MustExec("insert into t1 values (1)")
178-
tk.MustExec("insert into t2 values (1)")
195+
tk.MustExec("insert into t2 values (1), (2)")
179196
statistics.AutoAnalyzeMinCnt = 0
180197
defer func() {
181198
statistics.AutoAnalyzeMinCnt = 1000
182199
}()
183200

184-
ctx := context.Background()
185201
require.NoError(t, handle.DumpStatsDeltaToKV(true))
186202
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
187203
schema := pmodel.NewCIStr("test")
@@ -205,10 +221,10 @@ func TestProcessDMLChanges(t *testing.T) {
205221
require.NoError(t, job2.Analyze(handle, dom.SysProcTracker()))
206222
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
207223

208-
// Insert 10 rows into t1.
209-
tk.MustExec("insert into t1 values (2), (3), (4), (5), (6), (7), (8), (9), (10), (11)")
210-
// Insert 2 rows into t2.
211-
tk.MustExec("insert into t2 values (2), (3)")
224+
// Insert 9 rows into t1.
225+
tk.MustExec("insert into t1 values (3), (4), (5), (6), (7), (8), (9), (10), (11)")
226+
// Insert 1 row into t2.
227+
tk.MustExec("insert into t2 values (3)")
212228

213229
// Dump the stats to kv.
214230
require.NoError(t, handle.DumpStatsDeltaToKV(true))
@@ -242,6 +258,14 @@ func TestProcessDMLChanges(t *testing.T) {
242258
require.Equal(t, tbl2.Meta().ID, updatedJob2.GetTableID(), "t2 should have higher weight due to smaller table size")
243259
}
244260

261+
func TestProcessDMLChanges(t *testing.T) {
262+
testProcessDMLChanges(t, false)
263+
}
264+
265+
func TestProcessDMLChangesPartitioned(t *testing.T) {
266+
testProcessDMLChanges(t, true)
267+
}
268+
245269
func TestProcessDMLChangesWithRunningJobs(t *testing.T) {
246270
store, dom := testkit.CreateMockStoreAndDomain(t)
247271
handle := dom.StatsHandle()

pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ go_test(
3333
"worker_test.go",
3434
],
3535
flaky = True,
36-
shard_count = 8,
36+
shard_count = 9,
3737
deps = [
3838
":refresher",
3939
"//pkg/parser/model",
@@ -42,6 +42,7 @@ go_test(
4242
"//pkg/statistics",
4343
"//pkg/statistics/handle/autoanalyze/priorityqueue",
4444
"//pkg/statistics/handle/types",
45+
"//pkg/statistics/handle/util",
4546
"//pkg/testkit",
4647
"//pkg/testkit/testsetup",
4748
"@com_github_stretchr_testify//require",

pkg/statistics/handle/autoanalyze/refresher/refresher.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,33 +82,29 @@ func (r *Refresher) UpdateConcurrency() {
8282
}
8383

8484
// AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
85-
func (r *Refresher) AnalyzeHighestPriorityTables() bool {
86-
se, err := r.statsHandle.SPool().Get()
87-
if err != nil {
88-
statslogutil.StatsLogger().Error("Failed to get session context", zap.Error(err))
89-
return false
90-
}
91-
defer r.statsHandle.SPool().Put(se)
92-
93-
sctx := se.(sessionctx.Context)
85+
// Note: Make sure the session has the latest variable values.
86+
// Usually, this is done by the caller through `util.CallWithSCtx`.
87+
func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
9488
parameters := exec.GetAutoAnalyzeParameters(sctx)
95-
err = r.setAutoAnalysisTimeWindow(parameters)
89+
err := r.setAutoAnalysisTimeWindow(parameters)
9690
if err != nil {
9791
statslogutil.StatsLogger().Error("Set auto analyze time window failed", zap.Error(err))
9892
return false
9993
}
10094
if !r.isWithinTimeWindow() {
10195
return false
10296
}
97+
currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
98+
currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
10399
if !r.jobs.IsInitialized() {
104100
if err := r.jobs.Initialize(); err != nil {
105101
statslogutil.StatsLogger().Error("Failed to initialize the queue", zap.Error(err))
106102
return false
107103
}
104+
r.lastSeenAutoAnalyzeRatio = currentAutoAnalyzeRatio
105+
r.lastSeenPruneMode = currentPruneMode
108106
} else {
109107
// Only do this if the queue is already initialized.
110-
currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
111-
currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
112108
if currentAutoAnalyzeRatio != r.lastSeenAutoAnalyzeRatio || currentPruneMode != r.lastSeenPruneMode {
113109
r.lastSeenAutoAnalyzeRatio = currentAutoAnalyzeRatio
114110
r.lastSeenPruneMode = currentPruneMode

pkg/statistics/handle/autoanalyze/refresher/refresher_test.go

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,66 @@ import (
1919
"testing"
2020

2121
pmodel "github.com/pingcap/tidb/pkg/parser/model"
22+
"github.com/pingcap/tidb/pkg/sessionctx"
2223
"github.com/pingcap/tidb/pkg/statistics"
2324
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher"
25+
"github.com/pingcap/tidb/pkg/statistics/handle/util"
2426
"github.com/pingcap/tidb/pkg/testkit"
2527
"github.com/stretchr/testify/require"
2628
)
2729

30+
func TestChangePruneMode(t *testing.T) {
31+
statistics.AutoAnalyzeMinCnt = 0
32+
defer func() {
33+
statistics.AutoAnalyzeMinCnt = 1000
34+
}()
35+
36+
store, dom := testkit.CreateMockStoreAndDomain(t)
37+
handle := dom.StatsHandle()
38+
tk := testkit.NewTestKit(t, store)
39+
tk.MustExec("use test")
40+
tk.MustExec("create table t1 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (140))")
41+
tk.MustExec("insert into t1 values (0, 0)")
42+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
43+
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
44+
tk.MustExec("analyze table t1")
45+
r := refresher.NewRefresher(handle, dom.SysProcTracker(), dom.DDLNotifier())
46+
defer r.Close()
47+
48+
// Insert more data to each partition.
49+
tk.MustExec("insert into t1 values (1, 1), (11, 11)")
50+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
51+
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
52+
53+
// Two jobs are added because the prune mode is static.
54+
tk.MustExec("set global tidb_partition_prune_mode = 'static'")
55+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
56+
require.True(t, handle.HandleAutoAnalyze())
57+
return nil
58+
}))
59+
r.WaitAutoAnalyzeFinishedForTest()
60+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
61+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
62+
return nil
63+
}))
64+
r.WaitAutoAnalyzeFinishedForTest()
65+
require.Equal(t, 0, r.Len())
66+
67+
// Insert more data to each partition.
68+
tk.MustExec("insert into t1 values (2, 2), (3, 3), (4, 4), (12, 12), (13, 13), (14, 14)")
69+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
70+
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
71+
72+
// One job is added because the prune mode is dynamic.
73+
tk.MustExec("set global tidb_partition_prune_mode = 'dynamic'")
74+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
75+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
76+
return nil
77+
}))
78+
r.WaitAutoAnalyzeFinishedForTest()
79+
require.Equal(t, 0, r.Len())
80+
}
81+
2882
func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) {
2983
statistics.AutoAnalyzeMinCnt = 0
3084
defer func() {
@@ -67,12 +121,19 @@ func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) {
67121
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
68122
defer r.Close()
69123
// No jobs are added.
70-
require.False(t, r.AnalyzeHighestPriorityTables())
124+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
125+
require.False(t, r.AnalyzeHighestPriorityTables(sctx))
126+
return nil
127+
}))
71128
require.Equal(t, 0, r.Len())
72129
// Enable the auto analyze.
73130
tk.MustExec("set global tidb_auto_analyze_ratio = 0.2")
74131
// Jobs are added.
75-
require.True(t, r.AnalyzeHighestPriorityTables())
132+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
133+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
134+
return nil
135+
}))
136+
require.Equal(t, 0, r.Len())
76137
}
77138

78139
func TestIgnoreNilOrPseudoStatsOfPartitionedTable(t *testing.T) {
@@ -92,7 +153,10 @@ func TestIgnoreNilOrPseudoStatsOfPartitionedTable(t *testing.T) {
92153
sysProcTracker := dom.SysProcTracker()
93154
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
94155
defer r.Close()
95-
require.False(t, r.AnalyzeHighestPriorityTables())
156+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
157+
require.False(t, r.AnalyzeHighestPriorityTables(sctx))
158+
return nil
159+
}))
96160
require.Equal(t, 0, r.Len(), "No jobs are added because table stats are nil")
97161
}
98162

@@ -113,7 +177,10 @@ func TestIgnoreNilOrPseudoStatsOfNonPartitionedTable(t *testing.T) {
113177
sysProcTracker := dom.SysProcTracker()
114178
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
115179
defer r.Close()
116-
require.False(t, r.AnalyzeHighestPriorityTables())
180+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
181+
require.False(t, r.AnalyzeHighestPriorityTables(sctx))
182+
return nil
183+
}))
117184
require.Equal(t, 0, r.Len(), "No jobs are added because table stats are nil")
118185
}
119186

@@ -158,7 +225,10 @@ func TestIgnoreTinyTable(t *testing.T) {
158225
sysProcTracker := dom.SysProcTracker()
159226
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
160227
defer r.Close()
161-
require.True(t, r.AnalyzeHighestPriorityTables())
228+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
229+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
230+
return nil
231+
}))
162232
require.Equal(t, 0, r.Len(), "Only t1 is added to the job queue, because t2 is a tiny table(not enough data)")
163233
}
164234

@@ -194,7 +264,10 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
194264
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
195265
defer r.Close()
196266
// Analyze t1 first.
197-
require.True(t, r.AnalyzeHighestPriorityTables())
267+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
268+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
269+
return nil
270+
}))
198271
r.WaitAutoAnalyzeFinishedForTest()
199272
require.NoError(t, handle.DumpStatsDeltaToKV(true))
200273
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
@@ -212,7 +285,10 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
212285
tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2)
213286
require.Equal(t, int64(6), tblStats2.ModifyCount)
214287
// Do one more round.
215-
require.True(t, r.AnalyzeHighestPriorityTables())
288+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
289+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
290+
return nil
291+
}))
216292
r.WaitAutoAnalyzeFinishedForTest()
217293
// t2 is analyzed.
218294
pid2 = tbl2.Meta().GetPartitionInfo().Definitions[1].ID
@@ -257,7 +333,10 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {
257333
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
258334
defer r.Close()
259335
// Analyze tables concurrently.
260-
require.True(t, r.AnalyzeHighestPriorityTables())
336+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
337+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
338+
return nil
339+
}))
261340
r.WaitAutoAnalyzeFinishedForTest()
262341
require.NoError(t, handle.DumpStatsDeltaToKV(true))
263342
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
@@ -284,7 +363,10 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {
284363
require.Equal(t, int64(4), tblStats3.ModifyCount)
285364

286365
// Do one more round to analyze t3.
287-
require.True(t, r.AnalyzeHighestPriorityTables())
366+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
367+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
368+
return nil
369+
}))
288370
r.WaitAutoAnalyzeFinishedForTest()
289371
require.NoError(t, handle.DumpStatsDeltaToKV(true))
290372
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
@@ -322,7 +404,10 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
322404
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
323405
defer r.Close()
324406

325-
r.AnalyzeHighestPriorityTables()
407+
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
408+
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
409+
return nil
410+
}))
326411
r.WaitAutoAnalyzeFinishedForTest()
327412

328413
is := dom.InfoSchema()

pkg/statistics/handle/updatetest/update_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,8 @@ func TestAutoAnalyzePartitionTableAfterAddingIndex(t *testing.T) {
12841284
tblInfo := tbl.Meta()
12851285
idxInfo := tblInfo.Indices[0]
12861286
require.Nil(t, h.GetTableStats(tblInfo).GetIdx(idxInfo.ID))
1287-
require.True(t, h.HandleAutoAnalyze())
1287+
require.Eventually(t, func() bool {
1288+
return h.HandleAutoAnalyze()
1289+
}, 3*time.Second, time.Millisecond*100)
12881290
require.NotNil(t, h.GetTableStats(tblInfo).GetIdx(idxInfo.ID))
12891291
}

0 commit comments

Comments
 (0)