Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 1 addition & 99 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
package ddl

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"runtime"
"slices"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -658,102 +656,6 @@ func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*mod
return dbInfo, tbl, err
}

const (
	// addDDLJobSQL is the prefix of the batched INSERT that queues DDL jobs;
	// one value tuple per job is appended in insertDDLJobs2Table.
	addDDLJobSQL = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values"
	// updateDDLJobSQL rewrites the serialized job meta of an existing row;
	// filled in and executed by updateDDLJob2Table.
	updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d"
)

// insertDDLJobs2Table persists the given DDL jobs into mysql.tidb_ddl_job
// using a single multi-row INSERT. It is a no-op for an empty batch. The
// statement is executed with DiskFullOpt_AllowedOnAlmostFull so DDL jobs are
// not rejected by disk-full protection. Returns the traced execution error.
func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error {
	failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
		}
	})
	if len(jobWs) == 0 {
		return nil
	}
	// The failpoint operates on the whole batch, so trigger it once up front
	// rather than redundantly on every loop iteration.
	injectModifyJobArgFailPoint(jobWs)
	var sql bytes.Buffer
	sql.WriteString(addDDLJobSQL)
	for i, jobW := range jobWs {
		// TODO remove this check when all job type pass args in this way.
		if jobW.JobArgs != nil {
			jobW.FillArgs(jobW.JobArgs)
		}
		b, err := jobW.Encode(true)
		if err != nil {
			return err
		}
		if i != 0 {
			sql.WriteString(",")
		}
		fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(),
			strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)),
			util.WrapKey2String(b), jobW.Type, jobW.Started())
	}
	se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
	_, err := se.Execute(ctx, sql.String(), "insert_job")
	logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String()))
	return errors.Trace(err)
}

// makeStringForIDs renders ids as a comma-separated list with duplicates
// removed. The decimal strings are ordered lexicographically (e.g. "10"
// sorts before "2") so the output is deterministic for a given ID set.
func makeStringForIDs(ids []int64) string {
	strs := make([]string, 0, len(ids))
	for _, id := range ids {
		strs = append(strs, strconv.FormatInt(id, 10))
	}
	// Sorting first lets Compact drop adjacent duplicates, which is
	// equivalent to deduplicating through a set and then sorting.
	slices.Sort(strs)
	strs = slices.Compact(strs)
	return strings.Join(strs, ",")
}

// job2SchemaIDs returns the value of the schema_ids column for the job: a
// deduplicated, comma-separated list of every schema ID the job touches.
// Most job types involve a single schema; rename/exchange jobs involve more.
func job2SchemaIDs(jobW *JobWrapper) string {
	switch jobW.Type {
	case model.ActionRenameTables:
		infos := jobW.JobArgs.(*model.RenameTablesArgs).RenameTableInfos
		ids := make([]int64, 0, 2*len(infos))
		for _, info := range infos {
			ids = append(ids, info.OldSchemaID)
			ids = append(ids, info.NewSchemaID)
		}
		return makeStringForIDs(ids)
	case model.ActionRenameTable:
		args := jobW.JobArgs.(*model.RenameTableArgs)
		return makeStringForIDs([]int64{args.OldSchemaID, jobW.SchemaID})
	case model.ActionExchangeTablePartition:
		ptSchemaID := jobW.JobArgs.(*model.ExchangeTablePartitionArgs).PTSchemaID
		return makeStringForIDs([]int64{jobW.SchemaID, ptSchemaID})
	}
	return strconv.FormatInt(jobW.SchemaID, 10)
}

// job2TableIDs returns the value of the table_ids column for the job: a
// comma-separated list of every table ID the job touches. Truncate keeps
// old-then-new order; the other multi-ID cases are deduplicated and sorted.
func job2TableIDs(jobW *JobWrapper) string {
	switch jobW.Type {
	case model.ActionRenameTables:
		infos := jobW.JobArgs.(*model.RenameTablesArgs).RenameTableInfos
		ids := make([]int64, 0, len(infos))
		for _, info := range infos {
			ids = append(ids, info.TableID)
		}
		return makeStringForIDs(ids)
	case model.ActionExchangeTablePartition:
		ptTableID := jobW.JobArgs.(*model.ExchangeTablePartitionArgs).PTTableID
		return makeStringForIDs([]int64{jobW.TableID, ptTableID})
	case model.ActionTruncateTable:
		args := jobW.JobArgs.(*model.TruncateTableArgs)
		return fmt.Sprintf("%d,%d", jobW.TableID, args.NewTableID)
	}
	return strconv.FormatInt(jobW.TableID, 10)
}

func updateDDLJob2Table(
ctx context.Context,
se *sess.Session,
Expand All @@ -764,7 +666,7 @@ func updateDDLJob2Table(
if err != nil {
return err
}
sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.ID)
sql := fmt.Sprintf("update mysql.tidb_ddl_job set job_meta = %s where job_id = %d", util.WrapKey2String(b), job.ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the original updateDDLJobSQL variable would be better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason I inlined it is that nearly all other places use string literals directly, and it's only used once

I do agree with defining it as a const if it is or will be used multiple times, but it seems we won't

_, err = se.Execute(ctx, sql, "update_job")
return errors.Trace(err)
}
Expand Down
94 changes: 94 additions & 0 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package ddl

import (
"bytes"
"context"
"fmt"
"math"
"slices"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -677,6 +680,97 @@ func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transactio
return forUpdateTs, err
}

// insertDDLJobs2Table persists the given DDL jobs into mysql.tidb_ddl_job
// using a single multi-row INSERT. It is a no-op for an empty batch. The
// statement is executed with DiskFullOpt_AllowedOnAlmostFull so DDL jobs are
// not rejected by disk-full protection. Returns the traced execution error.
func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error {
	failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
		}
	})
	if len(jobWs) == 0 {
		return nil
	}
	// The failpoint operates on the whole batch, so trigger it once up front
	// rather than redundantly on every loop iteration.
	injectModifyJobArgFailPoint(jobWs)
	var sql bytes.Buffer
	sql.WriteString("insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values")
	for i, jobW := range jobWs {
		// TODO remove this check when all job type pass args in this way.
		if jobW.JobArgs != nil {
			jobW.FillArgs(jobW.JobArgs)
		}
		b, err := jobW.Encode(true)
		if err != nil {
			return err
		}
		if i != 0 {
			sql.WriteString(",")
		}
		fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(),
			strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)),
			ddlutil.WrapKey2String(b), jobW.Type, jobW.Started())
	}
	se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
	_, err := se.Execute(ctx, sql.String(), "insert_job")
	logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String()))
	return errors.Trace(err)
}

// makeStringForIDs renders ids as a comma-separated list with duplicates
// removed. The decimal strings are ordered lexicographically (e.g. "10"
// sorts before "2") so the output is deterministic for a given ID set.
func makeStringForIDs(ids []int64) string {
	strs := make([]string, 0, len(ids))
	for _, id := range ids {
		strs = append(strs, strconv.FormatInt(id, 10))
	}
	// Sorting first lets Compact drop adjacent duplicates, which is
	// equivalent to deduplicating through a set and then sorting.
	slices.Sort(strs)
	strs = slices.Compact(strs)
	return strings.Join(strs, ",")
}

// job2SchemaIDs returns the value of the schema_ids column for the job: a
// deduplicated, comma-separated list of every schema ID the job touches.
// Most job types involve a single schema; rename/exchange jobs involve more.
func job2SchemaIDs(jobW *JobWrapper) string {
	switch jobW.Type {
	case model.ActionRenameTables:
		infos := jobW.JobArgs.(*model.RenameTablesArgs).RenameTableInfos
		ids := make([]int64, 0, 2*len(infos))
		for _, info := range infos {
			ids = append(ids, info.OldSchemaID)
			ids = append(ids, info.NewSchemaID)
		}
		return makeStringForIDs(ids)
	case model.ActionRenameTable:
		args := jobW.JobArgs.(*model.RenameTableArgs)
		return makeStringForIDs([]int64{args.OldSchemaID, jobW.SchemaID})
	case model.ActionExchangeTablePartition:
		ptSchemaID := jobW.JobArgs.(*model.ExchangeTablePartitionArgs).PTSchemaID
		return makeStringForIDs([]int64{jobW.SchemaID, ptSchemaID})
	}
	return strconv.FormatInt(jobW.SchemaID, 10)
}

// job2TableIDs returns the value of the table_ids column for the job: a
// comma-separated list of every table ID the job touches. Truncate keeps
// old-then-new order; the other multi-ID cases are deduplicated and sorted.
func job2TableIDs(jobW *JobWrapper) string {
	switch jobW.Type {
	case model.ActionRenameTables:
		infos := jobW.JobArgs.(*model.RenameTablesArgs).RenameTableInfos
		ids := make([]int64, 0, len(infos))
		for _, info := range infos {
			ids = append(ids, info.TableID)
		}
		return makeStringForIDs(ids)
	case model.ActionExchangeTablePartition:
		ptTableID := jobW.JobArgs.(*model.ExchangeTablePartitionArgs).PTTableID
		return makeStringForIDs([]int64{jobW.TableID, ptTableID})
	case model.ActionTruncateTable:
		args := jobW.JobArgs.(*model.TruncateTableArgs)
		return fmt.Sprintf("%d,%d", jobW.TableID, args.NewTableID)
	}
	return strconv.FormatInt(jobW.TableID, 10)
}

// TODO this failpoint is only checking how job scheduler handle
// corrupted job args, we should test it there by UT, not here.
func injectModifyJobArgFailPoint(jobWs []*JobWrapper) {
Expand Down