@@ -179,21 +179,21 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCa
179
179
}
180
180
181
181
func (e * BaseTaskExecutor ) updateSubtaskSummaryLoop (
182
- checkCtx , runStepCtx context.Context , stepExec execute.StepExecutor ) {
182
+ subtaskCtx context.Context , stepExec execute.StepExecutor ) {
183
183
taskMgr := e .taskTable .(* storage.TaskManager )
184
184
ticker := time .NewTicker (updateSubtaskSummaryInterval )
185
185
defer ticker .Stop ()
186
186
curSubtaskID := e .currSubtaskID .Load ()
187
187
update := func () {
188
188
summary := stepExec .RealtimeSummary ()
189
- err := taskMgr .UpdateSubtaskRowCount (runStepCtx , curSubtaskID , summary .RowCount )
189
+ err := taskMgr .UpdateSubtaskRowCount (subtaskCtx , curSubtaskID , summary .RowCount )
190
190
if err != nil {
191
191
e .logger .Info ("update subtask row count failed" , zap .Error (err ))
192
192
}
193
193
}
194
194
for {
195
195
select {
196
- case <- checkCtx .Done ():
196
+ case <- subtaskCtx .Done ():
197
197
update ()
198
198
return
199
199
case <- ticker .C :
@@ -434,18 +434,16 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
434
434
subtaskCtx , subtaskCtxCancel = context .WithCancel (e .stepCtx )
435
435
436
436
var wg util.WaitGroupWrapper
437
- checkCtx , checkCancel := context .WithCancel (subtaskCtx )
438
437
wg .RunWithLog (func () {
439
- e .checkBalanceSubtask (checkCtx , subtaskCtxCancel )
438
+ e .checkBalanceSubtask (subtaskCtx , subtaskCtxCancel )
440
439
})
441
440
442
441
if e .hasRealtimeSummary (e .stepExec ) {
443
442
wg .RunWithLog (func () {
444
- e .updateSubtaskSummaryLoop (checkCtx , subtaskCtx , e .stepExec )
443
+ e .updateSubtaskSummaryLoop (subtaskCtx , e .stepExec )
445
444
})
446
445
}
447
446
defer func () {
448
- checkCancel ()
449
447
wg .Wait ()
450
448
subtaskCtxCancel ()
451
449
}()
0 commit comments