Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
id = int(backfillContextID.Add(1))
}

batchCnt := rInfo.ReorgMeta.GetBatchSize()
batchCnt := rInfo.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))
return &backfillCtx{
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
Expand Down Expand Up @@ -432,7 +432,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {

// Change the batch size dynamically.
currentBatchCnt := w.GetCtx().batchCnt
targetBatchSize := job.ReorgMeta.GetBatchSize()
targetBatchSize := job.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))
if targetBatchSize != currentBatchCnt {
w.GetCtx().batchCnt = targetBatchSize
logger.Info("adjust ddl job config success",
Expand Down Expand Up @@ -707,7 +707,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrency()
importConc := job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
if err != nil {
Expand Down Expand Up @@ -776,7 +776,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, reorgInfo.Job, avgRowSize)
err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
Expand Down Expand Up @@ -817,7 +817,8 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize)
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
Expand Down Expand Up @@ -1036,7 +1037,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
return
case <-ticker.C:
currentWorkerCnt := scheduler.currentWorkerSize()
targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency()
targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))
if currentWorkerCnt != targetWorkerCnt {
err := scheduler.adjustWorkerSize()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
Expand Down Expand Up @@ -152,7 +153,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
ddlObj.etcdCli,
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrency(),
job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())),
job.RealStartTS,
)
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -180,7 +181,8 @@ func NewAddIndexIngestPipeline(
}

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.GetBatchSize(), rm)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr,
reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), rm)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)
Expand Down Expand Up @@ -245,7 +247,8 @@ func NewWriteIndexToExternalStoragePipeline(
})

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.GetBatchSize(), nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil,
reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), nil)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID,
tbl, indexes, extStore, srcChkPool, writerCnt,
Expand Down Expand Up @@ -274,7 +277,7 @@ func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *syn
return &sync.Pool{
New: func() any {
return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes,
reorgMeta.GetBatchSize())
reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())))
},
}
}
Expand Down Expand Up @@ -598,7 +601,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
func (w *tableScanWorker) getChunk() *chunk.Chunk {
targetCap := ingest.CopReadBatchSize(w.hintBatchSize)
if w.reorgMeta != nil {
targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize())
targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())))
}
chk := w.srcChkPool.Get().(*chunk.Chunk)
if chk.Capacity() != targetCap {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
if err != nil {
return nil, err
}
workerCnt := info.ReorgMeta.GetConcurrency()
workerCnt := info.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))
return &txnBackfillScheduler{
ctx: ctx,
reorgInfo: info,
Expand Down Expand Up @@ -247,7 +247,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context)
}

func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency()
workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))
return min(workerCnt, maxBackfillWorkerSize)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,16 +1171,16 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) {
insertMockJob2Table(tk, &job)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID))
j := getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.GetConcurrency(), 8)
require.Equal(t, j.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), 8)

tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID))
j = getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.GetBatchSize(), 256)
require.Equal(t, j.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), 256)

tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID))
j = getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.GetConcurrency(), 16)
require.Equal(t, j.ReorgMeta.GetBatchSize(), 512)
require.Equal(t, j.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), 16)
require.Equal(t, j.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), 512)
deleteJobMetaByID(tk, job.ID)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4971,8 +4971,8 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.GetConcurrency()),
zap.Int("batchSize", reorgMeta.GetBatchSize()),
zap.Int("concurrency", reorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))),
zap.Int("batchSize", reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))),
)
return reorgMeta, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2511,7 +2511,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
})
} else {
job := reorgInfo.Job
workerCntLimit := job.ReorgMeta.GetConcurrency()
workerCntLimit := job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))
cpuCount, err := handle.GetCPUCountOfNode(ctx)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,8 @@ func (w *worker) runOneJobStep(
return
case model.JobStateRunning:
if latestJob.IsAlterable() {
job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrency())
job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSize())
job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())))
job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())))
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/meta/model/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ type DDLReorgMeta struct {
BatchSize int64 `json:"batch_size"`
}

// GetConcurrency gets the concurrency from DDLReorgMeta.
func (dm *DDLReorgMeta) GetConcurrency() int {
// GetConcurrency gets the concurrency from DDLReorgMeta,
// pass the default value in case of the reorg meta coming from old cluster and Concurrency is 0.
func (dm *DDLReorgMeta) GetConcurrency(defaultVal int) int {
if dm == nil || atomic.LoadInt64(&dm.Concurrency) == 0 {
return defaultVal
}
return int(atomic.LoadInt64(&dm.Concurrency))
}

Expand All @@ -93,7 +97,10 @@ func (dm *DDLReorgMeta) SetConcurrency(concurrency int) {
}

// GetBatchSize gets the batch size from DDLReorgMeta.
func (dm *DDLReorgMeta) GetBatchSize() int {
func (dm *DDLReorgMeta) GetBatchSize(defaultVal int) int {
if dm == nil || atomic.LoadInt64(&dm.BatchSize) == 0 {
return defaultVal
}
return int(atomic.LoadInt64(&dm.BatchSize))
}

Expand Down