Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/disttask/framework/integrationtests/modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestModifyTaskConcurrency(t *testing.T) {
executor.EXPECT().SetResource(gomock.Any()).AnyTimes()
executor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes()
executor.EXPECT().RealtimeSummary().Return(nil).AnyTimes()
executor.EXPECT().TaskMetaModified(gomock.Any()).DoAndReturn(func(newTask *proto.Task) error {
executor.EXPECT().TaskMetaModified(gomock.Any(), gomock.Any()).DoAndReturn(func(newTask *proto.Task) error {
runtimeInfo.currentTask = newTask
return nil
}).AnyTimes()
Expand Down
22 changes: 18 additions & 4 deletions pkg/disttask/framework/mock/execute/execute_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/disttask/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/mock/execute",
Expand Down
7 changes: 6 additions & 1 deletion pkg/disttask/framework/taskexecutor/execute/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ type StepExecutor interface {
// TaskMetaModified is called when the task meta is modified, if any error
// happen, framework might recreate the step executor, so don't put code
// that's prone to error in it.
TaskMetaModified(newTask *proto.Task) error
TaskMetaModified(ctx context.Context, newMeta []byte) error
// ResourceModified is called when the resource allowed to be used is modified,
// application must make sure the resource in use conforms to the new resource
// before returning. When reducing resources, the framework depends on this
// to make sure current instance won't OOM.
ResourceModified(ctx context.Context, newResource *proto.StepResource) error
}

// SubtaskSummary contains the summary of a subtask.
Expand Down
9 changes: 7 additions & 2 deletions pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func (*BaseStepExecutor) Cleanup(context.Context) error {
}

// TaskMetaModified implements the StepExecutor interface.
func (*BaseStepExecutor) TaskMetaModified(*proto.Task) error {
return nil
func (*BaseStepExecutor) TaskMetaModified(context.Context, []byte) error {
panic("not implemented")
}

// ResourceModified implements the StepExecutor interface.
func (*BaseStepExecutor) ResourceModified(context.Context, *proto.StepResource) error {
panic("not implemented")
}
139 changes: 135 additions & 4 deletions pkg/disttask/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package taskexecutor
import (
"bytes"
"context"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -46,6 +47,10 @@ var (
// updateSubtaskSummaryInterval is the interval for updating the subtask summary to
// subtask table.
updateSubtaskSummaryInterval = 3 * time.Second
// DetectParamModifyInterval is the interval to detect whether task params
// are modified.
// exported for testing.
DetectParamModifyInterval = 5 * time.Second
)

var (
Expand Down Expand Up @@ -78,6 +83,10 @@ func NewParamForTest(taskTable TaskTable, slotMgr *slotManager, nodeRc *NodeReso
// BaseTaskExecutor is the base implementation of TaskExecutor.
type BaseTaskExecutor struct {
Param
// task is a local state that periodically aligned with what's saved in system
// table, but if the task has modified params, it might be updated in memory
// to reflect that some param modification have been applied successfully,
// see detectAndHandleParamModifyLoop for more detail.
task atomic.Pointer[proto.Task]
logger *zap.Logger
ctx context.Context
Expand Down Expand Up @@ -256,16 +265,16 @@ func (e *BaseTaskExecutor) Run() {
}

if !bytes.Equal(oldTask.Meta, newTask.Meta) {
e.logger.Info("task meta modified",
e.logger.Info("task meta modification applied",
zap.String("oldStep", proto.Step2Str(oldTask.Type, oldTask.Step)),
zap.String("newStep", proto.Step2Str(newTask.Type, newTask.Step)))
// when task switch to next step, task meta might change too, but in
// this case step executor will be recreated with new concurrency and
// meta, so we only notify it when it's still running the same step.
if e.stepExec != nil && e.stepExec.GetStep() == newTask.Step {
e.logger.Info("notify step executor to update task meta")
if err2 := e.stepExec.TaskMetaModified(newTask); err2 != nil {
e.logger.Info("notify step executor failed, will recreate it", zap.Error(err2))
if err2 := e.stepExec.TaskMetaModified(e.stepCtx, newTask.Meta); err2 != nil {
e.logger.Info("notify step executor failed, will try recreate it later", zap.Error(err2))
e.cleanStepExecutor()
continue
}
Expand All @@ -277,7 +286,7 @@ func (e *BaseTaskExecutor) Run() {
zap.Int("old", oldTask.Concurrency), zap.Int("new", newTask.Concurrency))
return
}
e.logger.Info("task concurrency modified",
e.logger.Info("task concurrency modification applied",
zap.Int("old", oldTask.Concurrency), zap.Int("new", newTask.Concurrency),
zap.Int("availableSlots", e.slotMgr.availableSlots()))
newResource := e.nodeRc.getStepResource(newTask.Concurrency)
Expand Down Expand Up @@ -440,6 +449,9 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
e.updateSubtaskSummaryLoop(checkCtx, subtaskCtx, e.stepExec)
})
}
wg.RunWithLog(func() {
e.detectAndHandleParamModifyLoop(checkCtx)
})
defer func() {
checkCancel()
wg.Wait()
Expand Down Expand Up @@ -469,6 +481,125 @@ func (e *BaseTaskExecutor) hasRealtimeSummary(stepExecutor execute.StepExecutor)
return ok && stepExecutor.RealtimeSummary() != nil
}

// there are 2 places that will detect task param modification:
// - Run loop to make 'modifies' apply to all later subtasks
// - this loop to try to make 'modifies' apply to current running subtask
//
// for a single step executor, successfully applied 'modifies' will not be applied
// again, failed ones will be retried in this loop. To achieve this, we will update
// the task inside BaseTaskExecutor to reflect the 'modifies' that have applied
// successfully. the 'modifies' that failed to apply in this loop will be retried
// in the Run loop.
func (e *BaseTaskExecutor) detectAndHandleParamModifyLoop(ctx context.Context) {
ticker := time.NewTicker(DetectParamModifyInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

err := e.detectAndHandleParamModify(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
e.logger.Warn("failed to detect and handle param modification",
zap.Int64("currSubtaskID", e.currSubtaskID.Load()), zap.Error(err))
}
}
}

func (e *BaseTaskExecutor) detectAndHandleParamModify(ctx context.Context) error {
oldTask := e.task.Load()
latestTask, err := e.taskTable.GetTaskByID(ctx, oldTask.ID)
if err != nil {
return err
}

metaModified := !bytes.Equal(latestTask.Meta, oldTask.Meta)
if latestTask.Concurrency == oldTask.Concurrency && !metaModified {
return nil
}

e.logger.Info("task param modification detected",
zap.Int64("currSubtaskID", e.currSubtaskID.Load()),
zap.Bool("metaModified", metaModified),
zap.Int("oldConcurrency", oldTask.Concurrency),
zap.Int("newConcurrency", latestTask.Concurrency))

// we don't report error here, as we might fail to modify task concurrency due
// to not enough slots, we still need try to apply meta modification.
e.tryModifyTaskConcurrency(ctx, oldTask, latestTask)
if metaModified {
if err := e.stepExec.TaskMetaModified(ctx, latestTask.Meta); err != nil {
return errors.Annotate(err, "failed to apply task param modification")
}
e.metaModifyApplied(latestTask.Meta)
}
return nil
}

func (e *BaseTaskExecutor) tryModifyTaskConcurrency(ctx context.Context, oldTask, latestTask *proto.Task) {
logger := e.logger.With(zap.Int64("currSubtaskID", e.currSubtaskID.Load()),
zap.Int("old", oldTask.Concurrency), zap.Int("new", latestTask.Concurrency))
if latestTask.Concurrency < oldTask.Concurrency {
// we need try to release the resource first, then free slots, to avoid
// OOM when manager starts other task executor and start to allocate memory
// immediately.
newResource := e.nodeRc.getStepResource(latestTask.Concurrency)
if err := e.stepExec.ResourceModified(ctx, newResource); err != nil {
logger.Warn("failed to reduce resource usage", zap.Error(err))
return
}
if !e.slotMgr.exchange(&latestTask.TaskBase) {
// we are returning resource back, should not happen
logger.Warn("failed to free slots")
intest.Assert(false, "failed to return slots")
return
}

// after application reduced memory usage, the garbage might not recycle
// in time, so we trigger GC here.
//nolint: revive
runtime.GC()
e.concurrencyModifyApplied(latestTask.Concurrency)
} else if latestTask.Concurrency > oldTask.Concurrency {
exchanged := e.slotMgr.exchange(&latestTask.TaskBase)
if !exchanged {
logger.Info("failed to exchange slots", zap.Int("availableSlots", e.slotMgr.availableSlots()))
return
}
newResource := e.nodeRc.getStepResource(latestTask.Concurrency)
if err := e.stepExec.ResourceModified(ctx, newResource); err != nil {
exchanged := e.slotMgr.exchange(&oldTask.TaskBase)
intest.Assert(exchanged, "failed to return slots")
logger.Warn("failed to increase resource usage, return slots back", zap.Error(err),
zap.Int("availableSlots", e.slotMgr.availableSlots()), zap.Bool("exchanged", exchanged))
return
}

e.concurrencyModifyApplied(latestTask.Concurrency)
}
}

func (e *BaseTaskExecutor) concurrencyModifyApplied(newConcurrency int) {
clone := *e.task.Load()
e.logger.Info("task concurrency modification applied",
zap.Int64("currSubtaskID", e.currSubtaskID.Load()), zap.Int("old", clone.Concurrency),
zap.Int("new", newConcurrency), zap.Int("availableSlots", e.slotMgr.availableSlots()))
clone.Concurrency = newConcurrency
e.task.Store(&clone)
}

func (e *BaseTaskExecutor) metaModifyApplied(newMeta []byte) {
e.logger.Info("task meta modification applied", zap.Int64("currSubtaskID", e.currSubtaskID.Load()))
clone := *e.task.Load()
clone.Meta = newMeta
e.task.Store(&clone)
}

// GetTaskBase implements TaskExecutor.GetTaskBase.
func (e *BaseTaskExecutor) GetTaskBase() *proto.TaskBase {
task := e.task.Load()
Expand Down
Loading