Skip to content

Commit 0efbfd1

Browse files
committed
opt notify job, ut
1 parent 1de5d27 commit 0efbfd1

File tree

4 files changed

+94
-13
lines changed

4 files changed

+94
-13
lines changed

pkg/ddl/ddl.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ import (
6464
"github.com/pingcap/tidb/pkg/util/dbterror"
6565
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
6666
"github.com/pingcap/tidb/pkg/util/gcutil"
67+
"github.com/pingcap/tidb/pkg/util/generic"
6768
"github.com/pingcap/tidb/pkg/util/syncutil"
6869
"github.com/tikv/client-go/v2/tikvrpc"
6970
clientv3 "go.etcd.io/etcd/client/v3"
@@ -366,15 +367,17 @@ type ddlCtx struct {
366367
ownerManager owner.Manager
367368
schemaSyncer syncer.SchemaSyncer
368369
stateSyncer syncer.StateSyncer
369-
ddlJobDoneCh chan struct{}
370-
ddlEventCh chan<- *statsutil.DDLEvent
371-
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
372-
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
373-
infoCache *infoschema.InfoCache
374-
statsHandle *handle.Handle
375-
tableLockCkr util.DeadTableLockChecker
376-
etcdCli *clientv3.Client
377-
autoidCli *autoid.ClientDiscover
370+
// ddlJobDoneChMap is used to notify the session that the DDL job is finished.
371+
// jobID -> chan struct{}
372+
ddlJobDoneChMap generic.SyncMap[int64, chan struct{}]
373+
ddlEventCh chan<- *statsutil.DDLEvent
374+
lease time.Duration // lease is schema lease, default 45s, see config.Lease.
375+
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
376+
infoCache *infoschema.InfoCache
377+
statsHandle *handle.Handle
378+
tableLockCkr util.DeadTableLockChecker
379+
etcdCli *clientv3.Client
380+
autoidCli *autoid.ClientDiscover
378381

379382
*waitSchemaSyncedController
380383
*schemaVersionManager
@@ -618,6 +621,27 @@ func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) {
618621
rc.notifyJobState(job.State)
619622
}
620623

624+
func (dc *ddlCtx) initJobDoneCh(jobID int64) {
625+
dc.ddlJobDoneChMap.Store(jobID, make(chan struct{}, 1))
626+
}
627+
628+
func (dc *ddlCtx) getJobDoneCh(jobID int64) (chan struct{}, bool) {
629+
return dc.ddlJobDoneChMap.Load(jobID)
630+
}
631+
632+
func (dc *ddlCtx) delJobDoneCh(jobID int64) {
633+
dc.ddlJobDoneChMap.Delete(jobID)
634+
}
635+
636+
func (dc *ddlCtx) notifyJobDone(jobID int64) {
637+
if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
638+
select {
639+
case ch <- struct{}{}:
640+
default:
641+
}
642+
}
643+
}
644+
621645
// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
622646
func EnableTiFlashPoll(d any) {
623647
if dd, ok := d.(*ddl); ok {
@@ -711,7 +735,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
711735
uuid: id,
712736
store: opt.Store,
713737
lease: opt.Lease,
714-
ddlJobDoneCh: make(chan struct{}, 1),
738+
ddlJobDoneChMap: generic.NewSyncMap[int64, chan struct{}](10),
715739
ownerManager: manager,
716740
schemaSyncer: schemaSyncer,
717741
stateSyncer: stateSyncer,
@@ -1177,6 +1201,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
11771201

11781202
// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
11791203
err := <-task.errChs[0]
1204+
defer d.delJobDoneCh(job.ID)
11801205
if err != nil {
11811206
// The transaction of enqueuing job is failed.
11821207
return errors.Trace(err)
@@ -1214,13 +1239,18 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
12141239
recordLastDDLInfo(ctx, historyJob)
12151240
}()
12161241
i := 0
1242+
notifyCh, ok := d.getJobDoneCh(job.ID)
1243+
if !ok {
1244+
// shouldn't happen, just give it a dummy one
1245+
notifyCh = make(chan struct{})
1246+
}
12171247
for {
12181248
failpoint.Inject("storeCloseInLoop", func(_ failpoint.Value) {
12191249
_ = d.Stop()
12201250
})
12211251

12221252
select {
1223-
case <-d.ddlJobDoneCh:
1253+
case <-notifyCh:
12241254
case <-ticker.C:
12251255
i++
12261256
ticker = updateTickerInterval(ticker, 10*d.lease, job, i)

pkg/ddl/ddl_worker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,10 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
493493

494494
jobTasks = append(jobTasks, job)
495495
injectModifyJobArgFailPoint(job)
496+
// only need it for non-local mode.
497+
if !tasks[0].job.LocalMode {
498+
d.initJobDoneCh(job.ID)
499+
}
496500
}
497501

498502
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
@@ -860,7 +864,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
860864
return err
861865
}
862866
CleanupDDLReorgHandles(job, w.sess)
863-
asyncNotify(d.ddlJobDoneCh)
867+
d.notifyJobDone(job.ID)
864868
return nil
865869
}
866870

pkg/ddl/syncer/syncer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ func decodeJobVersionEvent(kv *mvccpb.KeyValue, tp mvccpb.Event_EventType, prefi
553553
if err != nil {
554554
return 0, "", 0, false
555555
}
556-
// there is Value in DELETE event, so we need to check it.
556+
// there is no Value in DELETE event, so we need to check it.
557557
if tp == mvccpb.PUT {
558558
schemaVer, err = strconv.ParseInt(string(kv.Value), 10, 64)
559559
if err != nil {

pkg/ddl/syncer/syncer_nokit_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package syncer
16+
17+
import (
18+
"testing"
19+
20+
"github.com/pingcap/tidb/pkg/ddl/util"
21+
"github.com/stretchr/testify/require"
22+
"go.etcd.io/etcd/api/v3/mvccpb"
23+
)
24+
25+
func TestDecodeJobVersionEvent(t *testing.T) {
26+
prefix := util.DDLAllSchemaVersionsByJob + "/"
27+
_, _, _, valid := decodeJobVersionEvent(&mvccpb.KeyValue{Key: []byte(prefix + "1")}, mvccpb.PUT, prefix)
28+
require.False(t, valid)
29+
_, _, _, valid = decodeJobVersionEvent(&mvccpb.KeyValue{Key: []byte(prefix + "a/aa")}, mvccpb.PUT, prefix)
30+
require.False(t, valid)
31+
_, _, _, valid = decodeJobVersionEvent(&mvccpb.KeyValue{
32+
Key: []byte(prefix + "1/aa"), Value: []byte("aa")}, mvccpb.PUT, prefix)
33+
require.False(t, valid)
34+
jobID, tidbID, schemaVer, valid := decodeJobVersionEvent(&mvccpb.KeyValue{
35+
Key: []byte(prefix + "1/aa"), Value: []byte("123")}, mvccpb.PUT, prefix)
36+
require.True(t, valid)
37+
require.EqualValues(t, 1, jobID)
38+
require.EqualValues(t, "aa", tidbID)
39+
require.EqualValues(t, 123, schemaVer)
40+
// value is not used on delete
41+
jobID, tidbID, schemaVer, valid = decodeJobVersionEvent(&mvccpb.KeyValue{
42+
Key: []byte(prefix + "1/aa"), Value: []byte("aaaa")}, mvccpb.DELETE, prefix)
43+
require.True(t, valid)
44+
require.EqualValues(t, 1, jobID)
45+
require.EqualValues(t, "aa", tidbID)
46+
require.EqualValues(t, 0, schemaVer)
47+
}

0 commit comments

Comments
 (0)