Skip to content

Commit 4e49859

Browse files
authored
disttask: correct the usage of context (#48343)
close #48303
1 parent 9d07f83 commit 4e49859

File tree

4 files changed

+94
-85
lines changed

4 files changed

+94
-85
lines changed

pkg/disttask/framework/scheduler/manager.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,12 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
333333
return
334334
}
335335
scheduler := factory(ctx, m.id, task, m.taskTable)
336-
err := scheduler.Init(ctx)
336+
337+
taskCtx, taskCancel := context.WithCancelCause(ctx)
338+
m.registerCancelFunc(task.ID, taskCancel)
339+
defer taskCancel(nil)
340+
341+
err := scheduler.Init(taskCtx)
337342
if err != nil {
338343
m.logErrAndPersist(err, task.ID)
339344
return
@@ -380,14 +385,11 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
380385
}
381386
switch task.State {
382387
case proto.TaskStateRunning:
383-
runCtx, runCancel := context.WithCancelCause(ctx)
384-
m.registerCancelFunc(task.ID, runCancel)
385-
err = scheduler.Run(runCtx, task)
386-
runCancel(nil)
388+
err = scheduler.Run(taskCtx, task)
387389
case proto.TaskStatePausing:
388-
err = scheduler.Pause(ctx, task)
390+
err = scheduler.Pause(taskCtx, task)
389391
case proto.TaskStateReverting:
390-
err = scheduler.Rollback(ctx, task)
392+
err = scheduler.Rollback(taskCtx, task)
391393
}
392394
if err != nil {
393395
logutil.Logger(m.logCtx).Error("failed to handle task", zap.Error(err))

pkg/disttask/framework/scheduler/scheduler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ func (*BaseScheduler) Init(_ context.Context) error {
124124
func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) {
125125
defer func() {
126126
if r := recover(); r != nil {
127-
logutil.Logger(ctx).Error("BaseScheduler panicked", zap.Any("recover", r), zap.Stack("stack"))
127+
logutil.Logger(s.logCtx).Error("BaseScheduler panicked", zap.Any("recover", r), zap.Stack("stack"))
128128
err4Panic := errors.Errorf("%v", r)
129-
err1 := s.updateErrorToSubtask(ctx, task.ID, err4Panic)
129+
err1 := s.updateErrorToSubtask(task.ID, err4Panic)
130130
if err == nil {
131131
err = err1
132132
}
@@ -139,7 +139,7 @@ func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) {
139139
if err == nil {
140140
return nil
141141
}
142-
return s.updateErrorToSubtask(ctx, task.ID, err)
142+
return s.updateErrorToSubtask(task.ID, err)
143143
}
144144

145145
func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) (resErr error) {
@@ -622,11 +622,11 @@ func (s *BaseScheduler) markSubTaskCanceledOrFailed(ctx context.Context, subtask
622622
return false
623623
}
624624

625-
func (s *BaseScheduler) updateErrorToSubtask(ctx context.Context, taskID int64, err error) error {
625+
func (s *BaseScheduler) updateErrorToSubtask(taskID int64, err error) error {
626626
logger := logutil.Logger(s.logCtx)
627627
backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval)
628-
err1 := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger,
629-
func(ctx context.Context) (bool, error) {
628+
err1 := handle.RunWithRetry(s.logCtx, dispatcher.RetrySQLTimes, backoffer, logger,
629+
func(_ context.Context) (bool, error) {
630630
return true, s.taskTable.UpdateErrorToSubtask(s.id, taskID, err)
631631
},
632632
)

tests/realtikvtest/addindextest1/BUILD.bazel

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
22

33
go_test(
44
name = "addindextest1_test",
5-
timeout = "short",
5+
timeout = "long",
66
srcs = [
77
"disttask_test.go",
88
"main_test.go",
@@ -11,6 +11,10 @@ go_test(
1111
deps = [
1212
"//pkg/config",
1313
"//pkg/ddl",
14+
"//pkg/ddl/util/callback",
15+
"//pkg/disttask/framework/dispatcher",
16+
"//pkg/disttask/framework/proto",
17+
"//pkg/parser/model",
1418
"//pkg/testkit",
1519
"//tests/realtikvtest",
1620
"@com_github_pingcap_failpoint//:failpoint",

tests/realtikvtest/addindextest1/disttask_test.go

Lines changed: 74 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import (
2020
"github.com/pingcap/failpoint"
2121
"github.com/pingcap/tidb/pkg/config"
2222
"github.com/pingcap/tidb/pkg/ddl"
23+
"github.com/pingcap/tidb/pkg/ddl/util/callback"
24+
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
25+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
26+
"github.com/pingcap/tidb/pkg/parser/model"
2327
"github.com/pingcap/tidb/pkg/testkit"
2428
"github.com/pingcap/tidb/tests/realtikvtest"
2529
"github.com/stretchr/testify/require"
@@ -118,74 +122,73 @@ func TestAddIndexDistCancel(t *testing.T) {
118122
tk.MustExec(`set global tidb_enable_dist_task=0;`)
119123
}
120124

121-
// TODO: flaky test which can't find the root cause, will run it later.
122-
// func TestAddIndexDistPauseAndResume(t *testing.T) {
123-
// store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
124-
// if store.Name() != "TiKV" {
125-
// t.Skip("TiKV store only")
126-
// }
127-
128-
// tk := testkit.NewTestKit(t, store)
129-
// tk1 := testkit.NewTestKit(t, store)
130-
// tk.MustExec("drop database if exists test;")
131-
// tk.MustExec("create database test;")
132-
// tk.MustExec("use test;")
133-
134-
// tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 8;")
135-
// tk.MustExec("insert into t values (), (), (), (), (), ()")
136-
// tk.MustExec("insert into t values (), (), (), (), (), ()")
137-
// tk.MustExec("insert into t values (), (), (), (), (), ()")
138-
// tk.MustExec("insert into t values (), (), (), (), (), ()")
139-
// tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;")
140-
141-
// ddl.MockDMLExecutionAddIndexSubTaskFinish = func() {
142-
// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
143-
// require.Equal(t, 1, len(row))
144-
// jobID := row[0][0].(string)
145-
// tk1.MustExec("admin pause ddl jobs " + jobID)
146-
// <-ddl.TestSyncChan
147-
// }
148-
149-
// dispatcher.MockDMLExecutionOnPausedState = func(task *proto.Task) {
150-
// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
151-
// require.Equal(t, 1, len(row))
152-
// jobID := row[0][0].(string)
153-
// tk1.MustExec("admin resume ddl jobs " + jobID)
154-
// }
155-
156-
// ddl.MockDMLExecutionOnTaskFinished = func() {
157-
// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
158-
// require.Equal(t, 1, len(row))
159-
// jobID := row[0][0].(string)
160-
// tk1.MustExec("admin pause ddl jobs " + jobID)
161-
// }
162-
163-
// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", "3*return(true)"))
164-
// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState", "return(true)"))
165-
// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause", "return()"))
166-
// tk.MustExec(`set global tidb_enable_dist_task=1;`)
167-
// tk.MustExec("alter table t add index idx1(a);")
168-
// tk.MustExec("admin check table t;")
169-
// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish"))
170-
// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState"))
171-
172-
// // dist task succeed, job paused and resumed.
173-
// var hook = &callback.TestDDLCallback{Do: dom}
174-
// var resumeFunc = func(job *model.Job) {
175-
// if job.IsPaused() {
176-
// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
177-
// require.Equal(t, 1, len(row))
178-
// jobID := row[0][0].(string)
179-
// tk1.MustExec("admin resume ddl jobs " + jobID)
180-
// }
181-
// }
182-
// hook.OnJobUpdatedExported.Store(&resumeFunc)
183-
// dom.DDL().SetHook(hook.Clone())
184-
// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished", "1*return(true)"))
185-
// tk.MustExec("alter table t add index idx3(a);")
186-
// tk.MustExec("admin check table t;")
187-
// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished"))
188-
// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause"))
189-
190-
// tk.MustExec(`set global tidb_enable_dist_task=0;`)
191-
// }
125+
func TestAddIndexDistPauseAndResume(t *testing.T) {
126+
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
127+
if store.Name() != "TiKV" {
128+
t.Skip("TiKV store only")
129+
}
130+
131+
tk := testkit.NewTestKit(t, store)
132+
tk1 := testkit.NewTestKit(t, store)
133+
tk.MustExec("drop database if exists test;")
134+
tk.MustExec("create database test;")
135+
tk.MustExec("use test;")
136+
137+
tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 8;")
138+
tk.MustExec("insert into t values (), (), (), (), (), ()")
139+
tk.MustExec("insert into t values (), (), (), (), (), ()")
140+
tk.MustExec("insert into t values (), (), (), (), (), ()")
141+
tk.MustExec("insert into t values (), (), (), (), (), ()")
142+
tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;")
143+
144+
ddl.MockDMLExecutionAddIndexSubTaskFinish = func() {
145+
row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
146+
require.Equal(t, 1, len(row))
147+
jobID := row[0][0].(string)
148+
tk1.MustExec("admin pause ddl jobs " + jobID)
149+
<-ddl.TestSyncChan
150+
}
151+
152+
dispatcher.MockDMLExecutionOnPausedState = func(task *proto.Task) {
153+
row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
154+
require.Equal(t, 1, len(row))
155+
jobID := row[0][0].(string)
156+
tk1.MustExec("admin resume ddl jobs " + jobID)
157+
}
158+
159+
ddl.MockDMLExecutionOnTaskFinished = func() {
160+
row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
161+
require.Equal(t, 1, len(row))
162+
jobID := row[0][0].(string)
163+
tk1.MustExec("admin pause ddl jobs " + jobID)
164+
}
165+
166+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", "3*return(true)"))
167+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState", "return(true)"))
168+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause", "return()"))
169+
tk.MustExec(`set global tidb_enable_dist_task=1;`)
170+
tk.MustExec("alter table t add index idx1(a);")
171+
tk.MustExec("admin check table t;")
172+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish"))
173+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState"))
174+
175+
// dist task succeed, job paused and resumed.
176+
var hook = &callback.TestDDLCallback{Do: dom}
177+
var resumeFunc = func(job *model.Job) {
178+
if job.IsPaused() {
179+
row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows()
180+
require.Equal(t, 1, len(row))
181+
jobID := row[0][0].(string)
182+
tk1.MustExec("admin resume ddl jobs " + jobID)
183+
}
184+
}
185+
hook.OnJobUpdatedExported.Store(&resumeFunc)
186+
dom.DDL().SetHook(hook.Clone())
187+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished", "1*return(true)"))
188+
tk.MustExec("alter table t add index idx3(a);")
189+
tk.MustExec("admin check table t;")
190+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished"))
191+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause"))
192+
193+
tk.MustExec(`set global tidb_enable_dist_task=0;`)
194+
}

0 commit comments

Comments
 (0)