2 changes: 1 addition & 1 deletion pkg/ddl/BUILD.bazel
@@ -143,7 +143,6 @@ go_library(
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/memory",
"//pkg/util/mock",
"//pkg/util/ranger",
"//pkg/util/resourcegrouptag",
@@ -159,6 +158,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/topsql",
"//pkg/util/topsql/state",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_executor.go
@@ -117,7 +117,7 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(
ddlObj.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, s.getBackendCtx, cloudStorageURI, estRowSize)
case proto.BackfillStepMergeSort:
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI, estRowSize)
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI)
case proto.BackfillStepWriteAndIngest:
if len(cloudStorageURI) == 0 {
return nil, errors.Errorf("local import does not have write & ingest step")
12 changes: 4 additions & 8 deletions pkg/ddl/backfilling_merge_sort.go
@@ -35,7 +35,6 @@ type mergeSortExecutor struct {
jobID int64
idxNum int
ptbl table.PhysicalTable
avgRowSize int
cloudStoreURI string

mu sync.Mutex
@@ -47,14 +46,12 @@ func newMergeSortExecutor(
idxNum int,
ptbl table.PhysicalTable,
cloudStoreURI string,
avgRowSize int,
) (*mergeSortExecutor, error) {
return &mergeSortExecutor{
jobID: jobID,
idxNum: idxNum,
ptbl: ptbl,
cloudStoreURI: cloudStoreURI,
avgRowSize: avgRowSize,
}, nil
}

@@ -88,16 +85,15 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}

prefix := path.Join(strconv.Itoa(int(m.jobID)), strconv.Itoa(int(subtask.ID)))
partSize, err := getMergeSortPartSize(m.avgRowSize, subtask.Concurrency, m.idxNum)
if err != nil {
return err
}
res := m.GetResource()
memSizePerCon := res.Mem.Capacity() / int64(subtask.Concurrency)
partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/10000)

return external.MergeOverlappingFiles(
ctx,
sm.DataFiles,
store,
int64(partSize),
partSize,
prefix,
external.DefaultBlockSize,
onClose,
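With this hunk the merge-sort step sizes its upload parts from the memory quota the distributed task framework attaches to the subtask (res.Mem.Capacity()) divided across the subtask's concurrency, instead of calling the removed getMergeSortPartSize helper that probed host memory. Below is a minimal standalone sketch of that arithmetic; the concrete values used for external.MinUploadPartSize and external.MaxMergingFilesPerThread are illustrative assumptions, not the constants defined in pkg/lightning/backend/external.

package main

import "fmt"

// mergeSortPartSize mirrors the calculation in RunSubtask: split the
// subtask's memory capacity across its concurrent workers, scale it by the
// per-thread merging-file factor, and never go below the minimum upload
// part size.
func mergeSortPartSize(memCapacity, concurrency, maxMergingFilesPerThread, minUploadPartSize int64) int64 {
	memSizePerCon := memCapacity / concurrency
	partSize := memSizePerCon * maxMergingFilesPerThread / 10000
	if partSize < minUploadPartSize {
		return minUploadPartSize
	}
	return partSize
}

func main() {
	const gib, mib = int64(1) << 30, int64(1) << 20
	// Assumed inputs: a 16 GiB subtask quota, concurrency 8, 250 merging
	// files per thread, 5 MiB minimum part size.
	fmt.Println(mergeSortPartSize(16*gib, 8, 250, 5*mib) / mib) // 51 (MiB, truncated)
}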
48 changes: 13 additions & 35 deletions pkg/ddl/backfilling_operators.go
@@ -23,13 +23,15 @@ import (
"sync/atomic"
"time"

"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
@@ -46,7 +48,6 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -112,34 +113,6 @@ func (ctx *OperatorCtx) OperatorErr() error {
return *err
}

func getWriterMemSize(avgRowSize, concurrency, idxNum int) (uint64, error) {
failpoint.Inject("mockWriterMemSize", func() {
failpoint.Return(1*size.GB, nil)
})
_, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
memTotal, err := memory.MemTotal()
if err != nil {
return 0, err
}
memUsed, err := memory.MemUsed()
if err != nil {
return 0, err
}
memAvailable := memTotal - memUsed
memSize := (memAvailable / 2) / uint64(writerCnt) / uint64(idxNum)
logutil.BgLogger().Info("build operators that write index to cloud storage", zap.Uint64("memory total", memTotal), zap.Uint64("memory used", memUsed), zap.Uint64("memory size", memSize))
return memSize, nil
}

func getMergeSortPartSize(avgRowSize int, concurrency int, idxNum int) (uint64, error) {
writerMemSize, err := getWriterMemSize(avgRowSize, concurrency, idxNum)
if err != nil {
return 0, nil
}
// Prevent part count being too large.
return writerMemSize / uint64(concurrency) / 10000 * uint64(external.MergeSortOverlapThreshold), nil
}

// NewAddIndexIngestPipeline creates a pipeline for adding index in ingest mode.
func NewAddIndexIngestPipeline(
ctx *OperatorCtx,
@@ -186,6 +159,7 @@ func NewAddIndexIngestPipeline(

logutil.Logger(ctx).Info("build add index local storage operators",
zap.Int64("jobID", jobID),
zap.Int("avgRowSize", avgRowSize),
zap.Int("reader", readerCnt),
zap.Int("writer", writerCnt))

Expand All @@ -211,6 +185,7 @@ func NewWriteIndexToExternalStoragePipeline(
reorgMeta *model.DDLReorgMeta,
avgRowSize int,
concurrency int,
resource *proto.StepResource,
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
@@ -237,16 +212,16 @@ func NewWriteIndexToExternalStoragePipeline(
if err != nil {
return nil, err
}

memSize, err := getWriterMemSize(avgRowSize, concurrency, len(indexes))
if err != nil {
return nil, err
}
memCap := resource.Mem.Capacity()
memSizePerIndex := uint64(memCap / int64(writerCnt*2*len(idxInfos)))
failpoint.Inject("mockWriterMemSize", func() {
memSizePerIndex = 1 * size.GB
})

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSize, reorgMeta)
ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, totalRowCount, metricCounter)

operator.Compose[TableScanTask](srcOp, scanOp)
@@ -255,6 +230,9 @@ func NewWriteIndexToExternalStoragePipeline(

logutil.Logger(ctx).Info("build add index cloud storage operators",
zap.Int64("jobID", jobID),
zap.String("memCap", units.BytesSize(float64(memCap))),
zap.String("memSizePerIdx", units.BytesSize(float64(memSizePerIndex))),
zap.Int("avgRowSize", avgRowSize),
zap.Int("reader", readerCnt),
zap.Int("writer", writerCnt))

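The removed getWriterMemSize sampled host memory via MemTotal/MemUsed and split half of the free memory among the writers; the replacement divides the subtask's granted capacity by writerCnt*2*len(idxInfos), so each index writer gets a fixed share of the quota regardless of what else runs on the host. A small sketch of the new budget with assumed numbers (a 16 GiB quota, 8 writers, 2 indexes), not taken from a real *proto.StepResource:

package main

import "fmt"

// writerMemSizePerIndex mirrors the memSizePerIndex expression in
// NewWriteIndexToExternalStoragePipeline: the subtask's memory capacity is
// split across writerCnt workers, a fixed factor of 2, and the number of
// indexes being built.
func writerMemSizePerIndex(memCapacity int64, writerCnt, idxCnt int) uint64 {
	return uint64(memCapacity / int64(writerCnt*2*idxCnt))
}

func main() {
	const gib = int64(1) << 30
	// 16 GiB / (8 writers * 2 * 2 indexes) = 512 MiB per index writer.
	fmt.Println(writerMemSizePerIndex(16*gib, 8, 2) >> 20) // 512 (MiB)
}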
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_read_index.go
@@ -306,5 +306,6 @@ func (r *readIndexExecutor) buildExternalStorePipeline(
r.job.ReorgMeta,
r.avgRowSize,
concurrency,
r.GetResource(),
)
}
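Finally, the read-index executor now threads its granted resource into the pipeline builder via r.GetResource(), so the external-storage writers and the merge-sort step both budget against the same per-subtask quota. The sketch below models only the piece of proto.StepResource the diff actually uses (Mem.Capacity()); the type names are hypothetical stand-ins, not the real framework API.

package main

import "fmt"

// memQuota and stepResource are hypothetical stand-ins for proto.StepResource;
// only the Capacity accessor used in the diff is modeled here.
type memQuota struct{ capacity int64 }

func (m memQuota) Capacity() int64 { return m.capacity }

type stepResource struct{ Mem memQuota }

// deriveBudgets shows the dependency direction after this change: the
// executor hands its granted resource down, and each consumer derives its
// own budget from the same capacity instead of probing host memory.
func deriveBudgets(res stepResource, concurrency, writerCnt, idxCnt int) (mergeSortPerWorker, writerPerIndex int64) {
	mergeSortPerWorker = res.Mem.Capacity() / int64(concurrency)
	writerPerIndex = res.Mem.Capacity() / int64(writerCnt*2*idxCnt)
	return
}

func main() {
	res := stepResource{Mem: memQuota{capacity: 16 << 30}} // assumed 16 GiB quota
	m, w := deriveBudgets(res, 8, 8, 2)
	fmt.Println(m>>20, w>>20) // 2048 512 (MiB)
}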