Skip to content

Commit 976e4f2

Browse files
tangentati-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#50076
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 19843ce commit 976e4f2

File tree

3 files changed

+69
-36
lines changed

3 files changed

+69
-36
lines changed

ddl/ddl.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,16 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
769769
d.wg.Run(d.PollTiFlashRoutine)
770770

771771
ingest.InitGlobalLightningEnv()
772+
<<<<<<< HEAD:ddl/ddl.go
773+
=======
774+
d.ownerManager.SetRetireOwnerHook(func() {
775+
// Since this instance is not DDL owner anymore, we clean up the processing job info.
776+
if ingest.LitBackCtxMgr != nil {
777+
ingest.LitBackCtxMgr.MarkJobFinish()
778+
}
779+
d.runningJobs = newRunningJobs()
780+
})
781+
>>>>>>> bc841979a53 (ddl: fix unstable test TestCreateDropCreateTable (#50076)):pkg/ddl/ddl.go
772782

773783
return nil
774784
}

ddl/ddl_running_jobs.go

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,57 +28,96 @@ import (
2828

2929
type runningJobs struct {
3030
sync.RWMutex
31-
ids map[int64]struct{}
32-
runningSchema map[string]map[string]struct{} // database -> table -> struct{}
33-
runningJobIDs string
31+
// processingIDs records the IDs of the jobs that are being processed by a worker.
32+
processingIDs map[int64]struct{}
33+
processingIDsStr string
34+
35+
// unfinishedIDs records the IDs of the jobs that are not finished yet.
36+
// It is not necessarily being processed by a worker.
37+
unfinishedIDs map[int64]struct{}
38+
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
3439
}
3540

3641
func newRunningJobs() *runningJobs {
3742
return &runningJobs{
38-
ids: make(map[int64]struct{}),
39-
runningSchema: make(map[string]map[string]struct{}),
43+
processingIDs: make(map[int64]struct{}),
44+
unfinishedSchema: make(map[string]map[string]struct{}),
45+
unfinishedIDs: make(map[int64]struct{}),
4046
}
4147
}
4248

4349
func (j *runningJobs) add(job *model.Job) {
4450
j.Lock()
4551
defer j.Unlock()
46-
j.ids[job.ID] = struct{}{}
52+
j.processingIDs[job.ID] = struct{}{}
4753
j.updateInternalRunningJobIDs()
54+
55+
if _, ok := j.unfinishedIDs[job.ID]; ok {
56+
// Already exists, no need to add it again.
57+
return
58+
}
59+
j.unfinishedIDs[job.ID] = struct{}{}
4860
for _, info := range job.GetInvolvingSchemaInfo() {
49-
if _, ok := j.runningSchema[info.Database]; !ok {
50-
j.runningSchema[info.Database] = make(map[string]struct{})
61+
if _, ok := j.unfinishedSchema[info.Database]; !ok {
62+
j.unfinishedSchema[info.Database] = make(map[string]struct{})
5163
}
52-
j.runningSchema[info.Database][info.Table] = struct{}{}
64+
j.unfinishedSchema[info.Database][info.Table] = struct{}{}
5365
}
5466
}
5567

5668
func (j *runningJobs) remove(job *model.Job) {
5769
j.Lock()
5870
defer j.Unlock()
59-
delete(j.ids, job.ID)
71+
delete(j.processingIDs, job.ID)
6072
j.updateInternalRunningJobIDs()
61-
for _, info := range job.GetInvolvingSchemaInfo() {
62-
if db, ok := j.runningSchema[info.Database]; ok {
63-
delete(db, info.Table)
73+
74+
if job.IsFinished() || job.IsSynced() {
75+
delete(j.unfinishedIDs, job.ID)
76+
for _, info := range job.GetInvolvingSchemaInfo() {
77+
if db, ok := j.unfinishedSchema[info.Database]; ok {
78+
delete(db, info.Table)
79+
}
80+
if len(j.unfinishedSchema[info.Database]) == 0 {
81+
delete(j.unfinishedSchema, info.Database)
82+
}
6483
}
65-
if len(j.runningSchema[info.Database]) == 0 {
66-
delete(j.runningSchema, info.Database)
84+
}
85+
}
86+
87+
func (j *runningJobs) allIDs() string {
88+
j.RLock()
89+
defer j.RUnlock()
90+
return j.processingIDsStr
91+
}
92+
93+
func (j *runningJobs) updateInternalRunningJobIDs() {
94+
var sb strings.Builder
95+
i := 0
96+
for id := range j.processingIDs {
97+
sb.WriteString(strconv.Itoa(int(id)))
98+
if i != len(j.processingIDs)-1 {
99+
sb.WriteString(",")
67100
}
101+
i++
68102
}
103+
j.processingIDsStr = sb.String()
69104
}
70105

71106
func (j *runningJobs) checkRunnable(job *model.Job) bool {
72107
j.RLock()
73108
defer j.RUnlock()
109+
if _, ok := j.processingIDs[job.ID]; ok {
110+
// Already processing by a worker. Skip running it again.
111+
return false
112+
}
74113
for _, info := range job.GetInvolvingSchemaInfo() {
75-
if _, ok := j.runningSchema[model.InvolvingAll]; ok {
114+
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
76115
return false
77116
}
78117
if info.Database == model.InvolvingNone {
79118
continue
80119
}
81-
if tbls, ok := j.runningSchema[info.Database]; ok {
120+
if tbls, ok := j.unfinishedSchema[info.Database]; ok {
82121
if _, ok := tbls[model.InvolvingAll]; ok {
83122
return false
84123
}
@@ -92,22 +131,3 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
92131
}
93132
return true
94133
}
95-
96-
func (j *runningJobs) allIDs() string {
97-
j.RLock()
98-
defer j.RUnlock()
99-
return j.runningJobIDs
100-
}
101-
102-
func (j *runningJobs) updateInternalRunningJobIDs() {
103-
var sb strings.Builder
104-
i := 0
105-
for id := range j.ids {
106-
sb.WriteString(strconv.Itoa(int(id)))
107-
if i != len(j.ids)-1 {
108-
sb.WriteString(",")
109-
}
110-
i++
111-
}
112-
j.runningJobIDs = sb.String()
113-
}

ddl/ddl_running_jobs_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,16 +95,19 @@ func TestRunningJobs(t *testing.T) {
9595
runnable = j.checkRunnable(mkJob(0, "db100.t100"))
9696
require.False(t, runnable)
9797

98+
job5.State = model.JobStateDone
9899
j.remove(job5)
99100
require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs()))
100101
runnable = j.checkRunnable(mkJob(0, "db100.t100"))
101102
require.True(t, runnable)
102103

104+
job3.State = model.JobStateDone
103105
j.remove(job3)
104106
require.Equal(t, "1,2,4", orderedAllIDs(j.allIDs()))
105107
runnable = j.checkRunnable(mkJob(0, "db1.t100"))
106108
require.True(t, runnable)
107109

110+
job1.State = model.JobStateDone
108111
j.remove(job1)
109112
require.Equal(t, "2,4", orderedAllIDs(j.allIDs()))
110113
runnable = j.checkRunnable(mkJob(0, "db1.t1"))

0 commit comments

Comments
 (0)