Skip to content

Commit 7b9d3ab

Browse files
committed
statistics: use DDL subscriber updating stats meta
Signed-off-by: Rustin170506 <[email protected]>
1 parent 71e70af commit 7b9d3ab

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1026
-876
lines changed

pkg/ddl/ddl.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -588,18 +588,21 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo
588588
return nil
589589
}
590590

591-
ch := jobCtx.oldDDLCtx.ddlEventCh
592-
if ch != nil {
593-
forLoop:
594-
for i := 0; i < 10; i++ {
595-
select {
596-
case ch <- e:
597-
break forLoop
598-
default:
599-
time.Sleep(time.Microsecond * 10)
591+
// Only for test.
592+
if intest.InTest {
593+
ch := jobCtx.oldDDLCtx.ddlEventCh
594+
if ch != nil {
595+
forLoop:
596+
for i := 0; i < 10; i++ {
597+
select {
598+
case ch <- e:
599+
break forLoop
600+
default:
601+
time.Sleep(time.Microsecond * 10)
602+
}
600603
}
604+
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
601605
}
602-
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
603606
}
604607

605608
intest.Assert(jobCtx.eventPublishStore != nil, "eventPublishStore should not be nil")

pkg/ddl/tests/partition/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ go_test(
3333
"//pkg/sessionctx",
3434
"//pkg/sessionctx/variable",
3535
"//pkg/sessiontxn",
36+
"//pkg/statistics/handle/util",
3637
"//pkg/store/gcworker",
3738
"//pkg/store/mockstore",
3839
"//pkg/table",

pkg/ddl/tests/partition/db_partition_test.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/pingcap/tidb/pkg/sessionctx"
4343
"github.com/pingcap/tidb/pkg/sessionctx/variable"
4444
"github.com/pingcap/tidb/pkg/sessiontxn"
45+
"github.com/pingcap/tidb/pkg/statistics/handle/util"
4546
"github.com/pingcap/tidb/pkg/store/mockstore"
4647
"github.com/pingcap/tidb/pkg/table"
4748
"github.com/pingcap/tidb/pkg/table/tables"
@@ -2947,7 +2948,11 @@ func TestRemoveKeyPartitioning(t *testing.T) {
29472948
tk.MustExec("create database RemovePartitioning")
29482949
tk.MustExec("use RemovePartitioning")
29492950
tk.MustExec(`create table t (a varchar(255), b varchar(255), key (a,b), key (b)) partition by key (a) partitions 7`)
2950-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
2951+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
2952+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
2953+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
2954+
}, util.FlagWrapTxn)
2955+
require.NoError(t, err)
29512956
// Fill the data with ascii strings
29522957
for i := 32; i <= 126; i++ {
29532958
tk.MustExec(fmt.Sprintf(`insert into t values (char(%d,%d,%d),char(%d,%d,%d,%d))`, i, i, i, i, i, i, i))
@@ -2979,7 +2984,11 @@ func TestRemoveKeyPartitioning(t *testing.T) {
29792984
" KEY `b` (`b`)\n" +
29802985
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
29812986
// Statistics are updated asynchronously
2982-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
2987+
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
2988+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
2989+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
2990+
}, util.FlagWrapTxn)
2991+
require.NoError(t, err)
29832992
// And also cached and lazy loaded
29842993
h.Clear()
29852994
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
@@ -2997,7 +3006,11 @@ func TestRemoveListPartitioning(t *testing.T) {
29973006
tk.MustExec("create database RemoveListPartitioning")
29983007
tk.MustExec("use RemoveListPartitioning")
29993008
tk.MustExec(`create table t (a int, b varchar(255), key (a,b), key (b)) partition by list (a) (partition p0 values in (0), partition p1 values in (1), partition p2 values in (2), partition p3 values in (3), partition p4 values in (4))`)
3000-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
3009+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
3010+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
3011+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
3012+
}, util.FlagWrapTxn)
3013+
require.NoError(t, err)
30013014
// Fill the data with ascii strings
30023015
for i := 32; i <= 126; i++ {
30033016
tk.MustExec(fmt.Sprintf(`insert into t values (%d,char(%d,%d,%d,%d))`, i%5, i, i, i, i))
@@ -3025,7 +3038,11 @@ func TestRemoveListPartitioning(t *testing.T) {
30253038
" KEY `b` (`b`)\n" +
30263039
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
30273040
// Statistics are updated asynchronously
3028-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
3041+
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
3042+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
3043+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
3044+
}, util.FlagWrapTxn)
3045+
require.NoError(t, err)
30293046
// And also cached and lazy loaded
30303047
h.Clear()
30313048
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
@@ -3043,7 +3060,11 @@ func TestRemoveListColumnPartitioning(t *testing.T) {
30433060
tk.MustExec("create database RemoveListPartitioning")
30443061
tk.MustExec("use RemoveListPartitioning")
30453062
tk.MustExec(`create table t (a varchar(255), b varchar(255), key (a,b), key (b)) partition by list columns (a) (partition p0 values in ("0"), partition p1 values in ("1"), partition p2 values in ("2"), partition p3 values in ("3"), partition p4 values in ("4"))`)
3046-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
3063+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
3064+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
3065+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
3066+
}, util.FlagWrapTxn)
3067+
require.NoError(t, err)
30473068
// Fill the data with ascii strings
30483069
for i := 32; i <= 126; i++ {
30493070
tk.MustExec(fmt.Sprintf(`insert into t values ("%d",char(%d,%d,%d,%d))`, i%5, i, i, i, i))
@@ -3071,7 +3092,11 @@ func TestRemoveListColumnPartitioning(t *testing.T) {
30713092
" KEY `b` (`b`)\n" +
30723093
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
30733094
// Statistics are updated asynchronously
3074-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
3095+
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
3096+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
3097+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
3098+
}, util.FlagWrapTxn)
3099+
require.NoError(t, err)
30753100
// And also cached and lazy loaded
30763101
h.Clear()
30773102
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
@@ -3089,7 +3114,11 @@ func TestRemoveListColumnsPartitioning(t *testing.T) {
30893114
tk.MustExec("create database RemoveListPartitioning")
30903115
tk.MustExec("use RemoveListPartitioning")
30913116
tk.MustExec(`create table t (a int, b varchar(255), key (a,b), key (b)) partition by list columns (a,b) (partition p0 values in ((0,"0")), partition p1 values in ((1,"1")), partition p2 values in ((2,"2")), partition p3 values in ((3,"3")), partition p4 values in ((4,"4")))`)
3092-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
3117+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
3118+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
3119+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
3120+
}, util.FlagWrapTxn)
3121+
require.NoError(t, err)
30933122
// Fill the data
30943123
for i := 32; i <= 126; i++ {
30953124
tk.MustExec(fmt.Sprintf(`insert into t values (%d,"%d")`, i%5, i%5))
@@ -3117,7 +3146,11 @@ func TestRemoveListColumnsPartitioning(t *testing.T) {
31173146
" KEY `b` (`b`)\n" +
31183147
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
31193148
// Statistics are updated asynchronously
3120-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
3149+
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
3150+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
3151+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
3152+
}, util.FlagWrapTxn)
3153+
require.NoError(t, err)
31213154
// And also cached and lazy loaded
31223155
h.Clear()
31233156
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))

pkg/domain/domain.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2413,9 +2413,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
24132413
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
24142414
// These tasks do not interfere with or depend on the initialization process.
24152415
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
2416-
do.wg.Run(func() {
2417-
do.handleDDLEvent()
2418-
}, "handleDDLEvent")
2416+
do.ddlNotifier.RegisterHandler(notifier.StatsMetaHandlerID, do.StatsHandle().HandleDDLEvent)
24192417
// Wait for the stats worker to finish the initialization.
24202418
// Otherwise, we may start the auto analyze worker before the stats cache is initialized.
24212419
do.wg.Run(
@@ -2615,24 +2613,6 @@ func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle)
26152613
}
26162614
}
26172615

2618-
func (do *Domain) handleDDLEvent() {
2619-
logutil.BgLogger().Info("handleDDLEvent started.")
2620-
defer util.Recover(metrics.LabelDomain, "handleDDLEvent", nil, false)
2621-
statsHandle := do.StatsHandle()
2622-
for {
2623-
select {
2624-
case <-do.exit:
2625-
return
2626-
// This channel is sent only by ddl owner.
2627-
case t := <-statsHandle.DDLEventCh():
2628-
err := statsHandle.HandleDDLEvent(t)
2629-
if err != nil {
2630-
logutil.BgLogger().Error("handle ddl event failed", zap.String("event", t.String()), zap.Error(err))
2631-
}
2632-
}
2633-
}
2634-
}
2635-
26362616
func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
26372617
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
26382618
logutil.BgLogger().Info("updateStatsWorker started.")

pkg/executor/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ go_test(
439439
"//pkg/sessiontxn/staleread",
440440
"//pkg/statistics",
441441
"//pkg/statistics/handle/storage",
442+
"//pkg/statistics/handle/util",
442443
"//pkg/statistics/util",
443444
"//pkg/store/copr",
444445
"//pkg/store/driver/error",

pkg/executor/infoschema_reader_test.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@ import (
2727
"github.com/pingcap/failpoint"
2828
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2929
"github.com/pingcap/tidb/pkg/domain/infosync"
30+
"github.com/pingcap/tidb/pkg/kv"
3031
"github.com/pingcap/tidb/pkg/meta/model"
3132
"github.com/pingcap/tidb/pkg/parser/auth"
3233
"github.com/pingcap/tidb/pkg/parser/mysql"
34+
"github.com/pingcap/tidb/pkg/sessionctx"
3335
"github.com/pingcap/tidb/pkg/sessionctx/variable"
36+
"github.com/pingcap/tidb/pkg/statistics/handle/util"
3437
"github.com/pingcap/tidb/pkg/store/mockstore"
3538
"github.com/pingcap/tidb/pkg/testkit"
3639
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
@@ -173,7 +176,11 @@ func TestDataForTableStatsField(t *testing.T) {
173176
tk.MustExec("use test")
174177
tk.MustExec("drop table if exists t")
175178
tk.MustExec("create table t (c int, d int, e char(5), index idx(e))")
176-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
179+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
180+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
181+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
182+
}, util.FlagWrapTxn)
183+
require.NoError(t, err)
177184
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
178185
testkit.Rows("0 0 0 0"))
179186
tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`)
@@ -200,7 +207,11 @@ func TestDataForTableStatsField(t *testing.T) {
200207
// Test partition table.
201208
tk.MustExec("drop table if exists t")
202209
tk.MustExec(`CREATE TABLE t (a int, b int, c varchar(5), primary key(a), index idx(c)) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16))`)
203-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
210+
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
211+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
212+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
213+
}, util.FlagWrapTxn)
214+
require.NoError(t, err)
204215
tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`)
205216
require.NoError(t, h.DumpStatsDeltaToKV(true))
206217
require.NoError(t, h.Update(context.Background(), is))
@@ -219,7 +230,11 @@ func TestPartitionsTable(t *testing.T) {
219230
testkit.WithPruneMode(tk, variable.Static, func() {
220231
tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;")
221232
tk.MustExec(`CREATE TABLE test_partitions (a int, b int, c varchar(5), primary key(a), index idx(c)) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`)
222-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
233+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
234+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
235+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
236+
}, util.FlagWrapTxn)
237+
require.NoError(t, err)
223238
tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)
224239

225240
tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check(
@@ -245,7 +260,11 @@ func TestPartitionsTable(t *testing.T) {
245260
// Test for table has no partitions.
246261
tk.MustExec("DROP TABLE IF EXISTS `test_partitions_1`;")
247262
tk.MustExec(`CREATE TABLE test_partitions_1 (a int, b int, c varchar(5), primary key(a), index idx(c));`)
248-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
263+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
264+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
265+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
266+
}, util.FlagWrapTxn)
267+
require.NoError(t, err)
249268
tk.MustExec(`insert into test_partitions_1(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)
250269
require.NoError(t, h.DumpStatsDeltaToKV(true))
251270
require.NoError(t, h.Update(context.Background(), is))

pkg/executor/test/analyzetest/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ go_test(
2727
"//pkg/sessionctx",
2828
"//pkg/sessionctx/variable",
2929
"//pkg/statistics",
30+
"//pkg/statistics/handle/util",
3031
"//pkg/testkit",
3132
"//pkg/testkit/analyzehelper",
3233
"//pkg/util/dbterror/exeerrors",

pkg/executor/test/analyzetest/analyze_test.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/pingcap/tidb/pkg/sessionctx"
4040
"github.com/pingcap/tidb/pkg/sessionctx/variable"
4141
"github.com/pingcap/tidb/pkg/statistics"
42+
"github.com/pingcap/tidb/pkg/statistics/handle/util"
4243
"github.com/pingcap/tidb/pkg/testkit"
4344
"github.com/pingcap/tidb/pkg/testkit/analyzehelper"
4445
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
@@ -496,7 +497,11 @@ func TestAdjustSampleRateNote(t *testing.T) {
496497
statsHandle := domain.GetDomain(tk.Session().(sessionctx.Context)).StatsHandle()
497498
tk.MustExec("drop table if exists t")
498499
tk.MustExec("create table t(a int, index index_a(a))")
499-
require.NoError(t, statsHandle.HandleDDLEvent(<-statsHandle.DDLEventCh()))
500+
err := util.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error {
501+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
502+
return statsHandle.HandleDDLEvent(ctx, sctx, <-statsHandle.DDLEventCh())
503+
}, util.FlagWrapTxn)
504+
require.NoError(t, err)
500505
is := tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema)
501506
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
502507
require.NoError(t, err)
@@ -2719,7 +2724,11 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {
27192724
tk.MustExec("create table t(a int)")
27202725
analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a")
27212726
h := dom.StatsHandle()
2722-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
2727+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
2728+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
2729+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
2730+
}, util.FlagWrapTxn)
2731+
require.NoError(t, err)
27232732
tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
27242733
require.NoError(t, err)
27252734
tid := tbl.Meta().ID
@@ -2836,7 +2845,11 @@ func TestAnalyzeMVIndex(t *testing.T) {
28362845
"index ij_binary((cast(j->'$.bin' as binary(50) array)))," +
28372846
"index ij_char((cast(j->'$.char' as char(50) array)))" +
28382847
")")
2839-
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
2848+
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
2849+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
2850+
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
2851+
}, util.FlagWrapTxn)
2852+
require.NoError(t, err)
28402853
jsonData := []map[string]any{
28412854
{
28422855
"signed": []int64{1, 2, 300, 300, 0, 4, 5, -40000},

pkg/planner/cardinality/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ go_test(
8080
"//pkg/sessionctx/stmtctx",
8181
"//pkg/sessionctx/variable",
8282
"//pkg/statistics",
83+
"//pkg/statistics/handle/util",
8384
"//pkg/testkit",
8485
"//pkg/testkit/testdata",
8586
"//pkg/testkit/testmain",

0 commit comments

Comments
 (0)