Skip to content

Commit 8f56847

Browse files
authored
ddl: enlarge general ddl worker pool (#54041)
ref #53246
1 parent 7b25992 commit 8f56847

File tree

4 files changed

+34
-10
lines changed

4 files changed

+34
-10
lines changed

pkg/ddl/ddl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ const (
8787
batchAddingJobs = 10
8888

8989
reorgWorkerCnt = 10
90-
generalWorkerCnt = 1
90+
generalWorkerCnt = 10
9191
localWorkerCnt = 10
9292

9393
// checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list.

pkg/ddl/ddl_api.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,10 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (er
646646
Args: []any{fkCheck},
647647
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
648648
SQLMode: ctx.GetSessionVars().SQLMode,
649+
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
650+
Database: old.Name.L,
651+
Table: model.InvolvingAll,
652+
}},
649653
}
650654

651655
err = d.DoDDLJob(ctx, job)
@@ -2796,6 +2800,21 @@ func (d *ddl) createTableWithInfoJob(
27962800
return nil, err
27972801
}
27982802

2803+
var involvedSchemaInfos []model.InvolvingSchemaInfo
2804+
if len(tbInfo.ForeignKeys) > 0 {
2805+
involvedSchemaInfos = make([]model.InvolvingSchemaInfo, 0, 1+len(tbInfo.ForeignKeys))
2806+
involvedSchemaInfos = append(involvedSchemaInfos, model.InvolvingSchemaInfo{
2807+
Database: schema.Name.L,
2808+
Table: tbInfo.Name.L,
2809+
})
2810+
for _, fk := range tbInfo.ForeignKeys {
2811+
involvedSchemaInfos = append(involvedSchemaInfos, model.InvolvingSchemaInfo{
2812+
Database: fk.RefSchema.L,
2813+
Table: fk.RefTable.L,
2814+
})
2815+
}
2816+
}
2817+
27992818
var actionType model.ActionType
28002819
args := []any{tbInfo}
28012820
switch {
@@ -2810,15 +2829,16 @@ func (d *ddl) createTableWithInfoJob(
28102829
}
28112830

28122831
job = &model.Job{
2813-
SchemaID: schema.ID,
2814-
TableID: tbInfo.ID,
2815-
SchemaName: schema.Name.L,
2816-
TableName: tbInfo.Name.L,
2817-
Type: actionType,
2818-
BinlogInfo: &model.HistoryInfo{},
2819-
Args: args,
2820-
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
2821-
SQLMode: ctx.GetSessionVars().SQLMode,
2832+
SchemaID: schema.ID,
2833+
TableID: tbInfo.ID,
2834+
SchemaName: schema.Name.L,
2835+
TableName: tbInfo.Name.L,
2836+
Type: actionType,
2837+
BinlogInfo: &model.HistoryInfo{},
2838+
Args: args,
2839+
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
2840+
SQLMode: ctx.GetSessionVars().SQLMode,
2841+
InvolvingSchemaInfo: involvedSchemaInfos,
28222842
}
28232843
return job, nil
28242844
}

pkg/ddl/ddl_running_jobs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
126126
if _, ok := tbls[model.InvolvingAll]; ok {
127127
return false
128128
}
129+
if info.Table == model.InvolvingAll {
130+
return false
131+
}
129132
if info.Table == model.InvolvingNone {
130133
continue
131134
}

pkg/ddl/ddl_running_jobs_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func TestRunningJobs(t *testing.T) {
6868
job2 := mkJob(2, "db2.t3")
6969
j.add(job1.ID, job1.GetInvolvingSchemaInfo())
7070
j.add(job2.ID, job2.GetInvolvingSchemaInfo())
71+
require.False(t, j.checkRunnable(mkJob(0, "db1.*")))
7172
require.Equal(t, "1,2", orderedAllIDs(j.allIDs()))
7273
runnable = j.checkRunnable(mkJob(0, "db1.t1"))
7374
require.False(t, runnable)

0 commit comments

Comments
 (0)