Skip to content

Commit ba5823b

Browse files
authored
*: move ddl notifier as domain member and test pub/sub (#56776)
ref #55722
1 parent 64ecfa1 commit ba5823b

File tree

19 files changed

+216
-169
lines changed

19 files changed

+216
-169
lines changed

pkg/ddl/constant.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ const (
120120
key idx_task_key(task_key),
121121
key idx_state_update_time(state_update_time))`
122122

123+
// NotifierTableName is `tidb_ddl_notifier`.
124+
NotifierTableName = "tidb_ddl_notifier"
125+
123126
// NotifierTableSQL is the CREATE TABLE SQL of `tidb_ddl_notifier`.
124-
NotifierTableSQL = `CREATE TABLE tidb_ddl_notifier (
127+
NotifierTableSQL = `CREATE TABLE ` + NotifierTableName + ` (
125128
ddl_job_id BIGINT,
126129
sub_job_id BIGINT COMMENT '-1 if the schema change does not belong to a multi-schema change DDL or a merged DDL. 0 or positive numbers representing the sub-job index of a multi-schema change DDL or a merged DDL',
127130
schema_change LONGBLOB COMMENT 'SchemaChangeEvent at rest',

pkg/ddl/ddl.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ type ddl struct {
262262
enableTiFlashPoll *atomicutil.Bool
263263
sysTblMgr systable.Manager
264264
minJobIDRefresher *systable.MinJobIDRefresher
265+
eventPublishStore notifier.Store
265266

266267
executor *executor
267268
jobSubmitter *JobSubmitter
@@ -572,14 +573,35 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo
572573
return nil
573574
}
574575

575-
if intest.InTest && notifier.DefaultStore != nil {
576+
ch := jobCtx.oldDDLCtx.ddlEventCh
577+
if ch != nil {
578+
forLoop:
579+
for i := 0; i < 10; i++ {
580+
select {
581+
case ch <- e:
582+
break forLoop
583+
default:
584+
time.Sleep(time.Microsecond * 10)
585+
}
586+
}
587+
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
588+
}
589+
590+
if intest.InTest && jobCtx.eventPublishStore != nil {
576591
failpoint.Inject("asyncNotifyEventError", func() {
577592
failpoint.Return(errors.New("mock publish event error"))
578593
})
579594
if subJobID == noSubJob && job.MultiSchemaInfo != nil {
580595
subJobID = int64(job.MultiSchemaInfo.Seq)
581596
}
582-
err := notifier.PubSchemaChange(jobCtx.ctx, sctx, job.ID, subJobID, e)
597+
err := notifier.PubSchemeChangeToStore(
598+
jobCtx.stepCtx,
599+
sctx,
600+
job.ID,
601+
subJobID,
602+
e,
603+
jobCtx.eventPublishStore,
604+
)
583605
if err != nil {
584606
logutil.DDLLogger().Error("Error publish schema change event",
585607
zap.Int64("jobID", job.ID),
@@ -589,19 +611,6 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo
589611
}
590612
return nil
591613
}
592-
593-
ch := jobCtx.oldDDLCtx.ddlEventCh
594-
if ch != nil {
595-
for i := 0; i < 10; i++ {
596-
select {
597-
case ch <- e:
598-
return nil
599-
default:
600-
time.Sleep(time.Microsecond * 10)
601-
}
602-
}
603-
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
604-
}
605614
return nil
606615
}
607616

@@ -665,6 +674,7 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
665674
d := &ddl{
666675
ddlCtx: ddlCtx,
667676
enableTiFlashPoll: atomicutil.NewBool(true),
677+
eventPublishStore: opt.EventPublishStore,
668678
}
669679

670680
taskexecutor.RegisterTaskType(proto.Backfill,

pkg/ddl/index.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,14 +1095,17 @@ SwitchIndexState:
10951095
}
10961096
job.FillFinishedArgs(a)
10971097

1098+
addIndexEvent := notifier.NewAddIndexEvent(tblInfo, allIndexInfos)
1099+
err2 := asyncNotifyEvent(jobCtx, addIndexEvent, job, noSubJob, w.sess)
1100+
if err2 != nil {
1101+
return ver, errors.Trace(err2)
1102+
}
1103+
10981104
// Finish this job.
10991105
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
11001106
if !job.ReorgMeta.IsDistReorg && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
11011107
ingest.LitBackCtxMgr.Unregister(job.ID)
11021108
}
1103-
// TODO: store this event to the notifier.
1104-
// For now, it is not used and just for placeholder.
1105-
_ = notifier.NewAddIndexEvent(tblInfo, allIndexInfos)
11061109
logutil.DDLLogger().Info("run add index job done",
11071110
zap.String("charset", job.Charset),
11081111
zap.String("collation", job.Collate))

pkg/ddl/job_scheduler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/pingcap/kvproto/pkg/kvrpcpb"
3232
"github.com/pingcap/tidb/pkg/ddl/ingest"
3333
"github.com/pingcap/tidb/pkg/ddl/logutil"
34+
"github.com/pingcap/tidb/pkg/ddl/notifier"
3435
"github.com/pingcap/tidb/pkg/ddl/schemaver"
3536
"github.com/pingcap/tidb/pkg/ddl/serverstate"
3637
sess "github.com/pingcap/tidb/pkg/ddl/session"
@@ -108,6 +109,7 @@ func (l *ownerListener) OnBecomeOwner() {
108109
unSyncedTracker: newUnSyncedJobTracker(),
109110
schemaVerMgr: newSchemaVersionManager(l.ddl.store),
110111
schemaVerSyncer: l.ddl.schemaVerSyncer,
112+
eventPublishStore: l.ddl.eventPublishStore,
111113

112114
ddlCtx: l.ddl.ddlCtx,
113115
ddlJobNotifyCh: l.jobSubmitter.ddlJobNotifyCh,
@@ -141,6 +143,7 @@ type jobScheduler struct {
141143
unSyncedTracker *unSyncedJobTracker
142144
schemaVerMgr *schemaVersionManager
143145
schemaVerSyncer schemaver.Syncer
146+
eventPublishStore notifier.Store
144147

145148
// those fields are created or initialized on start
146149
reorgWorkerPool *workerPool
@@ -534,6 +537,7 @@ func (s *jobScheduler) getJobRunCtx(jobID int64, traceInfo *model.TraceInfo) *jo
534537
autoidCli: s.autoidCli,
535538
store: s.store,
536539
schemaVerSyncer: s.schemaVerSyncer,
540+
eventPublishStore: s.eventPublishStore,
537541

538542
notifyCh: ch,
539543
logger: tidblogutil.LoggerWithTraceInfo(

pkg/ddl/job_worker.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/failpoint"
2828
"github.com/pingcap/kvproto/pkg/kvrpcpb"
2929
"github.com/pingcap/tidb/pkg/ddl/logutil"
30+
"github.com/pingcap/tidb/pkg/ddl/notifier"
3031
"github.com/pingcap/tidb/pkg/ddl/schemaver"
3132
sess "github.com/pingcap/tidb/pkg/ddl/session"
3233
"github.com/pingcap/tidb/pkg/ddl/systable"
@@ -81,11 +82,12 @@ type jobContext struct {
8182
*schemaVersionManager
8283
// ctx is the context of job scheduler. When worker is running the job, it should
8384
// use stepCtx instead.
84-
ctx context.Context
85-
infoCache *infoschema.InfoCache
86-
autoidCli *autoid.ClientDiscover
87-
store kv.Storage
88-
schemaVerSyncer schemaver.Syncer
85+
ctx context.Context
86+
infoCache *infoschema.InfoCache
87+
autoidCli *autoid.ClientDiscover
88+
store kv.Storage
89+
schemaVerSyncer schemaver.Syncer
90+
eventPublishStore notifier.Store
8991

9092
// per job fields, they are not changed in the life cycle of this context.
9193

pkg/ddl/notifier/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"//pkg/ddl/session",
1515
"//pkg/kv",
1616
"//pkg/meta/model",
17+
"//pkg/session/types",
1718
"//pkg/sessionctx",
1819
"//pkg/util/intest",
1920
"//pkg/util/logutil",
@@ -33,6 +34,7 @@ go_test(
3334
flaky = True,
3435
shard_count = 6,
3536
deps = [
37+
"//pkg/ddl",
3638
"//pkg/ddl/session",
3739
"//pkg/meta/model",
3840
"//pkg/parser/model",

pkg/ddl/notifier/publish.go

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,11 @@ import (
2020
sess "github.com/pingcap/tidb/pkg/ddl/session"
2121
)
2222

23-
// PubSchemaChange publishes schema changes to the cluster to notify other
24-
// components. It stages changes in given `se` so they will be visible when `se`
25-
// further commits. When the schema change is not from multi-schema change DDL,
26-
// `multiSchemaChangeSeq` is -1. Otherwise, `multiSchemaChangeSeq` is the sub-job
27-
// index of the multi-schema change DDL.
28-
func PubSchemaChange(
29-
ctx context.Context,
30-
se *sess.Session,
31-
ddlJobID int64,
32-
multiSchemaChangeSeq int64,
33-
event *SchemaChangeEvent,
34-
) error {
35-
return PubSchemeChangeToStore(
36-
ctx,
37-
se,
38-
ddlJobID,
39-
multiSchemaChangeSeq,
40-
event,
41-
DefaultStore,
42-
)
43-
}
44-
45-
// PubSchemeChangeToStore is exposed for testing. Caller should use
46-
// PubSchemaChange instead.
23+
// PubSchemeChangeToStore publishes schema changes to the store to notify
24+
// subscribers on the Store. It stages changes in given `se` so they will be
25+
// visible when `se` further commits. When the schema change is not from
26+
// multi-schema change DDL, `multiSchemaChangeSeq` is -1. Otherwise,
27+
// `multiSchemaChangeSeq` is the sub-job index of the multi-schema change DDL.
4728
func PubSchemeChangeToStore(
4829
ctx context.Context,
4930
se *sess.Session,

pkg/ddl/notifier/store.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,6 @@ type Store interface {
3737
List(ctx context.Context, se *sess.Session) ([]*schemaChange, error)
3838
}
3939

40-
// DefaultStore is the system table store. Still WIP now.
41-
var DefaultStore Store
42-
4340
type tableStore struct {
4441
db string
4542
table string

0 commit comments

Comments
 (0)