Skip to content

Commit afacd10

Browse files
authored
ddl: support modify the related reorg config by SQL (#57336)
ref #57229
1 parent 851af35 commit afacd10

File tree

10 files changed

+437
-0
lines changed

10 files changed

+437
-0
lines changed

pkg/ddl/db_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"math"
21+
"strconv"
2122
"strings"
2223
"sync"
2324
"testing"
@@ -1124,3 +1125,124 @@ func TestDDLJobErrEntrySizeTooLarge(t *testing.T) {
11241125
tk.MustExec("create table t1 (a int);")
11251126
tk.MustExec("alter table t add column b int;") // Should not block.
11261127
}
1128+
1129+
func insertMockJob2Table(tk *testkit.TestKit, job *model.Job) {
1130+
b, err := job.Encode(false)
1131+
tk.RequireNoError(err)
1132+
sql := fmt.Sprintf("insert into mysql.tidb_ddl_job(job_id, job_meta) values(%s, ?);",
1133+
strconv.FormatInt(job.ID, 10))
1134+
tk.MustExec(sql, b)
1135+
}
1136+
1137+
func getJobMetaByID(t *testing.T, tk *testkit.TestKit, jobID int64) *model.Job {
1138+
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_job where job_id = %s",
1139+
strconv.FormatInt(jobID, 10))
1140+
rows := tk.MustQuery(sql)
1141+
res := rows.Rows()
1142+
require.Len(t, res, 1)
1143+
require.Len(t, res[0], 1)
1144+
jobBinary := []byte(res[0][0].(string))
1145+
job := model.Job{}
1146+
err := job.Decode(jobBinary)
1147+
require.NoError(t, err)
1148+
return &job
1149+
}
1150+
1151+
func deleteJobMetaByID(tk *testkit.TestKit, jobID int64) {
1152+
sql := fmt.Sprintf("delete from mysql.tidb_ddl_job where job_id = %s",
1153+
strconv.FormatInt(jobID, 10))
1154+
tk.MustExec(sql)
1155+
}
1156+
1157+
func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) {
1158+
store := testkit.CreateMockStore(t)
1159+
tk := testkit.NewTestKit(t, store)
1160+
tk.MustExec("use test")
1161+
tk.MustExec("create table t (a int);")
1162+
1163+
job := model.Job{
1164+
ID: 1,
1165+
Type: model.ActionAddIndex,
1166+
ReorgMeta: &model.DDLReorgMeta{
1167+
Concurrency: 4,
1168+
BatchSize: 128,
1169+
},
1170+
}
1171+
insertMockJob2Table(tk, &job)
1172+
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID))
1173+
j := getJobMetaByID(t, tk, job.ID)
1174+
require.Equal(t, j.ReorgMeta.Concurrency, 8)
1175+
1176+
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID))
1177+
j = getJobMetaByID(t, tk, job.ID)
1178+
require.Equal(t, j.ReorgMeta.BatchSize, 256)
1179+
1180+
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID))
1181+
j = getJobMetaByID(t, tk, job.ID)
1182+
require.Equal(t, j.ReorgMeta.Concurrency, 16)
1183+
require.Equal(t, j.ReorgMeta.BatchSize, 512)
1184+
deleteJobMetaByID(tk, job.ID)
1185+
}
1186+
1187+
func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
1188+
store := testkit.CreateMockStore(t)
1189+
tk := testkit.NewTestKit(t, store)
1190+
tk.MustExec("use test")
1191+
tk.MustExec("create table t (a int);")
1192+
1193+
// invalid config value
1194+
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]")
1195+
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
1196+
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
1197+
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")
1198+
1199+
// invalid job id
1200+
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")
1201+
1202+
job := model.Job{
1203+
ID: 1,
1204+
Type: model.ActionAddColumn,
1205+
}
1206+
insertMockJob2Table(tk, &job)
1207+
// unsupported job type
1208+
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID),
1209+
"unsupported DDL operation: add column, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job")
1210+
deleteJobMetaByID(tk, 1)
1211+
1212+
job = model.Job{
1213+
ID: 1,
1214+
Type: model.ActionAddIndex,
1215+
ReorgMeta: &model.DDLReorgMeta{
1216+
IsDistReorg: true,
1217+
},
1218+
}
1219+
insertMockJob2Table(tk, &job)
1220+
// unsupported job type
1221+
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID),
1222+
"unsupported DDL operation: add index, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job")
1223+
deleteJobMetaByID(tk, 1)
1224+
}
1225+
1226+
func TestAdminAlterDDLJobCommitFailed(t *testing.T) {
1227+
store := testkit.CreateMockStore(t)
1228+
tk := testkit.NewTestKit(t, store)
1229+
tk.MustExec("use test")
1230+
tk.MustExec("create table t (a int);")
1231+
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed", `return(true)`)
1232+
defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed")
1233+
1234+
job := model.Job{
1235+
ID: 1,
1236+
Type: model.ActionAddIndex,
1237+
ReorgMeta: &model.DDLReorgMeta{
1238+
Concurrency: 4,
1239+
BatchSize: 128,
1240+
},
1241+
}
1242+
insertMockJob2Table(tk, &job)
1243+
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID),
1244+
"mock commit failed on admin alter ddl jobs")
1245+
j := getJobMetaByID(t, tk, job.ID)
1246+
require.Equal(t, j.ReorgMeta, job.ReorgMeta)
1247+
deleteJobMetaByID(tk, job.ID)
1248+
}

pkg/executor/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ go_library(
102102
"//pkg/ddl/label",
103103
"//pkg/ddl/placement",
104104
"//pkg/ddl/schematracker",
105+
"//pkg/ddl/session",
106+
"//pkg/ddl/util",
105107
"//pkg/distsql",
106108
"//pkg/distsql/context",
107109
"//pkg/disttask/framework/handle",

pkg/executor/builder.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ func (b *executorBuilder) build(p base.Plan) exec.Executor {
222222
return b.buildPauseDDLJobs(v)
223223
case *plannercore.ResumeDDLJobs:
224224
return b.buildResumeDDLJobs(v)
225+
case *plannercore.AlterDDLJob:
226+
return b.buildAlterDDLJob(v)
225227
case *plannercore.ShowNextRowID:
226228
return b.buildShowNextRowID(v)
227229
case *plannercore.ShowDDL:
@@ -359,6 +361,15 @@ func (b *executorBuilder) buildResumeDDLJobs(v *plannercore.ResumeDDLJobs) exec.
359361
return e
360362
}
361363

364+
func (b *executorBuilder) buildAlterDDLJob(v *plannercore.AlterDDLJob) exec.Executor {
365+
e := &AlterDDLJobExec{
366+
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
367+
jobID: v.JobID,
368+
AlterOpts: v.Options,
369+
}
370+
return e
371+
}
372+
362373
func (b *executorBuilder) buildShowNextRowID(v *plannercore.ShowNextRowID) exec.Executor {
363374
e := &ShowNextRowIDExec{
364375
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),

pkg/executor/operate_ddl_jobs.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,16 @@ import (
1919
"fmt"
2020
"strconv"
2121

22+
"github.com/pingcap/errors"
23+
"github.com/pingcap/failpoint"
24+
"github.com/pingcap/tidb/pkg/ddl"
25+
sess "github.com/pingcap/tidb/pkg/ddl/session"
26+
"github.com/pingcap/tidb/pkg/ddl/util"
2227
"github.com/pingcap/tidb/pkg/executor/internal/exec"
28+
"github.com/pingcap/tidb/pkg/expression"
2329
"github.com/pingcap/tidb/pkg/kv"
30+
"github.com/pingcap/tidb/pkg/meta/model"
31+
"github.com/pingcap/tidb/pkg/planner/core"
2432
"github.com/pingcap/tidb/pkg/sessionctx"
2533
"github.com/pingcap/tidb/pkg/util/chunk"
2634
)
@@ -85,3 +93,126 @@ type PauseDDLJobsExec struct {
8593
type ResumeDDLJobsExec struct {
8694
*CommandDDLJobsExec
8795
}
96+
97+
// AlterDDLJobExec indicates an Executor for alter config of a DDL Job.
98+
type AlterDDLJobExec struct {
99+
exec.BaseExecutor
100+
jobID int64
101+
AlterOpts []*core.AlterDDLJobOpt
102+
}
103+
104+
// Open implements the Executor Open interface.
105+
func (e *AlterDDLJobExec) Open(ctx context.Context) error {
106+
newSess, err := e.GetSysSession()
107+
if err != nil {
108+
return err
109+
}
110+
defer e.ReleaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess)
111+
112+
return e.processAlterDDLJobConfig(ctx, newSess)
113+
}
114+
115+
func getJobMetaFromTable(
116+
ctx context.Context,
117+
se *sess.Session,
118+
jobID int64,
119+
) (*model.Job, error) {
120+
sql := fmt.Sprintf("select job_meta from mysql.%s where job_id = %s",
121+
ddl.JobTable, strconv.FormatInt(jobID, 10))
122+
rows, err := se.Execute(ctx, sql, "get_job_by_id")
123+
if err != nil {
124+
return nil, errors.Trace(err)
125+
}
126+
if len(rows) == 0 {
127+
return nil, fmt.Errorf("ddl job %d is not running", jobID)
128+
}
129+
jobBinary := rows[0].GetBytes(0)
130+
job := model.Job{}
131+
err = job.Decode(jobBinary)
132+
if err != nil {
133+
return nil, errors.Trace(err)
134+
}
135+
return &job, nil
136+
}
137+
138+
func updateJobMeta2Table(
139+
ctx context.Context,
140+
se *sess.Session,
141+
job *model.Job,
142+
) error {
143+
b, err := job.Encode(false)
144+
if err != nil {
145+
return err
146+
}
147+
sql := fmt.Sprintf("update mysql.%s set job_meta = %s where job_id = %d",
148+
ddl.JobTable, util.WrapKey2String(b), job.ID)
149+
_, err = se.Execute(ctx, sql, "update_job")
150+
return errors.Trace(err)
151+
}
152+
153+
const alterDDLJobMaxRetryCnt = 3
154+
155+
// processAlterDDLJobConfig try to alter the ddl job configs.
156+
// In case of failure, it will retry alterDDLJobMaxRetryCnt times.
157+
func (e *AlterDDLJobExec) processAlterDDLJobConfig(
158+
ctx context.Context,
159+
sessCtx sessionctx.Context,
160+
) (err error) {
161+
ns := sess.NewSession(sessCtx)
162+
var job *model.Job
163+
for tryN := uint(0); tryN < alterDDLJobMaxRetryCnt; tryN++ {
164+
if err = ns.Begin(ctx); err != nil {
165+
continue
166+
}
167+
job, err = getJobMetaFromTable(ctx, ns, e.jobID)
168+
if err != nil {
169+
continue
170+
}
171+
if !job.IsAlterable() {
172+
return fmt.Errorf("unsupported DDL operation: %s, "+
173+
"only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job", job.Type.String())
174+
}
175+
if err = e.updateReorgMeta(job, model.AdminCommandByEndUser); err != nil {
176+
continue
177+
}
178+
if err = updateJobMeta2Table(ctx, ns, job); err != nil {
179+
continue
180+
}
181+
182+
failpoint.Inject("mockAlterDDLJobCommitFailed", func(val failpoint.Value) {
183+
if val.(bool) {
184+
ns.Rollback()
185+
failpoint.Return(errors.New("mock commit failed on admin alter ddl jobs"))
186+
}
187+
})
188+
189+
if err = ns.Commit(ctx); err != nil {
190+
ns.Rollback()
191+
continue
192+
}
193+
return nil
194+
}
195+
return err
196+
}
197+
198+
func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error {
199+
for _, opt := range e.AlterOpts {
200+
switch opt.Name {
201+
case core.AlterDDLJobThread:
202+
if opt.Value != nil {
203+
cons := opt.Value.(*expression.Constant)
204+
job.ReorgMeta.Concurrency = int(cons.Value.GetInt64())
205+
}
206+
job.AdminOperator = byWho
207+
case core.AlterDDLJobBatchSize:
208+
if opt.Value != nil {
209+
cons := opt.Value.(*expression.Constant)
210+
job.ReorgMeta.BatchSize = int(cons.Value.GetInt64())
211+
}
212+
job.AdminOperator = byWho
213+
default:
214+
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
215+
}
216+
}
217+
return nil
218+
}

pkg/meta/model/job.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,14 @@ func (job *Job) IsPausable() bool {
640640
return job.NotStarted() || (job.IsRunning() && job.IsRollbackable())
641641
}
642642

643+
// IsAlterable checks whether the job type can be altered.
644+
func (job *Job) IsAlterable() bool {
645+
// Currently, only non-distributed add index reorg task can be altered
646+
return job.Type == ActionAddIndex && !job.ReorgMeta.IsDistReorg ||
647+
job.Type == ActionModifyColumn ||
648+
job.Type == ActionReorganizePartition
649+
}
650+
643651
// IsResumable checks whether the job can be rollback.
644652
func (job *Job) IsResumable() bool {
645653
return job.IsPaused()

pkg/planner/core/common_plans.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,35 @@ type ResumeDDLJobs struct {
148148
JobIDs []int64
149149
}
150150

151+
const (
152+
// AlterDDLJobThread alter reorg worker count
153+
AlterDDLJobThread = "thread"
154+
// AlterDDLJobBatchSize alter reorg batch size
155+
AlterDDLJobBatchSize = "batch_size"
156+
// AlterDDLJobMaxWriteSpeed alter reorg max write speed
157+
AlterDDLJobMaxWriteSpeed = "max_write_speed"
158+
)
159+
160+
var allowedAlterDDLJobParams = map[string]struct{}{
161+
AlterDDLJobThread: {},
162+
AlterDDLJobBatchSize: {},
163+
AlterDDLJobMaxWriteSpeed: {},
164+
}
165+
166+
// AlterDDLJobOpt represents alter ddl job option.
167+
type AlterDDLJobOpt struct {
168+
Name string
169+
Value expression.Expression
170+
}
171+
172+
// AlterDDLJob is the plan of admin alter ddl job
173+
type AlterDDLJob struct {
174+
baseSchemaProducer
175+
176+
JobID int64
177+
Options []*AlterDDLJobOpt
178+
}
179+
151180
// ReloadExprPushdownBlacklist reloads the data from expr_pushdown_blacklist table.
152181
type ReloadExprPushdownBlacklist struct {
153182
baseSchemaProducer

0 commit comments

Comments
 (0)