Skip to content

Commit 811be5a

Browse files
authored
disttask: add local disk space to node resource managed by DXF (#59607)
ref #49008
1 parent b45a9e8 commit 811be5a

20 files changed

+176
-139
lines changed

pkg/ddl/backfilling_dist_scheduler.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sort"
2424
"time"
2525

26+
"github.com/docker/go-units"
2627
"github.com/pingcap/errors"
2728
"github.com/pingcap/failpoint"
2829
"github.com/pingcap/tidb/br/pkg/storage"
@@ -38,6 +39,7 @@ import (
3839
"github.com/pingcap/tidb/pkg/lightning/config"
3940
"github.com/pingcap/tidb/pkg/meta"
4041
"github.com/pingcap/tidb/pkg/meta/model"
42+
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
4143
"github.com/pingcap/tidb/pkg/store/helper"
4244
"github.com/pingcap/tidb/pkg/table"
4345
"github.com/pingcap/tidb/pkg/util/backoff"
@@ -51,6 +53,7 @@ type LitBackfillScheduler struct {
5153
*scheduler.BaseScheduler
5254
d *ddl
5355
GlobalSort bool
56+
nodeRes *proto.NodeResource
5457
}
5558

5659
var _ scheduler.Extension = (*LitBackfillScheduler)(nil)
@@ -59,6 +62,7 @@ func newLitBackfillScheduler(ctx context.Context, d *ddl, task *proto.Task, para
5962
sch := LitBackfillScheduler{
6063
d: d,
6164
BaseScheduler: scheduler.NewBaseScheduler(ctx, task, param),
65+
nodeRes: param.GetNodeResource(),
6266
}
6367
return &sch
6468
}
@@ -70,7 +74,8 @@ func NewBackfillingSchedulerForTest(d DDL) (scheduler.Extension, error) {
7074
return nil, errors.New("The getDDL result should be the type of *ddl")
7175
}
7276
return &LitBackfillScheduler{
73-
d: ddl,
77+
d: ddl,
78+
nodeRes: &proto.NodeResource{TotalCPU: 4, TotalMem: 16 * units.GiB, TotalDisk: 100 * units.GiB},
7479
}, nil
7580
}
7681

@@ -125,6 +130,9 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
125130
if tblInfo.Partition != nil {
126131
return generatePartitionPlan(ctx, storeWithPD, tblInfo)
127132
}
133+
// TODO(tangenta): use available disk during adding index.
134+
availableDisk := sch.nodeRes.GetTaskDiskResource(task.Concurrency, vardef.DDLDiskQuota.Load())
135+
logger.Info("available local disk space resource", zap.String("size", units.BytesSize(float64(availableDisk))))
128136
return generateNonPartitionPlan(ctx, sch.d, tblInfo, job, sch.GlobalSort, len(execIDs))
129137
case proto.BackfillStepMergeSort:
130138
return generateMergePlan(taskHandle, task, logger)

pkg/ddl/backfilling_dist_scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
142142
ctx = util.WithInternalSourceType(ctx, "handle")
143143
mgr := storage.NewTaskManager(pool)
144144
storage.SetTaskManager(mgr)
145-
schManager := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port")
145+
schManager := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port", proto.NodeResourceForTest)
146146

147147
tk.MustExec("use test")
148148
tk.MustExec("create table t1(id bigint auto_random primary key)")

pkg/ddl/ingest/env.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,20 @@ func InitGlobalLightningEnv(path string) (ok bool) {
9393
return true
9494
}
9595

96-
// GenIngestTempDataDir generates a path for DDL ingest.
96+
// GetIngestTempDataDir gets the path for DDL ingest.
9797
// Format: ${temp-dir}/tmp_ddl-{port}
98-
func GenIngestTempDataDir() (string, error) {
98+
func GetIngestTempDataDir() string {
9999
tidbCfg := config.GetGlobalConfig()
100100
sortPathSuffix := "/tmp_ddl-" + strconv.Itoa(int(tidbCfg.Port))
101101
sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix)
102102

103+
return sortPath
104+
}
105+
106+
// GenIngestTempDataDir generates a path for DDL ingest, and create the dir if not exists.
107+
// Format: ${temp-dir}/tmp_ddl-{port}
108+
func GenIngestTempDataDir() (string, error) {
109+
sortPath := GetIngestTempDataDir()
103110
if _, err := os.Stat(sortPath); err != nil {
104111
if !os.IsNotExist(err) {
105112
logutil.DDLIngestLogger().Error(LitErrStatDirFail,

pkg/disttask/framework/proto/node.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package proto
1616

17+
import "github.com/docker/go-units"
18+
1719
// ManagedNode is a TiDB node that is managed by the framework.
1820
type ManagedNode struct {
1921
// ID see GenerateExecID, it's named as host in the meta table.
@@ -23,3 +25,38 @@ type ManagedNode struct {
2325
Role string
2426
CPUCount int
2527
}
28+
29+
// NodeResource is the resource of the node.
30+
// exported for test.
31+
type NodeResource struct {
32+
TotalCPU int
33+
TotalMem int64
34+
TotalDisk uint64
35+
}
36+
37+
// NewNodeResource creates a new NodeResource.
38+
func NewNodeResource(totalCPU int, totalMem int64, totalDisk uint64) *NodeResource {
39+
return &NodeResource{
40+
TotalCPU: totalCPU,
41+
TotalMem: totalMem,
42+
TotalDisk: totalDisk,
43+
}
44+
}
45+
46+
// NodeResourceForTest is only used for test.
47+
var NodeResourceForTest = NewNodeResource(32, 32*units.GB, 100*units.GB)
48+
49+
// GetStepResource gets the step resource according to concurrency.
50+
func (nr *NodeResource) GetStepResource(concurrency int) *StepResource {
51+
return &StepResource{
52+
CPU: NewAllocatable(int64(concurrency)),
53+
// same proportion as CPU
54+
Mem: NewAllocatable(int64(float64(concurrency) / float64(nr.TotalCPU) * float64(nr.TotalMem))),
55+
}
56+
}
57+
58+
// GetTaskDiskResource gets available disk for a task.
59+
func (nr *NodeResource) GetTaskDiskResource(concurrency int, quotaHint uint64) uint64 {
60+
availableDisk := min(nr.TotalDisk, quotaHint)
61+
return uint64(float64(concurrency) / float64(nr.TotalCPU) * float64(availableDisk))
62+
}

pkg/disttask/framework/scheduler/balancer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func TestBalanceMultipleTasks(t *testing.T) {
407407
}
408408
ctx := context.Background()
409409

410-
manager := NewManager(ctx, mockTaskMgr, "1")
410+
manager := NewManager(ctx, mockTaskMgr, "1", proto.NodeResourceForTest)
411411
manager.slotMgr.updateCapacity(16)
412412
manager.nodeMgr.nodes.Store(&[]proto.ManagedNode{{ID: "tidb1", Role: ""}, {ID: "tidb2", Role: ""}, {ID: "tidb3", Role: ""}})
413413
b := newBalancer(Param{

pkg/disttask/framework/scheduler/interface.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ type Param struct {
146146
slotMgr *SlotManager
147147
serverID string
148148
allocatedSlots bool
149+
nodeRes *proto.NodeResource
150+
}
151+
152+
// GetNodeResource returns the node resource.
153+
func (p *Param) GetNodeResource() *proto.NodeResource {
154+
return p.nodeRes
149155
}
150156

151157
// schedulerFactoryFn is used to create a scheduler.

pkg/disttask/framework/scheduler/scheduler_manager.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ type Manager struct {
118118
// in task order
119119
schedulers []Scheduler
120120
}
121+
nodeRes *proto.NodeResource
121122
}
122123

123124
// NewManager creates a scheduler struct.
124-
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager {
125+
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string, nodeRes *proto.NodeResource) *Manager {
125126
logger := logutil.ErrVerboseLogger()
126127
if intest.InTest {
127128
logger = logger.With(zap.String("server-id", serverID))
@@ -144,6 +145,7 @@ func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Mana
144145
}),
145146
logger: logger,
146147
finishCh: make(chan struct{}, proto.MaxConcurrentTask),
148+
nodeRes: nodeRes,
147149
}
148150
schedulerManager.mu.schedulerMap = make(map[int64]Scheduler)
149151

@@ -333,6 +335,7 @@ func (sm *Manager) startScheduler(basicTask *proto.TaskBase, allocateSlots bool,
333335
slotMgr: sm.slotMgr,
334336
serverID: sm.serverID,
335337
allocatedSlots: allocateSlots,
338+
nodeRes: sm.nodeRes,
336339
})
337340
if err = scheduler.Init(); err != nil {
338341
sm.logger.Error("init scheduler failed", zap.Error(err))

pkg/disttask/framework/scheduler/scheduler_manager_nokit_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func GetTestSchedulerExt(ctrl *gomock.Controller) Extension {
5656
func TestManagerSchedulersOrdered(t *testing.T) {
5757
ctrl := gomock.NewController(t)
5858
defer ctrl.Finish()
59-
mgr := NewManager(context.Background(), nil, "1")
59+
mgr := NewManager(context.Background(), nil, "1", proto.NodeResourceForTest)
6060
for i := 1; i <= 5; i++ {
6161
task := &proto.Task{TaskBase: proto.TaskBase{
6262
ID: int64(i * 10),
@@ -99,7 +99,7 @@ func TestSchedulerCleanupTask(t *testing.T) {
9999
defer ctrl.Finish()
100100
taskMgr := mock.NewMockTaskManager(ctrl)
101101
ctx := context.Background()
102-
mgr := NewManager(ctx, taskMgr, "1")
102+
mgr := NewManager(ctx, taskMgr, "1", proto.NodeResourceForTest)
103103

104104
// normal
105105
tasks := []*proto.Task{
@@ -143,7 +143,7 @@ func TestManagerSchedulerNotAllocateSlots(t *testing.T) {
143143
defer ctrl.Finish()
144144

145145
taskMgr := mock.NewMockTaskManager(ctrl)
146-
mgr := NewManager(context.Background(), taskMgr, "1")
146+
mgr := NewManager(context.Background(), taskMgr, "1", proto.NodeResourceForTest)
147147
RegisterSchedulerFactory(proto.TaskTypeExample,
148148
func(ctx context.Context, task *proto.Task, param Param) Scheduler {
149149
mockScheduler := NewBaseScheduler(ctx, task, param)

pkg/disttask/framework/scheduler/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func MockSchedulerManager(t *testing.T, ctrl *gomock.Controller, pool *pools.Res
9292
ctx := context.WithValue(context.Background(), "etcd", true)
9393
mgr := storage.NewTaskManager(pool)
9494
storage.SetTaskManager(mgr)
95-
sch := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port")
95+
sch := scheduler.NewManager(util.WithInternalSourceType(ctx, "scheduler"), mgr, "host:port", proto.NodeResourceForTest)
9696
scheduler.RegisterSchedulerFactory(proto.TaskTypeExample,
9797
func(ctx context.Context, task *proto.Task, param scheduler.Param) scheduler.Scheduler {
9898
mockScheduler := scheduler.NewBaseScheduler(ctx, task, param)

pkg/disttask/framework/taskexecutor/BUILD.bazel

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,10 @@ go_library(
2424
"//pkg/sessionctx/vardef",
2525
"//pkg/util",
2626
"//pkg/util/backoff",
27-
"//pkg/util/cgroup",
28-
"//pkg/util/cpu",
2927
"//pkg/util/gctuner",
3028
"//pkg/util/intest",
3129
"//pkg/util/logutil",
3230
"//pkg/util/memory",
33-
"@com_github_docker_go_units//:go-units",
3431
"@com_github_pingcap_errors//:errors",
3532
"@com_github_pingcap_failpoint//:failpoint",
3633
"@org_uber_go_zap//:zap",
@@ -50,7 +47,7 @@ go_test(
5047
],
5148
embed = [":taskexecutor"],
5249
flaky = True,
53-
shard_count = 15,
50+
shard_count = 14,
5451
deps = [
5552
"//pkg/disttask/framework/mock",
5653
"//pkg/disttask/framework/mock/execute",
@@ -64,8 +61,6 @@ go_test(
6461
"//pkg/testkit/testfailpoint",
6562
"//pkg/testkit/testsetup",
6663
"//pkg/util/logutil",
67-
"//pkg/util/memory",
68-
"@com_github_docker_go_units//:go-units",
6964
"@com_github_ngaut_pools//:pools",
7065
"@com_github_pingcap_errors//:errors",
7166
"@com_github_stretchr_testify//require",

0 commit comments

Comments
 (0)