Commit 1743ca6

importinto: calculate distsql scan concurrency in nextgen (#63341) (#63362)
ref #61702
1 parent b0ecb9f commit 1743ca6

4 files changed: +68 −7 lines
pkg/disttask/framework/scheduler/BUILD.bazel

Lines changed: 2 additions & 1 deletion

@@ -26,6 +26,7 @@ go_library(
         "//pkg/kv",
         "//pkg/lightning/log",
         "//pkg/sessionctx",
+        "//pkg/sessionctx/vardef",
         "//pkg/util",
         "//pkg/util/backoff",
         "//pkg/util/cpu",
@@ -59,7 +60,7 @@ go_test(
     embed = [":scheduler"],
     flaky = True,
     race = "off",
-    shard_count = 38,
+    shard_count = 39,
     deps = [
         "//pkg/config",
         "//pkg/config/kerneltype",
pkg/disttask/framework/scheduler/autoscaler.go

Lines changed: 24 additions & 2 deletions

@@ -23,6 +23,7 @@ import (
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/storage"
     "github.com/pingcap/tidb/pkg/kv"
+    "github.com/pingcap/tidb/pkg/sessionctx/vardef"
     "github.com/pingcap/tidb/pkg/util/cpu"
     "github.com/pingcap/tidb/pkg/util/intest"
     "github.com/pingcap/tidb/pkg/util/logutil"
@@ -39,7 +40,11 @@ const (
     // To improve performance for small tasks, we assume that on an 8c machine,
     // importing 200 GiB of data requires full utilization of a single node’s resources.
     // Therefore, for every additional 25 GiB, add 1 concurrency unit as an estimate for task concurrency.
-    baseSizePerConc = 25 * units.GiB
+    baseSizePerConc                = 25 * units.GiB
+    maxNodeCountLimitForImportInto = 32
+    // This value is calculated as 256/8: we tested an 8c machine with 256
+    // concurrency, which was fast enough for checksum. We can tune this later if needed.
+    maxDistSQLConcurrencyPerCore = 32
 )

 // CalcMaxNodeCountByTableSize calculates the maximum number of nodes to execute DXF based on the table size.
@@ -49,7 +54,7 @@ func CalcMaxNodeCountByTableSize(size int64, coresPerNode int) int {

 // CalcMaxNodeCountByDataSize calculates the maximum number of nodes to execute DXF based on the data size.
 func CalcMaxNodeCountByDataSize(size int64, coresPerNode int) int {
-    return calcMaxNodeCountBySize(size, coresPerNode, 32)
+    return calcMaxNodeCountBySize(size, coresPerNode, maxNodeCountLimitForImportInto)
 }

 func calcMaxNodeCountBySize(size int64, coresPerNode int, factor float64) int {
@@ -112,3 +117,20 @@ func GetExecCPUNode(ctx context.Context) (int, error) {
     }
     return cpuNode, nil
 }
+
+// CalcDistSQLConcurrency calculates the DistSQL concurrency based on the thread
+// count, max node count, and CPU cores of each node.
+// When maxNodeCnt <= 1, we use the task concurrency times DefDistSQLScanConcurrency;
+// otherwise, we use linear interpolation to gradually increase the concurrency
+// toward maxDistSQLConcurrencyPerCore*nodeCPU.
+func CalcDistSQLConcurrency(threadCnt, maxNodeCnt, nodeCPU int) int {
+    if maxNodeCnt <= 1 {
+        return threadCnt * vardef.DefDistSQLScanConcurrency
+    }
+
+    start := vardef.DefDistSQLScanConcurrency * nodeCPU
+    interval := nodeCPU * (maxDistSQLConcurrencyPerCore - vardef.DefDistSQLScanConcurrency)
+    totalStepCount := maxNodeCountLimitForImportInto - 1
+    stepCount := min(totalStepCount, maxNodeCnt-1)
+    return int(float64(start) + float64(interval)*float64(stepCount)/float64(totalStepCount))
+}
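For reference, here is how the interpolation plays out on an 8c node, as a minimal standalone sketch rather than the TiDB code itself. It assumes vardef.DefDistSQLScanConcurrency is 15 (which is what the single-node test expectations below imply) and requires Go 1.21+ for the built-in min.

package main

import "fmt"

// Constants mirroring the diff above; defDistSQLScanConcurrency stands in for
// the assumed default value of vardef.DefDistSQLScanConcurrency (15).
const (
    defDistSQLScanConcurrency      = 15
    maxDistSQLConcurrencyPerCore   = 32
    maxNodeCountLimitForImportInto = 32
)

// calcDistSQLConcurrency reproduces the interpolation from CalcDistSQLConcurrency.
func calcDistSQLConcurrency(threadCnt, maxNodeCnt, nodeCPU int) int {
    if maxNodeCnt <= 1 {
        // Single node: scale linearly with the task thread count.
        return threadCnt * defDistSQLScanConcurrency
    }
    // Interpolate from 15*nodeCPU up to 32*nodeCPU as the node count grows toward 32.
    start := defDistSQLScanConcurrency * nodeCPU
    interval := nodeCPU * (maxDistSQLConcurrencyPerCore - defDistSQLScanConcurrency)
    totalStepCount := maxNodeCountLimitForImportInto - 1
    stepCount := min(totalStepCount, maxNodeCnt-1)
    return int(float64(start) + float64(interval)*float64(stepCount)/float64(totalStepCount))
}

func main() {
    fmt.Println(calcDistSQLConcurrency(8, 1, 8))  // 120: single node, 8 threads * 15
    fmt.Println(calcDistSQLConcurrency(8, 5, 8))  // 137: int(120 + 136*4/31)
    fmt.Println(calcDistSQLConcurrency(8, 50, 8)) // 256: saturates at 32 per core * 8 cores
}

With 5 of a possible 32 nodes on an 8c machine, this yields int(120 + 136*4/31) = 137, matching the corresponding test case; at or beyond 32 nodes the result saturates at 32*8 = 256.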

pkg/disttask/framework/scheduler/autoscaler_test.go

Lines changed: 39 additions & 0 deletions

@@ -102,3 +102,42 @@ func TestCalcConcurrencyByDataSize(t *testing.T) {
             fmt.Sprintf("dataSize:%d cores:%d", tt.dataSize, tt.cores))
     }
 }
+
+func TestCalcDistSQLConcurrency(t *testing.T) {
+    tests := []struct {
+        // concurrency, nodeCount, nodeCoreCount
+        c, n, nc int
+        expected int
+    }{
+        // on 8c machine
+        {c: 1, n: 1, nc: 8, expected: 15},
+        {c: 3, n: 1, nc: 8, expected: 45},
+        {c: 7, n: 1, nc: 8, expected: 105},
+        {c: 8, n: 1, nc: 8, expected: 120},
+        {c: 8, n: 2, nc: 8, expected: 124},
+        {c: 8, n: 5, nc: 8, expected: 137},
+        {c: 8, n: 32, nc: 8, expected: 256},
+        {c: 8, n: 33, nc: 8, expected: 256},
+        {c: 8, n: 50, nc: 8, expected: 256},
+        // on 16c machine
+        {c: 1, n: 1, nc: 16, expected: 15},
+        {c: 7, n: 1, nc: 16, expected: 105},
+        {c: 16, n: 1, nc: 16, expected: 240},
+        {c: 16, n: 5, nc: 16, expected: 275},
+        {c: 16, n: 32, nc: 16, expected: 512},
+        {c: 16, n: 33, nc: 16, expected: 512},
+        {c: 16, n: 50, nc: 16, expected: 512},
+        // on 32c machine
+        {c: 1, n: 1, nc: 32, expected: 15},
+        {c: 7, n: 1, nc: 32, expected: 105},
+        {c: 32, n: 1, nc: 32, expected: 480},
+        {c: 32, n: 5, nc: 32, expected: 550},
+        {c: 32, n: 32, nc: 32, expected: 1024},
+        {c: 32, n: 33, nc: 32, expected: 1024},
+    }
+    for i, tt := range tests {
+        t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
+            require.Equal(t, tt.expected, CalcDistSQLConcurrency(tt.c, tt.n, tt.nc))
+        })
+    }
+}

pkg/executor/importer/import.go

Lines changed: 3 additions & 4 deletions

@@ -1304,9 +1304,11 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
         failpoint.InjectCall("mockImportDataSize", &totalSize)
         e.ThreadCnt = scheduler.CalcConcurrencyByDataSize(totalSize, targetNodeCPUCnt)
         e.MaxNodeCnt = scheduler.CalcMaxNodeCountByDataSize(totalSize, targetNodeCPUCnt)
-        e.logger.Info("set import thread count for nextgen kernel",
+        e.DistSQLScanConcurrency = scheduler.CalcDistSQLConcurrency(e.ThreadCnt, e.MaxNodeCnt, targetNodeCPUCnt)
+        e.logger.Info("auto calculate resource related params",
             zap.Int("thread count", e.ThreadCnt),
             zap.Int("max node count", e.MaxNodeCnt),
+            zap.Int("dist sql scan concurrency", e.DistSQLScanConcurrency),
             zap.Int("target node cpu count", targetNodeCPUCnt),
             zap.String("total file size", units.BytesSize(float64(totalSize))))
     }
@@ -1612,6 +1614,3 @@ func GetTargetNodeCPUCnt(ctx context.Context, sourceType DataSourceType, path st
     }
     return handle.GetCPUCountOfNode(ctx)
 }
-
-// TestSyncCh is used in unit test to synchronize the execution.
-var TestSyncCh = make(chan struct{})