Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/kv",
"//pkg/lightning/log",
"//pkg/sessionctx",
"//pkg/sessionctx/vardef",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/cpu",
Expand Down Expand Up @@ -59,7 +60,7 @@ go_test(
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 38,
shard_count = 39,
deps = [
"//pkg/config",
"//pkg/config/kerneltype",
Expand Down
26 changes: 24 additions & 2 deletions pkg/disttask/framework/scheduler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/util/cpu"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand All @@ -39,7 +40,11 @@ const (
// To improve performance for small tasks, we assume that on an 8c machine,
// importing 200 GiB of data requires full utilization of a single node’s resources.
// Therefore, for every additional 25 GiB, add 1 concurrency unit as an estimate for task concurrency.
baseSizePerConc = 25 * units.GiB
baseSizePerConc = 25 * units.GiB
maxNodeCountLimitForImportInto = 32
// this value is calculated by 256/8, we have test on a 8c machine with 256
// concurrency, it's fast enough for checksum. we can tune this later if needed.
maxDistSQLConcurrencyPerCore = 32
)

// CalcMaxNodeCountByTableSize calculates the maximum number of nodes to execute DXF based on the table size.
Expand All @@ -49,7 +54,7 @@ func CalcMaxNodeCountByTableSize(size int64, coresPerNode int) int {

// CalcMaxNodeCountByDataSize calculates the maximum number of nodes to execute DXF based on the data size.
func CalcMaxNodeCountByDataSize(size int64, coresPerNode int) int {
return calcMaxNodeCountBySize(size, coresPerNode, 32)
return calcMaxNodeCountBySize(size, coresPerNode, maxNodeCountLimitForImportInto)
}

func calcMaxNodeCountBySize(size int64, coresPerNode int, factor float64) int {
Expand Down Expand Up @@ -112,3 +117,20 @@ func GetExecCPUNode(ctx context.Context) (int, error) {
}
return cpuNode, nil
}

// CalcDistSQLConcurrency calculates the DistSQL concurrency based on the thread
// count, max node count and CPU cores of each node.
// when maxNodeCnt <= 1,we use task concurrency times DefDistSQLScanConcurrency,
// else, we use a linear interpolation method to gradually increase the concurrency
// to maxDistSQLConcurrencyPerCore*nodeCPU.
func CalcDistSQLConcurrency(threadCnt, maxNodeCnt, nodeCPU int) int {
if maxNodeCnt <= 1 {
return threadCnt * vardef.DefDistSQLScanConcurrency
}

start := vardef.DefDistSQLScanConcurrency * nodeCPU
interval := nodeCPU * (maxDistSQLConcurrencyPerCore - vardef.DefDistSQLScanConcurrency)
totalStepCount := maxNodeCountLimitForImportInto - 1
stepCount := min(totalStepCount, maxNodeCnt-1)
return int(float64(start) + float64(interval)*float64(stepCount)/float64(totalStepCount))
}
39 changes: 39 additions & 0 deletions pkg/disttask/framework/scheduler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,42 @@ func TestCalcConcurrencyByDataSize(t *testing.T) {
fmt.Sprintf("dataSize:%d cores:%d", tt.dataSize, tt.cores))
}
}

func TestCalcDistSQLConcurrency(t *testing.T) {
tests := []struct {
// concurrency, nodeCount, nodeCoreCount
c, n, nc int
expected int
}{
// on 8c machine
{c: 1, n: 1, nc: 8, expected: 15},
{c: 3, n: 1, nc: 8, expected: 45},
{c: 7, n: 1, nc: 8, expected: 105},
{c: 8, n: 1, nc: 8, expected: 120},
{c: 8, n: 2, nc: 8, expected: 124},
{c: 8, n: 5, nc: 8, expected: 137},
{c: 8, n: 32, nc: 8, expected: 256},
{c: 8, n: 33, nc: 8, expected: 256},
{c: 8, n: 50, nc: 8, expected: 256},
// on 16c machine
{c: 1, n: 1, nc: 16, expected: 15},
{c: 7, n: 1, nc: 16, expected: 105},
{c: 16, n: 1, nc: 16, expected: 240},
{c: 16, n: 5, nc: 16, expected: 275},
{c: 16, n: 32, nc: 16, expected: 512},
{c: 16, n: 33, nc: 16, expected: 512},
{c: 16, n: 50, nc: 16, expected: 512},
// on 32c machine
{c: 1, n: 1, nc: 32, expected: 15},
{c: 7, n: 1, nc: 32, expected: 105},
{c: 32, n: 1, nc: 32, expected: 480},
{c: 32, n: 5, nc: 32, expected: 550},
{c: 32, n: 32, nc: 32, expected: 1024},
{c: 32, n: 33, nc: 32, expected: 1024},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
require.Equal(t, tt.expected, CalcDistSQLConcurrency(tt.c, tt.n, tt.nc))
})
}
}
7 changes: 3 additions & 4 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1317,9 +1317,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
failpoint.InjectCall("mockImportDataSize", &totalSize)
e.ThreadCnt = scheduler.CalcConcurrencyByDataSize(totalSize, targetNodeCPUCnt)
e.MaxNodeCnt = scheduler.CalcMaxNodeCountByDataSize(totalSize, targetNodeCPUCnt)
e.logger.Info("set import thread count for nextgen kernel",
e.DistSQLScanConcurrency = scheduler.CalcDistSQLConcurrency(e.ThreadCnt, e.MaxNodeCnt, targetNodeCPUCnt)
e.logger.Info("auto calculate resource related params",
zap.Int("thread count", e.ThreadCnt),
zap.Int("max node count", e.MaxNodeCnt),
zap.Int("dist sql scan concurrency", e.DistSQLScanConcurrency),
zap.Int("target node cpu count", targetNodeCPUCnt),
zap.String("total file size", units.BytesSize(float64(totalSize))))
}
Expand Down Expand Up @@ -1625,6 +1627,3 @@ func GetTargetNodeCPUCnt(ctx context.Context, sourceType DataSourceType, path st
}
return handle.GetCPUCountOfNode(ctx)
}

// TestSyncCh is used in unit test to synchronize the execution.
var TestSyncCh = make(chan struct{})