@@ -185,51 +185,47 @@ func (s *jobScheduler) close() {
185
185
}
186
186
187
187
// getJob reads tidb_ddl_job and returns the first runnable DDL job.
188
- func (s * jobScheduler ) getJob (se * sess.Session , tp jobType ) (* model.Job , error ) {
188
+ func (s * jobScheduler ) getJob (se * sess.Session ) (* model.Job , bool , error ) {
189
189
defer s .runningJobs .resetAllPending ()
190
190
191
- not := "not"
192
- label := "get_job_general"
193
- if tp == jobTypeReorg {
194
- not = ""
195
- label = "get_job_reorg"
196
- }
197
- const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
191
+ const getJobSQL = `select job_meta, processing, reorg from mysql.tidb_ddl_job where job_id in
198
192
(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
199
- and %s reorg %s order by processing desc, job_id`
193
+ %s order by processing desc, job_id`
200
194
var excludedJobIDs string
201
195
if ids := s .runningJobs .allIDs (); len (ids ) > 0 {
202
196
excludedJobIDs = fmt .Sprintf ("and job_id not in (%s)" , ids )
203
197
}
204
- sql := fmt .Sprintf (getJobSQL , not , excludedJobIDs )
205
- rows , err := se .Execute (context .Background (), sql , label )
198
+ sql := fmt .Sprintf (getJobSQL , excludedJobIDs )
199
+ rows , err := se .Execute (context .Background (), sql , "get_job" )
206
200
if err != nil {
207
- return nil , errors .Trace (err )
201
+ return nil , false , errors .Trace (err )
208
202
}
209
203
for _ , row := range rows {
210
204
jobBinary := row .GetBytes (0 )
211
205
isJobProcessing := row .GetInt64 (1 ) == 1
206
+ isReorg := row .GetInt64 (2 ) != 0
212
207
213
208
job := model.Job {}
214
209
err = job .Decode (jobBinary )
215
210
if err != nil {
216
- return nil , errors .Trace (err )
211
+ return nil , isReorg , errors .Trace (err )
217
212
}
218
213
214
+ involving := job .GetInvolvingSchemaInfo ()
219
215
isRunnable , err := s .processJobDuringUpgrade (se , & job )
220
216
if err != nil {
221
- return nil , errors .Trace (err )
217
+ return nil , isReorg , errors .Trace (err )
222
218
}
223
219
if ! isRunnable {
220
+ s .runningJobs .addPending (involving )
224
221
continue
225
222
}
226
223
227
224
// The job has already been picked up, just return to continue it.
228
225
if isJobProcessing {
229
- return & job , nil
226
+ return & job , isReorg , nil
230
227
}
231
228
232
- involving := job .GetInvolvingSchemaInfo ()
233
229
if ! s .runningJobs .checkRunnable (job .ID , involving ) {
234
230
s .runningJobs .addPending (involving )
235
231
continue
@@ -241,11 +237,11 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
241
237
zap .Error (err ),
242
238
zap .Stringer ("job" , & job ))
243
239
s .runningJobs .addPending (involving )
244
- return nil , errors .Trace (err )
240
+ return nil , isReorg , errors .Trace (err )
245
241
}
246
- return & job , nil
242
+ return & job , isReorg , nil
247
243
}
248
- return nil , nil
244
+ return nil , false , nil
249
245
}
250
246
251
247
func hasSysDB (job * model.Job ) bool {
@@ -394,8 +390,7 @@ func (s *jobScheduler) startDispatch() error {
394
390
continue
395
391
}
396
392
failpoint .InjectCall ("beforeAllLoadDDLJobAndRun" )
397
- s .loadDDLJobAndRun (se , s .generalDDLWorkerPool , jobTypeGeneral )
398
- s .loadDDLJobAndRun (se , s .reorgWorkerPool , jobTypeReorg )
393
+ s .loadDDLJobAndRun (se )
399
394
}
400
395
}
401
396
@@ -436,30 +431,32 @@ func (s *jobScheduler) checkAndUpdateClusterState(needUpdate bool) error {
436
431
return nil
437
432
}
438
433
439
- func (s * jobScheduler ) loadDDLJobAndRun (se * sess.Session , pool * workerPool , tp jobType ) {
440
- wk , err := pool .get ()
441
- if err != nil || wk == nil {
442
- logutil .DDLLogger ().Debug (fmt .Sprintf ("[ddl] no %v worker available now" , pool .tp ()), zap .Error (err ))
443
- return
444
- }
445
-
434
+ func (s * jobScheduler ) loadDDLJobAndRun (se * sess.Session ) {
446
435
s .mu .RLock ()
447
- s .mu .hook .OnGetJobBefore (pool . tp (). String () )
436
+ s .mu .hook .OnGetJobBefore ()
448
437
s .mu .RUnlock ()
449
438
450
439
startTime := time .Now ()
451
- job , err := s .getJob (se , tp )
440
+ job , isReorg , err := s .getJob (se )
452
441
if job == nil || err != nil {
453
442
if err != nil {
454
- wk . jobLogger ( job ).Warn ("get job met error" , zap .Duration ("take time" , time .Since (startTime )), zap .Error (err ))
443
+ logutil . DDLLogger ( ).Warn ("get job met error" , zap .Duration ("take time" , time .Since (startTime )), zap .Error (err ))
455
444
}
456
- pool .put (wk )
457
445
return
458
446
}
459
447
s .mu .RLock ()
460
- s .mu .hook .OnGetJobAfter (pool . tp (). String (), job )
448
+ s .mu .hook .OnGetJobAfter (job )
461
449
s .mu .RUnlock ()
462
450
451
+ pool := s .generalDDLWorkerPool
452
+ if isReorg {
453
+ pool = s .reorgWorkerPool
454
+ }
455
+ wk , err := pool .get ()
456
+ if err != nil || wk == nil {
457
+ logutil .DDLLogger ().Debug (fmt .Sprintf ("[ddl] no %v worker available now" , pool .tp ()), zap .Error (err ))
458
+ return
459
+ }
463
460
s .delivery2Worker (wk , pool , job )
464
461
}
465
462
@@ -526,10 +523,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.
526
523
jobID , involvedSchemaInfos := job .ID , job .GetInvolvingSchemaInfo ()
527
524
s .runningJobs .addRunning (jobID , involvedSchemaInfos )
528
525
metrics .DDLRunningJobCount .WithLabelValues (pool .tp ().String ()).Inc ()
529
- s .wg .RunWithLog (func () {
526
+ s .wg .Run (func () {
530
527
defer func () {
528
+ r := recover ()
529
+ if r != nil {
530
+ logutil .DDLLogger ().Error ("panic in delivery2Worker" , zap .Any ("recover" , r ), zap .Stack ("stack" ))
531
+ }
531
532
failpoint .InjectCall ("afterDelivery2Worker" , job )
532
- s .runningJobs .removeRunning (jobID , involvedSchemaInfos )
533
+ // Because there is a gap between `allIDs()` and `checkRunnable()`,
534
+ // we append unfinished job to pending atomically to prevent `getJob()`
535
+ // chosing another runnable job that involves the same schema object.
536
+ moveRunningJobsToPending := r != nil || (job != nil && ! job .IsFinished ())
537
+ s .runningJobs .finishOrPendJob (jobID , involvedSchemaInfos , moveRunningJobsToPending )
533
538
asyncNotify (s .ddlJobNotifyCh )
534
539
metrics .DDLRunningJobCount .WithLabelValues (pool .tp ().String ()).Dec ()
535
540
pool .put (wk )
0 commit comments