diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 6ecdc4b8c7925..799ad69bcc94b 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -87,7 +87,7 @@ const ( batchAddingJobs = 10 reorgWorkerCnt = 10 - generalWorkerCnt = 1 + generalWorkerCnt = 10 localWorkerCnt = 10 // checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list. diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go index d24ff7fa84548..f13ee74fc09d4 100644 --- a/pkg/ddl/ddl_api.go +++ b/pkg/ddl/ddl_api.go @@ -646,6 +646,10 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (er Args: []any{fkCheck}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, + InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ + Database: old.Name.L, + Table: model.InvolvingAll, + }}, } err = d.DoDDLJob(ctx, job) @@ -2796,6 +2800,21 @@ func (d *ddl) createTableWithInfoJob( return nil, err } + var involvedSchemaInfos []model.InvolvingSchemaInfo + if len(tbInfo.ForeignKeys) > 0 { + involvedSchemaInfos = make([]model.InvolvingSchemaInfo, 0, 1+len(tbInfo.ForeignKeys)) + involvedSchemaInfos = append(involvedSchemaInfos, model.InvolvingSchemaInfo{ + Database: schema.Name.L, + Table: tbInfo.Name.L, + }) + for _, fk := range tbInfo.ForeignKeys { + involvedSchemaInfos = append(involvedSchemaInfos, model.InvolvingSchemaInfo{ + Database: fk.RefSchema.L, + Table: fk.RefTable.L, + }) + } + } + var actionType model.ActionType args := []any{tbInfo} switch { @@ -2810,15 +2829,16 @@ func (d *ddl) createTableWithInfoJob( } job = &model.Job{ - SchemaID: schema.ID, - TableID: tbInfo.ID, - SchemaName: schema.Name.L, - TableName: tbInfo.Name.L, - Type: actionType, - BinlogInfo: &model.HistoryInfo{}, - Args: args, - CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - SQLMode: ctx.GetSessionVars().SQLMode, + SchemaID: schema.ID, + TableID: tbInfo.ID, + SchemaName: schema.Name.L, + TableName: tbInfo.Name.L, + Type: actionType, + BinlogInfo: &model.HistoryInfo{}, + Args: args, + CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, + SQLMode: ctx.GetSessionVars().SQLMode, + InvolvingSchemaInfo: involvedSchemaInfos, } return job, nil } diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go index 9a984a51dbe14..5eff9e253d89d 100644 --- a/pkg/ddl/ddl_running_jobs.go +++ b/pkg/ddl/ddl_running_jobs.go @@ -126,6 +126,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool { if _, ok := tbls[model.InvolvingAll]; ok { return false } + if info.Table == model.InvolvingAll { + return false + } if info.Table == model.InvolvingNone { continue } diff --git a/pkg/ddl/ddl_running_jobs_test.go b/pkg/ddl/ddl_running_jobs_test.go index d4a2210e042d4..fdae3c8e97ebe 100644 --- a/pkg/ddl/ddl_running_jobs_test.go +++ b/pkg/ddl/ddl_running_jobs_test.go @@ -68,6 +68,7 @@ func TestRunningJobs(t *testing.T) { job2 := mkJob(2, "db2.t3") j.add(job1.ID, job1.GetInvolvingSchemaInfo()) j.add(job2.ID, job2.GetInvolvingSchemaInfo()) + require.False(t, j.checkRunnable(mkJob(0, "db1.*"))) require.Equal(t, "1,2", orderedAllIDs(j.allIDs())) runnable = j.checkRunnable(mkJob(0, "db1.t1")) require.False(t, runnable)