Skip to content

Commit b9b9787

Browse files
authored
dxf: support param modify for active running subtask (#58935)
ref #57497
1 parent ddf8c84 commit b9b9787

File tree

7 files changed

+426
-15
lines changed

7 files changed

+426
-15
lines changed

pkg/disttask/framework/integrationtests/modify_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestModifyTaskConcurrency(t *testing.T) {
7474
executor.EXPECT().SetResource(gomock.Any()).AnyTimes()
7575
executor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes()
7676
executor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
77-
executor.EXPECT().TaskMetaModified(gomock.Any()).DoAndReturn(func(newTask *proto.Task) error {
77+
executor.EXPECT().TaskMetaModified(gomock.Any(), gomock.Any()).DoAndReturn(func(newTask *proto.Task) error {
7878
runtimeInfo.currentTask = newTask
7979
return nil
8080
}).AnyTimes()

pkg/disttask/framework/mock/execute/execute_mock.go

Lines changed: 18 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/disttask/framework/taskexecutor/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ go_test(
5050
],
5151
embed = [":taskexecutor"],
5252
flaky = True,
53-
shard_count = 14,
53+
shard_count = 15,
5454
deps = [
5555
"//pkg/disttask/framework/mock",
5656
"//pkg/disttask/framework/mock/execute",

pkg/disttask/framework/taskexecutor/execute/interface.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ type StepExecutor interface {
5353
// TaskMetaModified is called when the task meta is modified, if any error
5454
// happens, framework might recreate the step executor, so don't put code
5555
// that's prone to error in it.
56-
TaskMetaModified(newTask *proto.Task) error
56+
TaskMetaModified(ctx context.Context, newMeta []byte) error
57+
// ResourceModified is called when the resource allowed to be used is modified,
58+
// application must make sure the resource in use conforms to the new resource
59+
// before returning. When reducing resources, the framework depends on this
60+
// to make sure current instance won't OOM.
61+
ResourceModified(ctx context.Context, newResource *proto.StepResource) error
5762
}
5863

5964
// SubtaskSummary contains the summary of a subtask.

pkg/disttask/framework/taskexecutor/interface.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,11 @@ func (*BaseStepExecutor) Cleanup(context.Context) error {
143143
}
144144

145145
// TaskMetaModified implements the StepExecutor interface.
146-
func (*BaseStepExecutor) TaskMetaModified(*proto.Task) error {
147-
return nil
146+
func (*BaseStepExecutor) TaskMetaModified(context.Context, []byte) error {
147+
panic("not implemented")
148+
}
149+
150+
// ResourceModified implements the StepExecutor interface.
151+
func (*BaseStepExecutor) ResourceModified(context.Context, *proto.StepResource) error {
152+
panic("not implemented")
148153
}

pkg/disttask/framework/taskexecutor/task_executor.go

Lines changed: 135 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package taskexecutor
1717
import (
1818
"bytes"
1919
"context"
20+
"runtime"
2021
"sync"
2122
"sync/atomic"
2223
"time"
@@ -46,6 +47,10 @@ var (
4647
// updateSubtaskSummaryInterval is the interval for updating the subtask summary to
4748
// subtask table.
4849
updateSubtaskSummaryInterval = 3 * time.Second
50+
// DetectParamModifyInterval is the interval to detect whether task params
51+
// are modified.
52+
// exported for testing.
53+
DetectParamModifyInterval = 5 * time.Second
4954
)
5055

5156
var (
@@ -78,6 +83,10 @@ func NewParamForTest(taskTable TaskTable, slotMgr *slotManager, nodeRc *NodeReso
7883
// BaseTaskExecutor is the base implementation of TaskExecutor.
7984
type BaseTaskExecutor struct {
8085
Param
86+
// task is a local state that is periodically aligned with what's saved in system
87+
// table, but if the task has modified params, it might be updated in memory
88+
// to reflect that some param modifications have been applied successfully,
89+
// see detectAndHandleParamModifyLoop for more detail.
8190
task atomic.Pointer[proto.Task]
8291
logger *zap.Logger
8392
ctx context.Context
@@ -256,16 +265,16 @@ func (e *BaseTaskExecutor) Run() {
256265
}
257266

258267
if !bytes.Equal(oldTask.Meta, newTask.Meta) {
259-
e.logger.Info("task meta modified",
268+
e.logger.Info("task meta modification applied",
260269
zap.String("oldStep", proto.Step2Str(oldTask.Type, oldTask.Step)),
261270
zap.String("newStep", proto.Step2Str(newTask.Type, newTask.Step)))
262271
// when task switch to next step, task meta might change too, but in
263272
// this case step executor will be recreated with new concurrency and
264273
// meta, so we only notify it when it's still running the same step.
265274
if e.stepExec != nil && e.stepExec.GetStep() == newTask.Step {
266275
e.logger.Info("notify step executor to update task meta")
267-
if err2 := e.stepExec.TaskMetaModified(newTask); err2 != nil {
268-
e.logger.Info("notify step executor failed, will recreate it", zap.Error(err2))
276+
if err2 := e.stepExec.TaskMetaModified(e.stepCtx, newTask.Meta); err2 != nil {
277+
e.logger.Info("notify step executor failed, will try recreate it later", zap.Error(err2))
269278
e.cleanStepExecutor()
270279
continue
271280
}
@@ -277,7 +286,7 @@ func (e *BaseTaskExecutor) Run() {
277286
zap.Int("old", oldTask.Concurrency), zap.Int("new", newTask.Concurrency))
278287
return
279288
}
280-
e.logger.Info("task concurrency modified",
289+
e.logger.Info("task concurrency modification applied",
281290
zap.Int("old", oldTask.Concurrency), zap.Int("new", newTask.Concurrency),
282291
zap.Int("availableSlots", e.slotMgr.availableSlots()))
283292
newResource := e.nodeRc.getStepResource(newTask.Concurrency)
@@ -440,6 +449,9 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
440449
e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec)
441450
})
442451
}
452+
wg.RunWithLog(func() {
453+
e.detectAndHandleParamModifyLoop(checkCtx)
454+
})
443455
defer func() {
444456
checkCancel()
445457
wg.Wait()
@@ -469,6 +481,125 @@ func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor)
469481
return ok && stepExecutor.RealtimeSummary() != nil
470482
}
471483

484+
// there are 2 places that will detect task param modification:
485+
// - Run loop to make 'modifies' apply to all later subtasks
486+
// - this loop to try to make 'modifies' apply to current running subtask
487+
//
488+
// for a single step executor, successfully applied 'modifies' will not be applied
489+
// again, failed ones will be retried in this loop. To achieve this, we will update
490+
// the task inside BaseTaskExecutor to reflect the 'modifies' that have applied
491+
// successfully. the 'modifies' that failed to apply in this loop will be retried
492+
// in the Run loop.
493+
func (e *BaseTaskExecutor) detectAndHandleParamModifyLoop(ctx context.Context) {
494+
ticker := time.NewTicker(DetectParamModifyInterval)
495+
defer ticker.Stop()
496+
for {
497+
select {
498+
case <-ctx.Done():
499+
return
500+
case <-ticker.C:
501+
}
502+
503+
err := e.detectAndHandleParamModify(ctx)
504+
if err != nil {
505+
if ctx.Err() != nil {
506+
return
507+
}
508+
e.logger.Warn("failed to detect and handle param modification",
509+
zap.Int64("currSubtaskID", e.currSubtaskID.Load()), zap.Error(err))
510+
}
511+
}
512+
}
513+
514+
func (e *BaseTaskExecutor) detectAndHandleParamModify(ctx context.Context) error {
515+
oldTask := e.task.Load()
516+
latestTask, err := e.taskTable.GetTaskByID(ctx, oldTask.ID)
517+
if err != nil {
518+
return err
519+
}
520+
521+
metaModified := !bytes.Equal(latestTask.Meta, oldTask.Meta)
522+
if latestTask.Concurrency == oldTask.Concurrency && !metaModified {
523+
return nil
524+
}
525+
526+
e.logger.Info("task param modification detected",
527+
zap.Int64("currSubtaskID", e.currSubtaskID.Load()),
528+
zap.Bool("metaModified", metaModified),
529+
zap.Int("oldConcurrency", oldTask.Concurrency),
530+
zap.Int("newConcurrency", latestTask.Concurrency))
531+
532+
// we don't report error here, as we might fail to modify task concurrency due
533+
// to not enough slots, we still need try to apply meta modification.
534+
e.tryModifyTaskConcurrency(ctx, oldTask, latestTask)
535+
if metaModified {
536+
if err := e.stepExec.TaskMetaModified(ctx, latestTask.Meta); err != nil {
537+
return errors.Annotate(err, "failed to apply task param modification")
538+
}
539+
e.metaModifyApplied(latestTask.Meta)
540+
}
541+
return nil
542+
}
543+
544+
func (e *BaseTaskExecutor) tryModifyTaskConcurrency(ctx context.Context, oldTask, latestTask *proto.Task) {
545+
logger := e.logger.With(zap.Int64("currSubtaskID", e.currSubtaskID.Load()),
546+
zap.Int("old", oldTask.Concurrency), zap.Int("new", latestTask.Concurrency))
547+
if latestTask.Concurrency < oldTask.Concurrency {
548+
// we need try to release the resource first, then free slots, to avoid
549+
// OOM when manager starts other task executor and start to allocate memory
550+
// immediately.
551+
newResource := e.nodeRc.getStepResource(latestTask.Concurrency)
552+
if err := e.stepExec.ResourceModified(ctx, newResource); err != nil {
553+
logger.Warn("failed to reduce resource usage", zap.Error(err))
554+
return
555+
}
556+
if !e.slotMgr.exchange(&latestTask.TaskBase) {
557+
// we are returning resource back, should not happen
558+
logger.Warn("failed to free slots")
559+
intest.Assert(false, "failed to return slots")
560+
return
561+
}
562+
563+
// after application reduced memory usage, the garbage might not recycle
564+
// in time, so we trigger GC here.
565+
//nolint: revive
566+
runtime.GC()
567+
e.concurrencyModifyApplied(latestTask.Concurrency)
568+
} else if latestTask.Concurrency > oldTask.Concurrency {
569+
exchanged := e.slotMgr.exchange(&latestTask.TaskBase)
570+
if !exchanged {
571+
logger.Info("failed to exchange slots", zap.Int("availableSlots", e.slotMgr.availableSlots()))
572+
return
573+
}
574+
newResource := e.nodeRc.getStepResource(latestTask.Concurrency)
575+
if err := e.stepExec.ResourceModified(ctx, newResource); err != nil {
576+
exchanged := e.slotMgr.exchange(&oldTask.TaskBase)
577+
intest.Assert(exchanged, "failed to return slots")
578+
logger.Warn("failed to increase resource usage, return slots back", zap.Error(err),
579+
zap.Int("availableSlots", e.slotMgr.availableSlots()), zap.Bool("exchanged", exchanged))
580+
return
581+
}
582+
583+
e.concurrencyModifyApplied(latestTask.Concurrency)
584+
}
585+
}
586+
587+
func (e *BaseTaskExecutor) concurrencyModifyApplied(newConcurrency int) {
588+
clone := *e.task.Load()
589+
e.logger.Info("task concurrency modification applied",
590+
zap.Int64("currSubtaskID", e.currSubtaskID.Load()), zap.Int("old", clone.Concurrency),
591+
zap.Int("new", newConcurrency), zap.Int("availableSlots", e.slotMgr.availableSlots()))
592+
clone.Concurrency = newConcurrency
593+
e.task.Store(&clone)
594+
}
595+
596+
func (e *BaseTaskExecutor) metaModifyApplied(newMeta []byte) {
597+
e.logger.Info("task meta modification applied", zap.Int64("currSubtaskID", e.currSubtaskID.Load()))
598+
clone := *e.task.Load()
599+
clone.Meta = newMeta
600+
e.task.Store(&clone)
601+
}
602+
472603
// GetTaskBase implements TaskExecutor.GetTaskBase.
473604
func (e *BaseTaskExecutor) GetTaskBase() *proto.TaskBase {
474605
task := e.task.Load()

0 commit comments

Comments
 (0)