From e4f319a017f9d7fb59213e3d61c8996ea8745d9f Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Sun, 17 Nov 2024 23:27:06 +0800 Subject: [PATCH 01/22] txn-mode worker cnt and batch size --- pkg/ddl/backfilling.go | 33 +++++++++++++++++++++++++++++++++ pkg/ddl/job_worker.go | 5 +++++ 2 files changed, 38 insertions(+) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index bcb80bea944de..284b2996eb5e7 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -432,7 +432,13 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // Change the batch size dynamically. newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + oldBatchCnt := w.GetCtx().batchCnt w.GetCtx().batchCnt = newBatchCnt + if w.GetCtx().batchCnt != oldBatchCnt { + logger.Info("adjust backfill batch size success", + zap.Int("current batch size", w.GetCtx().batchCnt), + zap.Int64("job ID", job.ID)) + } result := w.handleBackfillTask(d, task, bf) w.sendResult(result) @@ -978,6 +984,33 @@ func (dc *ddlCtx) writePhysicalTableRecord( return nil }) + // update the worker cnt goroutine + go func() { + t := time.NewTicker(1 * time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + currWorkerCnt := scheduler.currentWorkerSize() + newWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + if currWorkerCnt != newWorkerCnt { + err := scheduler.adjustWorkerSize() + if err != nil { + logutil.DDLLogger().Warn("cannot adjust backfill worker count", + zap.Int64("job ID", reorgInfo.ID), + zap.Error(err)) + } else { + logutil.DDLLogger().Info("adjust backfill worker count success", + zap.Int("current worker count", scheduler.currentWorkerSize()), + zap.Int64("job ID", reorgInfo.ID)) + } + } + } + } + }() + return eg.Wait() } diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 219dd24781b5e..f1bc56a718ee0 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -863,6 +863,11 @@ func (w *worker) runOneJobStep( return case model.JobStateDone, model.JobStateSynced: return + case model.JobStateRunning: + if latestJob.IsAlterable() { + job.ReorgMeta.Concurrency = latestJob.ReorgMeta.Concurrency + job.ReorgMeta.BatchSize = latestJob.ReorgMeta.BatchSize + } } } } From 835e5806c2a1ae2f8a6346429335aae8a69afb4d Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 18 Nov 2024 17:04:31 +0800 Subject: [PATCH 02/22] ingest worker cnt and batch size --- pkg/ddl/backfilling.go | 63 +++++++++++++------ pkg/ddl/backfilling_operators.go | 22 +++++-- pkg/ddl/export_test.go | 2 +- pkg/disttask/operator/operator.go | 5 ++ pkg/disttask/operator/pipeline.go | 8 +++ .../addindextest3/operator_test.go | 2 +- 6 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 284b2996eb5e7..0368c4fb2e3b7 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -431,10 +431,10 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { }) // Change the batch size dynamically. 
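The change just below replaces the unconditional batch-size assignment with a compare-then-set that logs only when the value actually changed. A minimal standalone sketch of that pattern, using hypothetical names rather than the real backfillWorker types:

    package main

    import "fmt"

    // worker stands in for backfillWorker; batchCnt mirrors backfillCtx.batchCnt.
    type worker struct{ batchCnt int }

    // maybeAdjustBatchSize applies the compare-then-set pattern: update and log
    // only when the target read from the reorg meta differs from the cached value.
    func (w *worker) maybeAdjustBatchSize(target int) {
        if target != w.batchCnt {
            w.batchCnt = target
            fmt.Println("adjust backfill batch size success, current:", w.batchCnt)
        }
    }

    func main() {
        w := &worker{batchCnt: 256}
        for _, target := range []int{256, 256, 512, 512} { // one read per task round
            w.maybeAdjustBatchSize(target)
        }
        // Only one "adjust ... success" line is printed, for the 256 -> 512 change.
    }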
- newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) - oldBatchCnt := w.GetCtx().batchCnt - w.GetCtx().batchCnt = newBatchCnt - if w.GetCtx().batchCnt != oldBatchCnt { + currentBatchCnt := w.GetCtx().batchCnt + targetBatchSize := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + if targetBatchSize != currentBatchCnt { + w.GetCtx().batchCnt = targetBatchSize logger.Info("adjust backfill batch size success", zap.Int("current batch size", w.GetCtx().batchCnt), zap.Int64("job ID", job.ID)) @@ -776,6 +776,38 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( if err != nil { return err } + + // Adjust worker pool size dynamically. + go func() { + opR, opW := pipe.GetLocalIngestModeReaderAndWriter() + reader, ok := opR.(*TableScanOperator) + if !ok { + logutil.DDLIngestLogger().Error("unexpected operator type", zap.Int64("jobID", job.ID), zap.Error(err)) + } + writer, ok := opW.(*IndexIngestOperator) + if !ok { + logutil.DDLIngestLogger().Error("unexpected operator type", zap.Int64("jobID", job.ID), zap.Error(err)) + } + ticker := time.NewTicker(UpdateReorgCfgInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + newReaderCnt, newWriterCnt := expectedIngestWorkerCnt( + job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), + avgRowSize) + reader.TuneWorkerPoolSize(int32(newReaderCnt)) + writer.TuneWorkerPoolSize(int32(newWriterCnt)) + logutil.DDLIngestLogger().Info("adjust backfill worker count", + zap.Int64("jobID", job.ID), + zap.Int("table scan operator count", newReaderCnt), + zap.Int("index ingest operator count", newWriterCnt)) + } + } + }() + err = executeAndClosePipeline(opCtx, pipe) if err != nil { err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines) @@ -831,6 +863,9 @@ func (s *localRowCntListener) SetTotal(total int) { s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total)) } +// UpdateReorgCfgInterval is the interval to check and update reorg configuration. +const UpdateReorgCfgInterval = 2 * time.Second + // writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition. // For a partitioned table, it should be handled partition by partition. // @@ -935,14 +970,6 @@ func (dc *ddlCtx) writePhysicalTableRecord( zap.Int64("job ID", reorgInfo.ID), zap.Error(err2)) } - // We try to adjust the worker size regularly to reduce - // the overhead of loading the DDL related global variables. 
- err2 = scheduler.adjustWorkerSize() - if err2 != nil { - logutil.DDLLogger().Warn("cannot adjust backfill worker size", - zap.Int64("job ID", reorgInfo.ID), - zap.Error(err2)) - } failpoint.InjectCall("afterUpdateReorgMeta") } } @@ -986,16 +1013,16 @@ func (dc *ddlCtx) writePhysicalTableRecord( // update the worker cnt goroutine go func() { - t := time.NewTicker(1 * time.Second) - defer t.Stop() + ticker := time.NewTicker(UpdateReorgCfgInterval) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-t.C: - currWorkerCnt := scheduler.currentWorkerSize() - newWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) - if currWorkerCnt != newWorkerCnt { + case <-ticker.C: + currentWorkerCnt := scheduler.currentWorkerSize() + targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + if currentWorkerCnt != targetWorkerCnt { err := scheduler.adjustWorkerSize() if err != nil { logutil.DDLLogger().Warn("cannot adjust backfill worker count", diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 7cf087aad1058..8ab87d8aaaf26 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -172,9 +173,13 @@ func NewAddIndexIngestPipeline( } srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize) readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) + rm := reorgMeta + if rm.IsDistReorg { + rm = nil + } srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize, rm) ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener) @@ -239,7 +244,7 @@ func NewWriteIndexToExternalStoragePipeline( }) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize, nil) writeOp := NewWriteExternalStoreOperator( ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, @@ -482,6 +487,7 @@ func NewTableScanOperator( concurrency int, cpMgr *ingest.CheckpointManager, hintBatchSize int, + reorgMeta *model.DDLReorgMeta, ) *TableScanOperator { totalCount := new(atomic.Int64) pool := workerpool.NewWorkerPool( @@ -498,6 +504,7 @@ func NewTableScanOperator( cpMgr: cpMgr, hintBatchSize: hintBatchSize, totalCount: totalCount, + reorgMeta: reorgMeta, } }) return &TableScanOperator{ @@ -521,6 +528,7 @@ type tableScanWorker struct { srcChkPool chan *chunk.Chunk cpMgr *ingest.CheckpointManager + reorgMeta *model.DDLReorgMeta hintBatchSize int totalCount *atomic.Int64 } @@ -588,10 
+596,14 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor } func (w *tableScanWorker) getChunk() *chunk.Chunk { + targetCap := ingest.CopReadBatchSize(w.hintBatchSize) + if w.reorgMeta != nil { + targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) + } chk := <-w.srcChkPool - newCap := ingest.CopReadBatchSize(w.hintBatchSize) - if chk.Capacity() != newCap { - chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap) + if chk.Capacity() != targetCap { + chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, targetCap) + logutil.Logger(w.ctx).Info("adjust backfill batch size success", zap.Int("current batch size", targetCap)) } chk.Reset() return chk diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index e815a3c6d1b9a..8f651549cb264 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -48,7 +48,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, opCtx, cancel := ddl.NewLocalOperatorCtx(context.Background(), 1) defer cancel() src := testutil.NewOperatorTestSource(ddl.TableScanTask{ID: 1, Start: startKey, End: endKey}) - scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0, nil) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp) diff --git a/pkg/disttask/operator/operator.go b/pkg/disttask/operator/operator.go index c17c33fcbf249..d6c690d650db6 100644 --- a/pkg/disttask/operator/operator.go +++ b/pkg/disttask/operator/operator.go @@ -93,6 +93,11 @@ func (c *AsyncOperator[T, R]) SetSink(ch DataChannel[R]) { c.pool.SetResultSender(ch.Channel()) } +// TuneWorkerPoolSize tunes the worker pool size. +func (c *AsyncOperator[T, R]) TuneWorkerPoolSize(workerNum int32) { + c.pool.Tune(workerNum) +} + type asyncWorker[T, R any] struct { transform func(T) R } diff --git a/pkg/disttask/operator/pipeline.go b/pkg/disttask/operator/pipeline.go index a920eddb781b6..7ded88b19fff1 100644 --- a/pkg/disttask/operator/pipeline.go +++ b/pkg/disttask/operator/pipeline.go @@ -65,3 +65,11 @@ func (p *AsyncPipeline) String() string { } return "AsyncPipeline[" + strings.Join(opStrs, " -> ") + "]" } + +// GetLocalIngestModeReaderAndWriter returns the reader and writer in the local ingest mode. +func (p *AsyncPipeline) GetLocalIngestModeReaderAndWriter() (operator1, operator2 Operator) { + if len(p.ops) != 4 { + return nil, nil + } + return p.ops[1], p.ops[2] +} diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 5fac1ec6063ae..e0784ded8982b 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -96,7 +96,7 @@ func TestBackfillOperators(t *testing.T) { ctx := context.Background() opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) src := testutil.NewOperatorTestSource(opTasks...) 
- scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil, 0) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 3, nil, 0, nil) sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]() operator.Compose[ddl.TableScanTask](src, scanOp) From 6fe6e67550fc9a266187be323c2aa20a3f329a30 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 18 Nov 2024 17:54:45 +0800 Subject: [PATCH 03/22] use sync.Pool replace buffered channel chunk --- pkg/ddl/backfilling_operators.go | 36 ++++++++++++++++---------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 8ab87d8aaaf26..e63bd071b234d 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -20,6 +20,7 @@ import ( "fmt" "path" "strconv" + "sync" "sync/atomic" "time" @@ -171,7 +172,7 @@ func NewAddIndexIngestPipeline( if err != nil { return nil, err } - srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize) + srcChkPool := createChunkPool(copCtx, reorgMeta) readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) rm := reorgMeta if rm.IsDistReorg { @@ -226,7 +227,7 @@ func NewWriteIndexToExternalStoragePipeline( if err != nil { return nil, err } - srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize) + srcChkPool := createChunkPool(copCtx, reorgMeta) readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) backend, err := storage.ParseBackend(extStoreURI, nil) @@ -269,14 +270,13 @@ func NewWriteIndexToExternalStoragePipeline( ), nil } -func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk { - poolSize := ingest.CopReadChunkPoolSize(hintConc) - batchSize := ingest.CopReadBatchSize(hintBatchSize) - srcChkPool := make(chan *chunk.Chunk, poolSize) - for i := 0; i < poolSize; i++ { - srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize) +func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *sync.Pool { + return &sync.Pool{ + New: func() interface{} { + return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, + reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) + }, } - return srcChkPool } // TableScanTask contains the start key and the end key of a region. 
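Replacing the fixed-capacity buffered channel with a sync.Pool lets chunk capacity be decided at Get time, so a batch-size change takes effect without draining and refilling a channel. A rough, self-contained sketch of that behaviour, where rowChunk and newRowChunk stand in for chunk.Chunk and chunk.NewChunkWithCapacity:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // rowChunk and newRowChunk are stand-ins for chunk.Chunk / chunk.NewChunkWithCapacity.
    type rowChunk struct{ capacity int }

    func newRowChunk(capacity int) *rowChunk { return &rowChunk{capacity: capacity} }

    // batchSize plays the role of DDLReorgMeta.BatchSize: adjustable at any time.
    var batchSize atomic.Int64

    func main() {
        batchSize.Store(32)
        pool := &sync.Pool{
            New: func() any { return newRowChunk(int(batchSize.Load())) },
        }

        // getChunk mirrors tableScanWorker.getChunk: take a chunk from the pool and,
        // if the target batch size has changed, replace it with one of the new capacity.
        getChunk := func() *rowChunk {
            target := int(batchSize.Load())
            chk := pool.Get().(*rowChunk)
            if chk.capacity != target {
                chk = newRowChunk(target)
            }
            return chk
        }

        chk := getChunk()
        fmt.Println(chk.capacity) // 32
        pool.Put(chk)

        batchSize.Store(64) // e.g. after ADMIN ALTER DDL JOBS <id> BATCH_SIZE = 64
        fmt.Println(getChunk().capacity) // 64
    }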
@@ -483,7 +483,7 @@ func NewTableScanOperator( ctx *OperatorCtx, sessPool opSessPool, copCtx copr.CopContext, - srcChkPool chan *chunk.Chunk, + srcChkPool *sync.Pool, concurrency int, cpMgr *ingest.CheckpointManager, hintBatchSize int, @@ -525,7 +525,7 @@ type tableScanWorker struct { copCtx copr.CopContext sessPool opSessPool se *session.Session - srcChkPool chan *chunk.Chunk + srcChkPool *sync.Pool cpMgr *ingest.CheckpointManager reorgMeta *model.DDLReorgMeta @@ -600,7 +600,7 @@ func (w *tableScanWorker) getChunk() *chunk.Chunk { if w.reorgMeta != nil { targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) } - chk := <-w.srcChkPool + chk := w.srcChkPool.Get().(*chunk.Chunk) if chk.Capacity() != targetCap { chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, targetCap) logutil.Logger(w.ctx).Info("adjust backfill batch size success", zap.Int("current batch size", targetCap)) @@ -610,7 +610,7 @@ func (w *tableScanWorker) getChunk() *chunk.Chunk { } func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) { - w.srcChkPool <- chk + w.srcChkPool.Put(chk) } // WriteExternalStoreOperator writes index records to external storage. @@ -630,7 +630,7 @@ func NewWriteExternalStoreOperator( tbl table.PhysicalTable, indexes []table.Index, store storage.ExternalStorage, - srcChunkPool chan *chunk.Chunk, + srcChunkPool *sync.Pool, concurrency int, onClose external.OnCloseFunc, memoryQuota uint64, @@ -716,7 +716,7 @@ func NewIndexIngestOperator( tbl table.PhysicalTable, indexes []table.Index, engines []ingest.Engine, - srcChunkPool chan *chunk.Chunk, + srcChunkPool *sync.Pool, concurrency int, reorgMeta *model.DDLReorgMeta, cpMgr *ingest.CheckpointManager, @@ -777,7 +777,7 @@ type indexIngestExternalWorker struct { func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) { defer func() { if ck.Chunk != nil { - w.srcChunkPool <- ck.Chunk + w.srcChunkPool.Put(ck.Chunk) } }() rs, err := w.indexIngestBaseWorker.HandleTask(ck) @@ -799,7 +799,7 @@ type indexIngestLocalWorker struct { func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) { defer func() { if ck.Chunk != nil { - w.srcChunkPool <- ck.Chunk + w.srcChunkPool.Put(ck.Chunk) } }() rs, err := w.indexIngestBaseWorker.HandleTask(ck) @@ -839,7 +839,7 @@ type indexIngestBaseWorker struct { restore func(sessionctx.Context) writers []ingest.Writer - srcChunkPool chan *chunk.Chunk + srcChunkPool *sync.Pool // only available in global sort totalCount *atomic.Int64 } From 85c733e11964a0ab1a2f217a7c1d4c3af755d163 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 18 Nov 2024 20:40:04 +0800 Subject: [PATCH 04/22] refactor get reorg config --- pkg/ddl/backfilling.go | 10 ++--- pkg/ddl/backfilling_dist_executor.go | 3 +- pkg/ddl/backfilling_operators.go | 9 ++--- pkg/ddl/backfilling_scheduler.go | 4 +- pkg/ddl/db_test.go | 8 ++-- pkg/ddl/executor.go | 8 ++-- pkg/ddl/export_test.go | 9 +++-- pkg/ddl/index.go | 2 +- pkg/ddl/job_worker.go | 4 +- pkg/executor/operate_ddl_jobs.go | 4 +- pkg/meta/model/reorg.go | 38 +++++++++++-------- .../addindextest3/operator_test.go | 22 ++++++++--- 12 files changed, 70 insertions(+), 51 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 0368c4fb2e3b7..f54e2ef479202 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -194,7 +194,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo, id = int(backfillContextID.Add(1)) } - batchCnt := 
rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + batchCnt := rInfo.ReorgMeta.GetBatchSize() return &backfillCtx{ id: id, ddlCtx: rInfo.jobCtx.oldDDLCtx, @@ -432,7 +432,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // Change the batch size dynamically. currentBatchCnt := w.GetCtx().batchCnt - targetBatchSize := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + targetBatchSize := job.ReorgMeta.GetBatchSize() if targetBatchSize != currentBatchCnt { w.GetCtx().batchCnt = targetBatchSize logger.Info("adjust backfill batch size success", @@ -707,7 +707,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + importConc := job.ReorgMeta.GetConcurrency() bcCtx, err := ingest.LitBackCtxMgr.Register( ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS) if err != nil { @@ -796,7 +796,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return case <-ticker.C: newReaderCnt, newWriterCnt := expectedIngestWorkerCnt( - job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), + job.ReorgMeta.GetConcurrency(), avgRowSize) reader.TuneWorkerPoolSize(int32(newReaderCnt)) writer.TuneWorkerPoolSize(int32(newWriterCnt)) @@ -1021,7 +1021,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( return case <-ticker.C: currentWorkerCnt := scheduler.currentWorkerSize() - targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency() if currentWorkerCnt != targetWorkerCnt { err := scheduler.adjustWorkerSize() if err != nil { diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index d9b5e6c062273..3e4127668c09a 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" @@ -153,7 +152,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, - job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), + job.ReorgMeta.GetConcurrency(), job.RealStartTS, ) } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index e63bd071b234d..fe7e8b7157da5 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -180,7 +179,7 @@ func NewAddIndexIngestPipeline( } srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr) - scanOp := NewTableScanOperator(ctx, sessPool, 
copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize, rm) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.GetBatchSize(), rm) ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener) @@ -245,7 +244,7 @@ func NewWriteIndexToExternalStoragePipeline( }) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize, nil) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.GetBatchSize(), nil) writeOp := NewWriteExternalStoreOperator( ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, @@ -274,7 +273,7 @@ func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *syn return &sync.Pool{ New: func() interface{} { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, - reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) + reorgMeta.GetBatchSize()) }, } } @@ -598,7 +597,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor func (w *tableScanWorker) getChunk() *chunk.Chunk { targetCap := ingest.CopReadBatchSize(w.hintBatchSize) if w.reorgMeta != nil { - targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) + targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize()) } chk := w.srcChkPool.Get().(*chunk.Chunk) if chk.Capacity() != targetCap { diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index ddf78334e2200..6bf9f6f0fd5ed 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -87,7 +87,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses if err != nil { return nil, err } - workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + workerCnt := info.ReorgMeta.GetConcurrency() return &txnBackfillScheduler{ ctx: ctx, reorgInfo: info, @@ -247,7 +247,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) } func (b *txnBackfillScheduler) expectedWorkerSize() (size int) { - workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency() return min(workerCnt, maxBackfillWorkerSize) } diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 2cd989ce1107f..f53d9e05f5317 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1171,16 +1171,16 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) { insertMockJob2Table(tk, &job) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID)) j := getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.Concurrency, 8) + require.Equal(t, j.ReorgMeta.GetConcurrency(), 8) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID)) j = getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.BatchSize, 256) + require.Equal(t, j.ReorgMeta.GetBatchSize(), 256) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID)) j = getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.Concurrency, 16) - require.Equal(t, j.ReorgMeta.BatchSize, 512) + require.Equal(t, 
j.ReorgMeta.GetConcurrency(), 16) + require.Equal(t, j.ReorgMeta.GetBatchSize(), 512) deleteJobMetaByID(tk, job.ID) } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 0a3c0ceb846ad..4327eb0661f22 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4943,10 +4943,10 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model. reorgMeta.IsFastReorg = variable.EnableFastReorg.Load() reorgMeta.TargetScope = variable.ServiceScope.Load() if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok { - reorgMeta.Concurrency = variable.TidbOptInt(sv, 0) + reorgMeta.SetConcurrency(variable.TidbOptInt(sv, 0)) } if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok { - reorgMeta.BatchSize = variable.TidbOptInt(sv, 0) + reorgMeta.SetBatchSize(variable.TidbOptInt(sv, 0)) } if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg { @@ -4971,8 +4971,8 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model. zap.Bool("enableDistTask", reorgMeta.IsDistReorg), zap.Bool("enableFastReorg", reorgMeta.IsFastReorg), zap.String("targetScope", reorgMeta.TargetScope), - zap.Int("concurrency", reorgMeta.Concurrency), - zap.Int("batchSize", reorgMeta.BatchSize), + zap.Int("concurrency", reorgMeta.GetConcurrency()), + zap.Int("batchSize", reorgMeta.GetBatchSize()), ) return reorgMeta, nil } diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 8f651549cb264..2f0007fe273eb 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "context" + "sync" "time" "github.com/ngaut/pools" @@ -41,9 +42,11 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, return ctx, nil }, 8, 8, 0) sessPool := session.NewSessionPool(resPool) - srcChkPool := make(chan *chunk.Chunk, 10) - for i := 0; i < 10; i++ { - srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize) + srcChkPool := &sync.Pool{ + New: func() interface{} { + return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, + batchSize) + }, } opCtx, cancel := ddl.NewLocalOperatorCtx(context.Background(), 1) defer cancel() diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 0a733715625d5..bee8cd7330b24 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2511,7 +2511,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { }) } else { job := reorgInfo.Job - workerCntLimit := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + workerCntLimit := job.ReorgMeta.GetConcurrency() cpuCount, err := handle.GetCPUCountOfNode(ctx) if err != nil { return err diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index f1bc56a718ee0..45d574352c279 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -865,8 +865,8 @@ func (w *worker) runOneJobStep( return case model.JobStateRunning: if latestJob.IsAlterable() { - job.ReorgMeta.Concurrency = latestJob.ReorgMeta.Concurrency - job.ReorgMeta.BatchSize = latestJob.ReorgMeta.BatchSize + job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrency()) + job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSize()) } } } diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 4aa14405aa665..90f1d0471a078 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -201,13 +201,13 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma case 
core.AlterDDLJobThread: if opt.Value != nil { cons := opt.Value.(*expression.Constant) - job.ReorgMeta.Concurrency = int(cons.Value.GetInt64()) + job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64())) } job.AdminOperator = byWho case core.AlterDDLJobBatchSize: if opt.Value != nil { cons := opt.Value.(*expression.Constant) - job.ReorgMeta.BatchSize = int(cons.Value.GetInt64()) + job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) } job.AdminOperator = byWho default: diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 01be1d69fcbc1..c8e645d92798c 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -16,6 +16,7 @@ package model import ( "encoding/json" + "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -71,26 +72,31 @@ type DDLReorgMeta struct { ResourceGroupName string `json:"resource_group_name"` Version int64 `json:"version"` TargetScope string `json:"target_scope"` - // These two variables are set when corresponding session variables are set explicitly. When they are set, - // user cannot change it by setting the global one. Otherwise, they can be adjusted dynamically through global var. - Concurrency int `json:"concurrency"` - BatchSize int `json:"batch_size"` + // These two variables are used to control the concurrency and batch size of the reorganization process. + // They can be adjusted dynamically through `admin alter ddl jobs` command. + // Note: Don't get or set these two variables directly, use the functions instead. + Concurrency int64 `json:"concurrency"` + BatchSize int64 `json:"batch_size"` } -// GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta or returns the default value. -func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { - if dm == nil || dm.Concurrency == 0 { - return defaultVal - } - return dm.Concurrency +// GetConcurrency gets the concurrency from DDLReorgMeta. +func (dm *DDLReorgMeta) GetConcurrency() int { + return int(atomic.LoadInt64(&dm.Concurrency)) } -// GetBatchSizeOrDefault gets the batch size from DDLReorgMeta or returns the default value. -func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { - if dm == nil || dm.BatchSize == 0 { - return defaultVal - } - return dm.BatchSize +// SetConcurrency sets the concurrency in DDLReorgMeta. +func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { + atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) +} + +// GetBatchSize gets the batch size from DDLReorgMeta. +func (dm *DDLReorgMeta) GetBatchSize() int { + return int(atomic.LoadInt64(&dm.BatchSize)) +} + +// SetBatchSize sets the batch size in DDLReorgMeta. 
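These accessors (the SetBatchSize body continues just below) wrap the two fields in sync/atomic because the ADMIN ALTER DDL JOBS path stores new values while worker goroutines load them concurrently. A trimmed-down, hypothetical model of that reader/writer pattern:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // reorgMeta is a stand-in for DDLReorgMeta with only the batch-size knob.
    type reorgMeta struct{ batchSize int64 }

    func (m *reorgMeta) SetBatchSize(n int) { atomic.StoreInt64(&m.batchSize, int64(n)) }
    func (m *reorgMeta) GetBatchSize() int  { return int(atomic.LoadInt64(&m.batchSize)) }

    func main() {
        m := &reorgMeta{}
        m.SetBatchSize(256)
        go func() { // the admin-command path updates the knob...
            time.Sleep(15 * time.Millisecond)
            m.SetBatchSize(512)
        }()
        for i := 0; i < 4; i++ { // ...while the worker re-reads it every round
            fmt.Println("batch size:", m.GetBatchSize())
            time.Sleep(10 * time.Millisecond)
        }
    }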
+func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { + atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) } const ( diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index e0784ded8982b..d7742a7b1ec0f 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -17,7 +17,9 @@ package addindextest import ( "context" "fmt" + "github.com/pingcap/tidb/pkg/table/tables" "strings" + "sync" "sync/atomic" "testing" @@ -35,7 +37,6 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" - "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/tests/realtikvtest" @@ -48,6 +49,12 @@ func init() { }) } +func TestDd(t *testing.T) { + for i := 0; i < 1; i++ { + TestBackfillOperators(t) + } +} + func TestBackfillOperators(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) @@ -88,9 +95,10 @@ func TestBackfillOperators(t *testing.T) { var chunkResults []ddl.IndexRecordChunk { // Make sure the buffer is large enough since the chunks do not recycled. - srcChkPool := make(chan *chunk.Chunk, regionCnt*2) - for i := 0; i < regionCnt*2; i++ { - srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, 100) + srcChkPool := &sync.Pool{ + New: func() interface{} { + return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, 100) + }, } ctx := context.Background() @@ -134,7 +142,11 @@ func TestBackfillOperators(t *testing.T) { values = append(values, val) } - srcChkPool := make(chan *chunk.Chunk, regionCnt*2) + srcChkPool := &sync.Pool{ + New: func() interface{} { + return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, 100) + }, + } pTbl := tbl.(table.PhysicalTable) index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo) mockBackendCtx := &ingest.MockBackendCtx{} From 35bedece453e7197be4c0337bb4f571f9bf58cf8 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 18 Nov 2024 21:34:18 +0800 Subject: [PATCH 05/22] fix nogo --- pkg/ddl/backfilling_operators.go | 2 +- tests/realtikvtest/addindextest3/operator_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index fe7e8b7157da5..180c8f5d7e981 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -271,7 +271,7 @@ func NewWriteIndexToExternalStoragePipeline( func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *sync.Pool { return &sync.Pool{ - New: func() interface{} { + New: func() any { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, reorgMeta.GetBatchSize()) }, diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index d7742a7b1ec0f..74bf85d894ad9 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -96,7 +96,7 @@ func TestBackfillOperators(t *testing.T) { { // Make sure the buffer is large enough since the chunks do not recycled. 
srcChkPool := &sync.Pool{ - New: func() interface{} { + New: func() any { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, 100) }, } @@ -143,7 +143,7 @@ func TestBackfillOperators(t *testing.T) { } srcChkPool := &sync.Pool{ - New: func() interface{} { + New: func() any { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, 100) }, } From 9859b8b9d3032f92705c1f0c45fe85757b4066fd Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 18 Nov 2024 23:42:01 +0800 Subject: [PATCH 06/22] fix nogo --- pkg/ddl/export_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 2f0007fe273eb..4585bcf297d6f 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -43,7 +43,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, }, 8, 8, 0) sessPool := session.NewSessionPool(resPool) srcChkPool := &sync.Pool{ - New: func() interface{} { + New: func() any { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize) }, From cd537bddd2419337bf50d5061bb4ed336ef7813b Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Mon, 18 Nov 2024 23:58:34 +0800 Subject: [PATCH 07/22] fix nogo --- tests/realtikvtest/addindextest3/operator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 74bf85d894ad9..215eda7d0d055 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -17,7 +17,6 @@ package addindextest import ( "context" "fmt" - "github.com/pingcap/tidb/pkg/table/tables" "strings" "sync" "sync/atomic" @@ -37,6 +36,7 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" + "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/tests/realtikvtest" From 18321ec1bdc8f16f744b4c75db3636aefafff73b Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 12:14:32 +0800 Subject: [PATCH 08/22] update --- pkg/ddl/backfilling.go | 43 +++++++++++-------- pkg/disttask/operator/operator.go | 5 +++ .../addindextest3/operator_test.go | 6 --- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index f54e2ef479202..f185aac4c9118 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -780,30 +780,39 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( // Adjust worker pool size dynamically. 
go func() { opR, opW := pipe.GetLocalIngestModeReaderAndWriter() - reader, ok := opR.(*TableScanOperator) - if !ok { - logutil.DDLIngestLogger().Error("unexpected operator type", zap.Int64("jobID", job.ID), zap.Error(err)) + if opR == nil || opW == nil { + logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID)) + return } - writer, ok := opW.(*IndexIngestOperator) - if !ok { - logutil.DDLIngestLogger().Error("unexpected operator type", zap.Int64("jobID", job.ID), zap.Error(err)) + reader, readerOk := opR.(*TableScanOperator) + writer, writerOk := opW.(*IndexIngestOperator) + if !readerOk || !writerOk { + logutil.DDLIngestLogger().Error( + "unexpected operator types, config can't be adjusted", + zap.Int64("jobID", job.ID), + zap.Bool("isReaderValid", readerOk), + zap.Bool("isWriterValid", writerOk), + ) + return } - ticker := time.NewTicker(UpdateReorgCfgInterval) + ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: - newReaderCnt, newWriterCnt := expectedIngestWorkerCnt( - job.ReorgMeta.GetConcurrency(), - avgRowSize) - reader.TuneWorkerPoolSize(int32(newReaderCnt)) - writer.TuneWorkerPoolSize(int32(newWriterCnt)) + targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize) + currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() + if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { + continue + } + reader.TuneWorkerPoolSize(int32(targetReaderCnt)) + writer.TuneWorkerPoolSize(int32(targetWriterCnt)) logutil.DDLIngestLogger().Info("adjust backfill worker count", zap.Int64("jobID", job.ID), - zap.Int("table scan operator count", newReaderCnt), - zap.Int("index ingest operator count", newWriterCnt)) + zap.Int("table scan operator count", targetReaderCnt), + zap.Int("index ingest operator count", targetWriterCnt)) } } }() @@ -863,8 +872,8 @@ func (s *localRowCntListener) SetTotal(total int) { s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total)) } -// UpdateReorgCfgInterval is the interval to check and update reorg configuration. -const UpdateReorgCfgInterval = 2 * time.Second +// UpdateDDLJobReorgCfgInterval is the interval to check and update reorg configuration. +const UpdateDDLJobReorgCfgInterval = 2 * time.Second // writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition. // For a partitioned table, it should be handled partition by partition. @@ -1013,7 +1022,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( // update the worker cnt goroutine go func() { - ticker := time.NewTicker(UpdateReorgCfgInterval) + ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval) defer ticker.Stop() for { select { diff --git a/pkg/disttask/operator/operator.go b/pkg/disttask/operator/operator.go index d6c690d650db6..d66baffedab29 100644 --- a/pkg/disttask/operator/operator.go +++ b/pkg/disttask/operator/operator.go @@ -98,6 +98,11 @@ func (c *AsyncOperator[T, R]) TuneWorkerPoolSize(workerNum int32) { c.pool.Tune(workerNum) } +// GetWorkerPoolSize returns the worker pool size. 
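With Tune exposed through TuneWorkerPoolSize and the current capacity readable through GetWorkerPoolSize (whose body follows just below), the ticker goroutines in backfilling.go can resize a running pool in place. A toy, self-contained version of that reconcile-on-tick idea; tunablePool is hypothetical, not the real resourcemanager worker pool:

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"
        "time"
    )

    // tunablePool is a hypothetical pool exposing the same Tune/Cap surface the
    // patch relies on; it only tracks a capacity number.
    type tunablePool struct{ capacity atomic.Int32 }

    func (p *tunablePool) Tune(n int32) { p.capacity.Store(n) }
    func (p *tunablePool) Cap() int32   { return p.capacity.Load() }

    // reconcile retunes the pool whenever the desired size differs, mirroring the
    // ticker goroutine added to the backfill paths.
    func reconcile(ctx context.Context, p *tunablePool, target func() int32) {
        ticker := time.NewTicker(20 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                if want := target(); want != p.Cap() {
                    p.Tune(want)
                    fmt.Println("tuned worker pool size to", want)
                }
            }
        }
    }

    func main() {
        var want atomic.Int32
        want.Store(4)
        p := &tunablePool{}
        p.Tune(4)
        ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
        defer cancel()
        go reconcile(ctx, p, want.Load)
        time.Sleep(50 * time.Millisecond)
        want.Store(8) // e.g. ADMIN ALTER DDL JOBS <id> THREAD = 8
        <-ctx.Done()
        fmt.Println("final pool size:", p.Cap())
    }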
+func (c *AsyncOperator[T, R]) GetWorkerPoolSize() int32 { + return c.pool.Cap() +} + type asyncWorker[T, R any] struct { transform func(T) R } diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 215eda7d0d055..57b204a4007c2 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -49,12 +49,6 @@ func init() { }) } -func TestDd(t *testing.T) { - for i := 0; i < 1; i++ { - TestBackfillOperators(t) - } -} - func TestBackfillOperators(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) From fa4ab649c5f0e2a587bf8db36a40b3d0bc9f9ad6 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 12:38:26 +0800 Subject: [PATCH 09/22] unify log info --- pkg/ddl/backfilling.go | 20 +++++++++----------- pkg/ddl/backfilling_operators.go | 2 +- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index f185aac4c9118..bc5eecccc7ef9 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -435,9 +435,9 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { targetBatchSize := job.ReorgMeta.GetBatchSize() if targetBatchSize != currentBatchCnt { w.GetCtx().batchCnt = targetBatchSize - logger.Info("adjust backfill batch size success", - zap.Int("current batch size", w.GetCtx().batchCnt), - zap.Int64("job ID", job.ID)) + logger.Info("adjust ddl job config success", + zap.Int64("job ID", job.ID), + zap.Int("current batch size", w.GetCtx().batchCnt)) } result := w.handleBackfillTask(d, task, bf) w.sendResult(result) @@ -809,10 +809,10 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( } reader.TuneWorkerPoolSize(int32(targetReaderCnt)) writer.TuneWorkerPoolSize(int32(targetWriterCnt)) - logutil.DDLIngestLogger().Info("adjust backfill worker count", + logutil.DDLIngestLogger().Info("adjust ddl job config success", zap.Int64("jobID", job.ID), - zap.Int("table scan operator count", targetReaderCnt), - zap.Int("index ingest operator count", targetWriterCnt)) + zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), + zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) } } }() @@ -1034,13 +1034,11 @@ func (dc *ddlCtx) writePhysicalTableRecord( if currentWorkerCnt != targetWorkerCnt { err := scheduler.adjustWorkerSize() if err != nil { - logutil.DDLLogger().Warn("cannot adjust backfill worker count", - zap.Int64("job ID", reorgInfo.ID), + logutil.DDLLogger().Error("adjust ddl job config failed", zap.Error(err)) } else { - logutil.DDLLogger().Info("adjust backfill worker count success", - zap.Int("current worker count", scheduler.currentWorkerSize()), - zap.Int64("job ID", reorgInfo.ID)) + logutil.DDLLogger().Info("adjust ddl job config success", + zap.Int("current worker count", scheduler.currentWorkerSize())) } } } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 180c8f5d7e981..429e52c2532e1 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -602,7 +602,7 @@ func (w *tableScanWorker) getChunk() *chunk.Chunk { chk := w.srcChkPool.Get().(*chunk.Chunk) if chk.Capacity() != targetCap { chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, targetCap) - logutil.Logger(w.ctx).Info("adjust backfill batch size success", zap.Int("current batch size", targetCap)) + logutil.Logger(w.ctx).Info("adjust ddl job config success", 
zap.Int("current batch size", chk.Capacity())) } chk.Reset() return chk From 52594667955b6a9d7371219525c1036937e9209d Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 12:57:01 +0800 Subject: [PATCH 10/22] check and update atomic value --- pkg/meta/model/reorg.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index c8e645d92798c..0fc3e504bd9f1 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -86,7 +86,10 @@ func (dm *DDLReorgMeta) GetConcurrency() int { // SetConcurrency sets the concurrency in DDLReorgMeta. func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { - atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) + currentValue := atomic.LoadInt64(&dm.Concurrency) + if currentValue != int64(concurrency) { + atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) + } } // GetBatchSize gets the batch size from DDLReorgMeta. @@ -96,7 +99,10 @@ func (dm *DDLReorgMeta) GetBatchSize() int { // SetBatchSize sets the batch size in DDLReorgMeta. func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { - atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) + currentValue := atomic.LoadInt64(&dm.BatchSize) + if currentValue != int64(batchSize) { + atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) + } } const ( From 6e8950eb108bfdb945e4d4ffc58b9d06df3eac3b Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 17:02:57 +0800 Subject: [PATCH 11/22] add ut case --- pkg/ddl/backfilling_test.go | 34 ++++++++++++++ .../addindextest3/operator_test.go | 47 +++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index cb769947e4d72..fb1627bf6cd2b 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -16,6 +16,8 @@ package ddl import ( "bytes" + "github.com/pingcap/tidb/dumpling/context" + "github.com/pingcap/tidb/pkg/ddl/copr" "testing" "time" @@ -485,3 +487,35 @@ func TestValidateAndFillRanges(t *testing.T) { err = validateAndFillRanges(ranges, []byte("b"), []byte("f")) require.Error(t, err) } + +func TestTuneTableScanWorkerBatchSize(t *testing.T) { + reorgMeta := &model.DDLReorgMeta{ + Concurrency: 4, + BatchSize: 32, + } + copCtx := &copr.CopContextSingleIndex{ + CopContextBase: &copr.CopContextBase{ + FieldTypes: []*types.FieldType{}, + }, + } + opCtx, cancel := NewDistTaskOperatorCtx(context.Background(), 1, 1) + w := tableScanWorker{ + copCtx: copCtx, + ctx: opCtx, + srcChkPool: createChunkPool(copCtx, reorgMeta), + hintBatchSize: 32, + reorgMeta: reorgMeta, + } + for i := 0; i < 10; i++ { + chk := w.getChunk() + require.Equal(t, 32, chk.Capacity()) + w.srcChkPool.Put(chk) + } + reorgMeta.SetBatchSize(64) + for i := 0; i < 10; i++ { + chk := w.getChunk() + require.Equal(t, 64, chk.Capacity()) + w.srcChkPool.Put(chk) + } + cancel() +} diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 57b204a4007c2..a6b67ee79279a 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -374,3 +374,50 @@ func (p *sessPoolForTest) Get() (sessionctx.Context, error) { func (p *sessPoolForTest) Put(sctx sessionctx.Context) { p.pool.Put(sctx.(pools.Resource)) } + +func TestTuneWorkerPoolSize(t *testing.T) { + store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) + tk := testkit.NewTestKit(t, store) + tbl, idxInfo, _, _, copCtx := prepare(t, tk, dom, 10) + 
sessPool := newSessPoolForTest(t, store) + + // Test TableScanOperator. + { + ctx := context.Background() + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, nil, 2, nil, 0, nil) + + scanOp.Open() + require.Equal(t, scanOp.GetWorkerPoolSize(), int32(2)) + scanOp.TuneWorkerPoolSize(8) + require.Equal(t, scanOp.GetWorkerPoolSize(), int32(8)) + scanOp.TuneWorkerPoolSize(1) + require.Equal(t, scanOp.GetWorkerPoolSize(), int32(1)) + + cancel() + require.NoError(t, opCtx.OperatorErr()) + } + + // Test IndexIngestOperator. + { + ctx := context.Background() + opCtx, cancel := ddl.NewDistTaskOperatorCtx(ctx, 1, 1) + pTbl := tbl.(table.PhysicalTable) + index := tables.NewIndex(pTbl.GetPhysicalID(), tbl.Meta(), idxInfo) + mockBackendCtx := &ingest.MockBackendCtx{} + mockEngine := ingest.NewMockEngineInfo(nil) + ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, mockBackendCtx, sessPool, pTbl, []table.Index{index}, + []ingest.Engine{mockEngine}, nil, 2, nil, + nil, &ddl.EmptyRowCntListener{}) + + ingestOp.Open() + require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(2)) + ingestOp.TuneWorkerPoolSize(8) + require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(8)) + ingestOp.TuneWorkerPoolSize(1) + require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(1)) + + cancel() + require.NoError(t, opCtx.OperatorErr()) + } +} From 64f9f4d9f0074c185b2423614c461fee7a300420 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 17:07:01 +0800 Subject: [PATCH 12/22] start adjust go routine after pipeline execute --- pkg/ddl/backfilling.go | 91 +++++++++++++++++-------------- pkg/ddl/backfilling_read_index.go | 4 +- 2 files changed, 51 insertions(+), 44 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index bc5eecccc7ef9..8d57a9cca4cc5 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -777,47 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return err } - // Adjust worker pool size dynamically. 
- go func() { - opR, opW := pipe.GetLocalIngestModeReaderAndWriter() - if opR == nil || opW == nil { - logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID)) - return - } - reader, readerOk := opR.(*TableScanOperator) - writer, writerOk := opW.(*IndexIngestOperator) - if !readerOk || !writerOk { - logutil.DDLIngestLogger().Error( - "unexpected operator types, config can't be adjusted", - zap.Int64("jobID", job.ID), - zap.Bool("isReaderValid", readerOk), - zap.Bool("isWriterValid", writerOk), - ) - return - } - ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize) - currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() - if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { - continue - } - reader.TuneWorkerPoolSize(int32(targetReaderCnt)) - writer.TuneWorkerPoolSize(int32(targetWriterCnt)) - logutil.DDLIngestLogger().Info("adjust ddl job config success", - zap.Int64("jobID", job.ID), - zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), - zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) - } - } - }() - - err = executeAndClosePipeline(opCtx, pipe) + err = executeAndClosePipeline(opCtx, pipe, reorgInfo.Job, avgRowSize) if err != nil { err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines) if err1 != nil { @@ -834,11 +794,58 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) } -func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error { +func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) { + opR, opW := pipe.GetLocalIngestModeReaderAndWriter() + if opR == nil || opW == nil { + logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID)) + return + } + reader, readerOk := opR.(*TableScanOperator) + writer, writerOk := opW.(*IndexIngestOperator) + if !readerOk || !writerOk { + logutil.DDLIngestLogger().Error( + "unexpected operator types, config can't be adjusted", + zap.Int64("jobID", job.ID), + zap.Bool("isReaderValid", readerOk), + zap.Bool("isWriterValid", writerOk), + ) + return + } + ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize) + currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() + if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { + continue + } + reader.TuneWorkerPoolSize(int32(targetReaderCnt)) + writer.TuneWorkerPoolSize(int32(targetWriterCnt)) + logutil.DDLIngestLogger().Info("adjust ddl job config success", + zap.Int64("jobID", job.ID), + zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), + zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) + } + } +} + +func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error { err := pipe.Execute() if err != nil { return err } + + // Adjust worker pool size dynamically. 
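The guard just below skips the goroutine when the caller has no job to read the knobs from (the dist-task read-index path passes nil), so only the local ingest path is tuned dynamically. A loose, hypothetical sketch of this execute, optional monitor, then close ordering:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // runPipeline loosely mirrors executeAndClosePipeline after this patch:
    // execute first, optionally start a monitor whose lifetime is bounded by ctx,
    // then close. Callers that cannot be tuned simply pass monitor == nil.
    func runPipeline(ctx context.Context, execute, closeFn func() error, monitor func(context.Context)) error {
        if err := execute(); err != nil {
            return err
        }
        if monitor != nil {
            go monitor(ctx)
        }
        return closeFn()
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        err := runPipeline(ctx,
            func() error { fmt.Println("pipeline executed"); return nil },
            func() error { fmt.Println("pipeline closed"); return nil },
            func(c context.Context) {
                <-c.Done() // a real monitor would tick and retune pool sizes here
                fmt.Println("monitor stopped")
            },
        )
        fmt.Println("err:", err)
        cancel()
        time.Sleep(10 * time.Millisecond) // let the monitor observe cancellation
    }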
+ if job != nil && avgRowSize != 0 { + go func() { + adjustWorkerPoolSize(ctx, pipe, job, avgRowSize) + }() + } + err = pipe.Close() if opErr := ctx.OperatorErr(); opErr != nil { return opErr diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 8e848b19b654c..de0b3c8409db6 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta if err != nil { return err } - return executeAndClosePipeline(opCtx, pipe) + return executeAndClosePipeline(opCtx, pipe, nil, 0) } pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency) if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe) + err = executeAndClosePipeline(opCtx, pipe, nil, 0) if err != nil { // For dist task local based ingest, checkpoint is unsupported. // If there is an error we should keep local sort dir clean. From 0c6394be3f864be2d9ea976640b93e619c4a4f98 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 17:08:21 +0800 Subject: [PATCH 13/22] Update backfilling.go --- pkg/ddl/backfilling.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 8d57a9cca4cc5..3c8ece18b37d2 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -776,7 +776,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe, reorgInfo.Job, avgRowSize) if err != nil { err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines) From 4f1b08c4e3c737add822116e3c54c95589c54bc5 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 17:15:59 +0800 Subject: [PATCH 14/22] fix nogo --- pkg/ddl/backfilling_operators.go | 1 + pkg/ddl/backfilling_test.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 429e52c2532e1..f0691c77b4b7c 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -175,6 +175,7 @@ func NewAddIndexIngestPipeline( readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) rm := reorgMeta if rm.IsDistReorg { + // Currently, only the batch size of local ingest mode can be adjusted rm = nil } diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index fb1627bf6cd2b..376035f6ad84a 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -16,11 +16,11 @@ package ddl import ( "bytes" - "github.com/pingcap/tidb/dumpling/context" - "github.com/pingcap/tidb/pkg/ddl/copr" "testing" "time" + "github.com/pingcap/tidb/dumpling/context" + "github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/ingest" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/errctx" From e183d183f87838890ae417655be5d11b337b2af2 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Tue, 19 Nov 2024 17:34:23 +0800 Subject: [PATCH 15/22] correct import --- pkg/ddl/backfilling_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index 376035f6ad84a..ed0802a45f5c8 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -16,10 +16,10 @@ package ddl import ( "bytes" + "context" "testing" "time" - "github.com/pingcap/tidb/dumpling/context" 
"github.com/pingcap/tidb/pkg/ddl/copr" "github.com/pingcap/tidb/pkg/ddl/ingest" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" From 86a8f4dcc7a9a5447e69f318d212ef4cc11c2df1 Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:17:32 +0800 Subject: [PATCH 16/22] Update pkg/ddl/backfilling.go Co-authored-by: tangenta --- pkg/ddl/backfilling.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 3c8ece18b37d2..c8b789aabcd6a 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -436,7 +436,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { if targetBatchSize != currentBatchCnt { w.GetCtx().batchCnt = targetBatchSize logger.Info("adjust ddl job config success", - zap.Int64("job ID", job.ID), + zap.Int64("jobID", job.ID), zap.Int("current batch size", w.GetCtx().batchCnt)) } result := w.handleBackfillTask(d, task, bf) From f34e39281995fa488723e7236da2d61741fff2dc Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Thu, 21 Nov 2024 11:18:18 +0800 Subject: [PATCH 17/22] Update pkg/ddl/backfilling.go Co-authored-by: tangenta --- pkg/ddl/backfilling.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index c8b789aabcd6a..0aa157a78cf6e 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -839,7 +839,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job } // Adjust worker pool size dynamically. - if job != nil && avgRowSize != 0 { + if job != nil { go func() { adjustWorkerPoolSize(ctx, pipe, job, avgRowSize) }() From 14e8a4c2c2077cff4050d7a1390377defd45e6a8 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 13:41:41 +0800 Subject: [PATCH 18/22] handle the 0 case --- pkg/ddl/backfilling.go | 13 +++++++------ pkg/ddl/backfilling_dist_executor.go | 3 ++- pkg/ddl/backfilling_operators.go | 11 +++++++---- pkg/ddl/backfilling_scheduler.go | 4 ++-- pkg/ddl/db_test.go | 8 ++++---- pkg/ddl/executor.go | 4 ++-- pkg/ddl/index.go | 2 +- pkg/ddl/job_worker.go | 4 ++-- pkg/meta/model/reorg.go | 13 ++++++++++--- 9 files changed, 37 insertions(+), 25 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 0aa157a78cf6e..7fae96dd89140 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -194,7 +194,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo, id = int(backfillContextID.Add(1)) } - batchCnt := rInfo.ReorgMeta.GetBatchSize() + batchCnt := rInfo.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())) return &backfillCtx{ id: id, ddlCtx: rInfo.jobCtx.oldDDLCtx, @@ -432,7 +432,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // Change the batch size dynamically. 
currentBatchCnt := w.GetCtx().batchCnt - targetBatchSize := job.ReorgMeta.GetBatchSize() + targetBatchSize := job.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())) if targetBatchSize != currentBatchCnt { w.GetCtx().batchCnt = targetBatchSize logger.Info("adjust ddl job config success", @@ -707,7 +707,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - importConc := job.ReorgMeta.GetConcurrency() + importConc := job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) bcCtx, err := ingest.LitBackCtxMgr.Register( ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS) if err != nil { @@ -776,7 +776,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe, reorgInfo.Job, avgRowSize) + err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize) if err != nil { err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines) if err1 != nil { @@ -817,7 +817,8 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job case <-ctx.Done(): return case <-ticker.C: - targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize) + targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt( + job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), avgRowSize) currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { continue @@ -1036,7 +1037,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( return case <-ticker.C: currentWorkerCnt := scheduler.currentWorkerSize() - targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency() + targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) if currentWorkerCnt != targetWorkerCnt { err := scheduler.adjustWorkerSize() if err != nil { diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 3e4127668c09a..af5d656d07a73 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" @@ -152,7 +153,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, - job.ReorgMeta.GetConcurrency(), + job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), job.RealStartTS, ) } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index f0691c77b4b7c..453c53c17ccc1 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" 
"github.com/pingcap/tidb/pkg/tablecodec" @@ -180,7 +181,8 @@ func NewAddIndexIngestPipeline( } srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.GetBatchSize(), rm) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, + reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), rm) ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener) @@ -245,7 +247,8 @@ func NewWriteIndexToExternalStoragePipeline( }) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil) - scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.GetBatchSize(), nil) + scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, + reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), nil) writeOp := NewWriteExternalStoreOperator( ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, @@ -274,7 +277,7 @@ func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *syn return &sync.Pool{ New: func() any { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, - reorgMeta.GetBatchSize()) + reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))) }, } } @@ -598,7 +601,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor func (w *tableScanWorker) getChunk() *chunk.Chunk { targetCap := ingest.CopReadBatchSize(w.hintBatchSize) if w.reorgMeta != nil { - targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize()) + targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))) } chk := w.srcChkPool.Get().(*chunk.Chunk) if chk.Capacity() != targetCap { diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 6bf9f6f0fd5ed..c9f73c905b5b4 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -87,7 +87,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses if err != nil { return nil, err } - workerCnt := info.ReorgMeta.GetConcurrency() + workerCnt := info.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) return &txnBackfillScheduler{ ctx: ctx, reorgInfo: info, @@ -247,7 +247,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) } func (b *txnBackfillScheduler) expectedWorkerSize() (size int) { - workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency() + workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) return min(workerCnt, maxBackfillWorkerSize) } diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index f53d9e05f5317..58897744b0956 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1171,16 +1171,16 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) { insertMockJob2Table(tk, &job) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID)) j := getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.GetConcurrency(), 8) + require.Equal(t, j.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), 8) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID)) j = getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.GetBatchSize(), 
256) + require.Equal(t, j.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), 256) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID)) j = getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.GetConcurrency(), 16) - require.Equal(t, j.ReorgMeta.GetBatchSize(), 512) + require.Equal(t, j.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), 16) + require.Equal(t, j.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), 512) deleteJobMetaByID(tk, job.ID) } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 4327eb0661f22..d9ce852a77782 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4971,8 +4971,8 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model. zap.Bool("enableDistTask", reorgMeta.IsDistReorg), zap.Bool("enableFastReorg", reorgMeta.IsFastReorg), zap.String("targetScope", reorgMeta.TargetScope), - zap.Int("concurrency", reorgMeta.GetConcurrency()), - zap.Int("batchSize", reorgMeta.GetBatchSize()), + zap.Int("concurrency", reorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))), + zap.Int("batchSize", reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))), ) return reorgMeta, nil } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index bee8cd7330b24..7e09af16e734b 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2511,7 +2511,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { }) } else { job := reorgInfo.Job - workerCntLimit := job.ReorgMeta.GetConcurrency() + workerCntLimit := job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) cpuCount, err := handle.GetCPUCountOfNode(ctx) if err != nil { return err diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 45d574352c279..6817175b70ccb 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -865,8 +865,8 @@ func (w *worker) runOneJobStep( return case model.JobStateRunning: if latestJob.IsAlterable() { - job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrency()) - job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSize()) + job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))) + job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))) } } } diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 0fc3e504bd9f1..71e37118281e1 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -79,8 +79,12 @@ type DDLReorgMeta struct { BatchSize int64 `json:"batch_size"` } -// GetConcurrency gets the concurrency from DDLReorgMeta. -func (dm *DDLReorgMeta) GetConcurrency() int { +// GetConcurrency gets the concurrency from DDLReorgMeta, +// pass the default value in case of the reorg meta coming from old cluster and Concurrency is 0. +func (dm *DDLReorgMeta) GetConcurrency(defaultVal int) int { + if dm == nil || atomic.LoadInt64(&dm.Concurrency) == 0 { + return defaultVal + } return int(atomic.LoadInt64(&dm.Concurrency)) } @@ -93,7 +97,10 @@ func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { } // GetBatchSize gets the batch size from DDLReorgMeta. 
-func (dm *DDLReorgMeta) GetBatchSize() int { +func (dm *DDLReorgMeta) GetBatchSize(defaultVal int) int { + if dm == nil || atomic.LoadInt64(&dm.BatchSize) == 0 { + return defaultVal + } return int(atomic.LoadInt64(&dm.BatchSize)) } From fe6756e25b94395abbe52be95cec4270df524e01 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 13:45:55 +0800 Subject: [PATCH 19/22] rename --- pkg/ddl/backfilling.go | 10 +++++----- pkg/ddl/backfilling_dist_executor.go | 2 +- pkg/ddl/backfilling_operators.go | 8 ++++---- pkg/ddl/backfilling_scheduler.go | 4 ++-- pkg/ddl/db_test.go | 8 ++++---- pkg/ddl/executor.go | 4 ++-- pkg/ddl/index.go | 2 +- pkg/ddl/job_worker.go | 4 ++-- pkg/meta/model/reorg.go | 8 ++++---- 9 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 7fae96dd89140..e3d52c22481db 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -194,7 +194,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo, id = int(backfillContextID.Add(1)) } - batchCnt := rInfo.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())) + batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) return &backfillCtx{ id: id, ddlCtx: rInfo.jobCtx.oldDDLCtx, @@ -432,7 +432,7 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) { // Change the batch size dynamically. currentBatchCnt := w.GetCtx().batchCnt - targetBatchSize := job.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())) + targetBatchSize := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) if targetBatchSize != currentBatchCnt { w.GetCtx().batchCnt = targetBatchSize logger.Info("adjust ddl job config success", @@ -707,7 +707,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - importConc := job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) + importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) bcCtx, err := ingest.LitBackCtxMgr.Register( ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS) if err != nil { @@ -818,7 +818,7 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job return case <-ticker.C: targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt( - job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), avgRowSize) + job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize) currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { continue @@ -1037,7 +1037,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( return case <-ticker.C: currentWorkerCnt := scheduler.currentWorkerSize() - targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) + targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) if currentWorkerCnt != targetWorkerCnt { err := scheduler.adjustWorkerSize() if err != nil { diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index af5d656d07a73..d9b5e6c062273 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -153,7 +153,7 @@ func (s *backfillDistExecutor) getBackendCtx() 
(ingest.BackendCtx, error) { ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, - job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), + job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), job.RealStartTS, ) } diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 453c53c17ccc1..d38582ebd2f1c 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -182,7 +182,7 @@ func NewAddIndexIngestPipeline( srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, - reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), rm) + reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), rm) ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener) @@ -248,7 +248,7 @@ func NewWriteIndexToExternalStoragePipeline( srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, - reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), nil) + reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), nil) writeOp := NewWriteExternalStoreOperator( ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, @@ -277,7 +277,7 @@ func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *syn return &sync.Pool{ New: func() any { return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, - reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))) + reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) }, } } @@ -601,7 +601,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor func (w *tableScanWorker) getChunk() *chunk.Chunk { targetCap := ingest.CopReadBatchSize(w.hintBatchSize) if w.reorgMeta != nil { - targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))) + targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) } chk := w.srcChkPool.Get().(*chunk.Chunk) if chk.Capacity() != targetCap { diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index c9f73c905b5b4..ddf78334e2200 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -87,7 +87,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses if err != nil { return nil, err } - workerCnt := info.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) + workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) return &txnBackfillScheduler{ ctx: ctx, reorgInfo: info, @@ -247,7 +247,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context) } func (b *txnBackfillScheduler) expectedWorkerSize() (size int) { - workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) + workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) return min(workerCnt, maxBackfillWorkerSize) } diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 58897744b0956..9849edc0739c8 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go 
@@ -1171,16 +1171,16 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) { insertMockJob2Table(tk, &job) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID)) j := getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), 8) + require.Equal(t, j.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), 8) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID)) j = getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), 256) + require.Equal(t, j.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), 256) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID)) j = getJobMetaByID(t, tk, job.ID) - require.Equal(t, j.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())), 16) - require.Equal(t, j.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize())), 512) + require.Equal(t, j.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), 16) + require.Equal(t, j.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), 512) deleteJobMetaByID(tk, job.ID) } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index d9ce852a77782..f5805902eee69 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4971,8 +4971,8 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model. zap.Bool("enableDistTask", reorgMeta.IsDistReorg), zap.Bool("enableFastReorg", reorgMeta.IsFastReorg), zap.String("targetScope", reorgMeta.TargetScope), - zap.Int("concurrency", reorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))), - zap.Int("batchSize", reorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))), + zap.Int("concurrency", reorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))), + zap.Int("batchSize", reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))), ) return reorgMeta, nil } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 7e09af16e734b..0a733715625d5 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2511,7 +2511,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error { }) } else { job := reorgInfo.Job - workerCntLimit := job.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter())) + workerCntLimit := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) cpuCount, err := handle.GetCPUCountOfNode(ctx) if err != nil { return err diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 6817175b70ccb..01c0dc5547cfc 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -865,8 +865,8 @@ func (w *worker) runOneJobStep( return case model.JobStateRunning: if latestJob.IsAlterable() { - job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrency(int(variable.GetDDLReorgWorkerCounter()))) - job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSize(int(variable.GetDDLReorgBatchSize()))) + job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))) + job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) } } } diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 71e37118281e1..27e5f5bd75d47 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -79,9 +79,9 @@ type DDLReorgMeta struct { BatchSize int64 `json:"batch_size"` } 
-// GetConcurrency gets the concurrency from DDLReorgMeta, +// GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta, // pass the default value in case of the reorg meta coming from old cluster and Concurrency is 0. -func (dm *DDLReorgMeta) GetConcurrency(defaultVal int) int { +func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { if dm == nil || atomic.LoadInt64(&dm.Concurrency) == 0 { return defaultVal } @@ -96,8 +96,8 @@ func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { } } -// GetBatchSize gets the batch size from DDLReorgMeta. -func (dm *DDLReorgMeta) GetBatchSize(defaultVal int) int { +// GetBatchSizeOrDefault gets the batch size from DDLReorgMeta. +func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { if dm == nil || atomic.LoadInt64(&dm.BatchSize) == 0 { return defaultVal } From 7993dcc732854e2e221d796f53a14e9f0de933c5 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 14:41:14 +0800 Subject: [PATCH 20/22] run atomic.LoadInt64() once --- pkg/meta/model/reorg.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index 27e5f5bd75d47..fc117cc8679a4 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -82,34 +82,30 @@ type DDLReorgMeta struct { // GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta, // pass the default value in case of the reorg meta coming from old cluster and Concurrency is 0. func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { - if dm == nil || atomic.LoadInt64(&dm.Concurrency) == 0 { + concurrency := atomic.LoadInt64(&dm.Concurrency) + if dm == nil || concurrency == 0 { return defaultVal } - return int(atomic.LoadInt64(&dm.Concurrency)) + return int(concurrency) } // SetConcurrency sets the concurrency in DDLReorgMeta. func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { - currentValue := atomic.LoadInt64(&dm.Concurrency) - if currentValue != int64(concurrency) { - atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) - } + atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) } // GetBatchSizeOrDefault gets the batch size from DDLReorgMeta. func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { - if dm == nil || atomic.LoadInt64(&dm.BatchSize) == 0 { + batchSize := atomic.LoadInt64(&dm.BatchSize) + if dm == nil || batchSize == 0 { return defaultVal } - return int(atomic.LoadInt64(&dm.BatchSize)) + return int(batchSize) } // SetBatchSize sets the batch size in DDLReorgMeta. 
func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { - currentValue := atomic.LoadInt64(&dm.BatchSize) - if currentValue != int64(batchSize) { - atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) - } + atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) } const ( From d91f591cc8e5c2e2b4ac9583fbfc02375d5e4cfe Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 14:41:26 +0800 Subject: [PATCH 21/22] update error message --- pkg/ddl/db_test.go | 4 ++-- pkg/executor/operate_ddl_jobs.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 9849edc0739c8..ed2957c039988 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1206,7 +1206,7 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { insertMockJob2Table(tk, &job) // unsupported job type tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID), - "unsupported DDL operation: add column, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job") + "unsupported DDL operation: add column. Supported DDL operations are: ADD INDEX (with tidb_enable_dist_task=OFF), MODIFY COLUMN, and ALTER TABLE REORGANIZE PARTITION") deleteJobMetaByID(tk, 1) job = model.Job{ @@ -1219,7 +1219,7 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { insertMockJob2Table(tk, &job) // unsupported job type tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID), - "unsupported DDL operation: add index, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job") + "unsupported DDL operation: add index. Supported DDL operations are: ADD INDEX (with tidb_enable_dist_task=OFF), MODIFY COLUMN, and ALTER TABLE REORGANIZE PARTITION") deleteJobMetaByID(tk, 1) } diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 90f1d0471a078..5a48ee4fd535a 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -169,8 +169,8 @@ func (e *AlterDDLJobExec) processAlterDDLJobConfig( continue } if !job.IsAlterable() { - return fmt.Errorf("unsupported DDL operation: %s, "+ - "only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job", job.Type.String()) + return fmt.Errorf("unsupported DDL operation: %s. 
"+ + "Supported DDL operations are: ADD INDEX (with tidb_enable_dist_task=OFF), MODIFY COLUMN, and ALTER TABLE REORGANIZE PARTITION", job.Type.String()) } if err = e.updateReorgMeta(job, model.AdminCommandByEndUser); err != nil { continue From ff941ec4a533de848c846d19e9c58d7cae184810 Mon Sep 17 00:00:00 2001 From: fzzf678 Date: Thu, 21 Nov 2024 18:00:35 +0800 Subject: [PATCH 22/22] use interface get config --- pkg/ddl/executor.go | 8 ++++---- pkg/executor/show_ddl_jobs.go | 10 ++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 2636f675014fc..cba9955689c25 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4923,10 +4923,10 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro m := NewDDLReorgMeta(sctx) setReorgParam := func() { if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok { - m.Concurrency = variable.TidbOptInt(sv, 0) + m.SetConcurrency(variable.TidbOptInt(sv, 0)) } if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok { - m.BatchSize = variable.TidbOptInt(sv, 0) + m.SetBatchSize(variable.TidbOptInt(sv, 0)) } } setDistTaskParam := func() error { @@ -4989,8 +4989,8 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro zap.Bool("enableDistTask", m.IsDistReorg), zap.Bool("enableFastReorg", m.IsFastReorg), zap.String("targetScope", m.TargetScope), - zap.Int("concurrency", m.Concurrency), - zap.Int("batchSize", m.BatchSize), + zap.Int("concurrency", m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))), + zap.Int("batchSize", m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))), ) return nil } diff --git a/pkg/executor/show_ddl_jobs.go b/pkg/executor/show_ddl_jobs.go index 4761577cfe215..440e0bc995ffb 100644 --- a/pkg/executor/show_ddl_jobs.go +++ b/pkg/executor/show_ddl_jobs.go @@ -315,11 +315,13 @@ func showCommentsFromJob(job *model.Job) string { } } if job.MayNeedReorg() { - if m.Concurrency != 0 && m.Concurrency != variable.DefTiDBDDLReorgWorkerCount { - labels = append(labels, fmt.Sprintf("thread=%d", m.Concurrency)) + concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + if concurrency != variable.DefTiDBDDLReorgWorkerCount { + labels = append(labels, fmt.Sprintf("thread=%d", concurrency)) } - if m.BatchSize != 0 && m.BatchSize != variable.DefTiDBDDLReorgBatchSize { - labels = append(labels, fmt.Sprintf("batch_size=%d", m.BatchSize)) + if batchSize != variable.DefTiDBDDLReorgBatchSize { + labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize)) } if m.TargetScope != "" { labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope))