Skip to content

Commit 5d990c6

Browse files
authored
ddl: sync schema version using watch, notify sessions on owner node by job id (#53217)
ref #53246
1 parent 1c4a9c6 commit 5d990c6

File tree

8 files changed

+514
-50
lines changed

8 files changed

+514
-50
lines changed

pkg/ddl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ go_library(
143143
"//pkg/util/engine",
144144
"//pkg/util/filter",
145145
"//pkg/util/gcutil",
146+
"//pkg/util/generic",
146147
"//pkg/util/hack",
147148
"//pkg/util/intest",
148149
"//pkg/util/logutil",

pkg/ddl/ddl.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ import (
6464
"github.com/pingcap/tidb/pkg/util/dbterror"
6565
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
6666
"github.com/pingcap/tidb/pkg/util/gcutil"
67+
"github.com/pingcap/tidb/pkg/util/generic"
6768
"github.com/pingcap/tidb/pkg/util/syncutil"
6869
"github.com/tikv/client-go/v2/tikvrpc"
6970
clientv3 "go.etcd.io/etcd/client/v3"
@@ -366,15 +367,17 @@ type ddlCtx struct {
366367
ownerManager owner.Manager
367368
schemaSyncer syncer.SchemaSyncer
368369
stateSyncer syncer.StateSyncer
369-
ddlJobDoneCh chan struct{}
370-
ddlEventCh chan<- *statsutil.DDLEvent
371-
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
372-
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
373-
infoCache *infoschema.InfoCache
374-
statsHandle *handle.Handle
375-
tableLockCkr util.DeadTableLockChecker
376-
etcdCli *clientv3.Client
377-
autoidCli *autoid.ClientDiscover
370+
// ddlJobDoneChMap is used to notify the session that the DDL job is finished.
371+
// jobID -> chan struct{}
372+
ddlJobDoneChMap generic.SyncMap[int64, chan struct{}]
373+
ddlEventCh chan<- *statsutil.DDLEvent
374+
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
375+
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
376+
infoCache *infoschema.InfoCache
377+
statsHandle *handle.Handle
378+
tableLockCkr util.DeadTableLockChecker
379+
etcdCli *clientv3.Client
380+
autoidCli *autoid.ClientDiscover
378381

379382
*waitSchemaSyncedController
380383
*schemaVersionManager
@@ -618,6 +621,27 @@ func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
618621
rc.notifyJobState(job.State)
619622
}
620623

624+
func (dc *ddlCtx) initJobDoneCh(jobID int64) {
625+
dc.ddlJobDoneChMap.Store(jobID, make(chan struct{}, 1))
626+
}
627+
628+
func (dc *ddlCtx) getJobDoneCh(jobID int64) (chan struct{}, bool) {
629+
return dc.ddlJobDoneChMap.Load(jobID)
630+
}
631+
632+
func (dc *ddlCtx) delJobDoneCh(jobID int64) {
633+
dc.ddlJobDoneChMap.Delete(jobID)
634+
}
635+
636+
func (dc *ddlCtx) notifyJobDone(jobID int64) {
637+
if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
638+
select {
639+
case ch <- struct{}{}:
640+
default:
641+
}
642+
}
643+
}
644+
621645
// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
622646
func EnableTiFlashPoll(d any) {
623647
if dd, ok := d.(*ddl); ok {
@@ -711,7 +735,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
711735
uuid: id,
712736
store: opt.Store,
713737
lease: opt.Lease,
714-
ddlJobDoneCh: make(chan struct{}, 1),
738+
ddlJobDoneChMap: generic.NewSyncMap[int64, chan struct{}](10),
715739
ownerManager: manager,
716740
schemaSyncer: schemaSyncer,
717741
stateSyncer: stateSyncer,
@@ -811,6 +835,9 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
811835
})
812836
d.wg.Run(d.startDispatchLoop)
813837
d.wg.Run(d.startLocalWorkerLoop)
838+
d.wg.Run(func() {
839+
d.schemaSyncer.SyncJobSchemaVerLoop(d.ctx)
840+
})
814841
}
815842

816843
// Start implements DDL.Start interface.
@@ -1169,6 +1196,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
11691196

11701197
// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
11711198
err := <-task.errChs[0]
1199+
defer d.delJobDoneCh(job.ID)
11721200
if err != nil {
11731201
// The transaction of enqueuing job is failed.
11741202
return errors.Trace(err)
@@ -1205,13 +1233,14 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
12051233
recordLastDDLInfo(ctx, historyJob)
12061234
}()
12071235
i := 0
1236+
notifyCh, _ := d.getJobDoneCh(job.ID)
12081237
for {
12091238
failpoint.Inject("storeCloseInLoop", func(_ failpoint.Value) {
12101239
_ = d.Stop()
12111240
})
12121241

12131242
select {
1214-
case <-d.ddlJobDoneCh:
1243+
case <-notifyCh:
12151244
case <-ticker.C:
12161245
i++
12171246
ticker = updateTickerInterval(ticker, 10*d.lease, job, i)

pkg/ddl/ddl_worker.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,9 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
494494

495495
jobTasks = append(jobTasks, job)
496496
injectModifyJobArgFailPoint(job)
497+
if !job.LocalMode {
498+
d.initJobDoneCh(job.ID)
499+
}
497500
}
498501

499502
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
@@ -881,7 +884,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
881884
return err
882885
}
883886
CleanupDDLReorgHandles(job, w.sess)
884-
asyncNotify(d.ddlJobDoneCh)
887+
d.notifyJobDone(job.ID)
885888
return nil
886889
}
887890

pkg/ddl/mock.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, jobID int6
146146
}
147147
}
148148

149+
// SyncJobSchemaVerLoop implements SchemaSyncer.SyncJobSchemaVerLoop interface.
150+
func (*MockSchemaSyncer) SyncJobSchemaVerLoop(context.Context) {
151+
}
152+
149153
// Close implements SchemaSyncer.Close interface.
150154
func (*MockSchemaSyncer) Close() {}
151155

pkg/ddl/syncer/BUILD.bazel

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"//pkg/metrics",
1616
"//pkg/sessionctx/variable",
1717
"//pkg/util",
18+
"//pkg/util/disttask",
1819
"@com_github_pingcap_errors//:errors",
1920
"@com_github_pingcap_failpoint//:failpoint",
2021
"@io_etcd_go_etcd_api_v3//mvccpb",
@@ -30,20 +31,23 @@ go_test(
3031
timeout = "short",
3132
srcs = [
3233
"state_syncer_test.go",
34+
"syncer_nokit_test.go",
3335
"syncer_test.go",
3436
],
37+
embed = [":syncer"],
3538
flaky = True,
36-
shard_count = 3,
39+
shard_count = 6,
3740
deps = [
38-
":syncer",
3941
"//pkg/ddl",
4042
"//pkg/ddl/util",
43+
"//pkg/domain/infosync",
4144
"//pkg/infoschema",
4245
"//pkg/parser/terror",
4346
"//pkg/sessionctx/variable",
4447
"//pkg/store/mockstore",
4548
"//pkg/util",
4649
"@com_github_pingcap_errors//:errors",
50+
"@com_github_pingcap_failpoint//:failpoint",
4751
"@com_github_stretchr_testify//require",
4852
"@io_etcd_go_etcd_api_v3//mvccpb",
4953
"@io_etcd_go_etcd_client_v3//:client",

0 commit comments

Comments
 (0)