diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel
index df18effc77bec..59aa1033e5e5c 100644
--- a/pkg/ttl/ttlworker/BUILD.bazel
+++ b/pkg/ttl/ttlworker/BUILD.bazel
@@ -67,6 +67,7 @@ go_test(
         "task_manager_test.go",
         "timer_sync_test.go",
         "timer_test.go",
+        "worker_test.go",
     ],
     embed = [":ttlworker"],
     flaky = True,
@@ -100,6 +101,7 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/logutil",
         "//pkg/util/mock",
+        "//pkg/util/timeutil",
         "@com_github_google_uuid//:uuid",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
diff --git a/pkg/ttl/ttlworker/scan.go b/pkg/ttl/ttlworker/scan.go
index 9a01009e2f15b..d28f62b8734a3 100644
--- a/pkg/ttl/ttlworker/scan.go
+++ b/pkg/ttl/ttlworker/scan.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/chunk"
+	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"go.uber.org/zap"
 )
@@ -41,6 +42,9 @@ var (
 	taskMaxErrorRate = 0.4
 )
 
+// TTLScanPostScanHookForTest is the context key of a func() hook that handleScanTask invokes just before it records the task result. It's only used in tests.
+type TTLScanPostScanHookForTest struct{}
+
 type ttlStatistics struct {
 	TotalRows   atomic.Uint64
 	SuccessRows atomic.Uint64
@@ -380,6 +384,10 @@ func (w *ttlScanWorker) handleScanTask(tracer *metrics.PhaseTracer, task *ttlSca
 		result = task.result(nil)
 	}
 
+	if intest.InTest && ctx.Value(TTLScanPostScanHookForTest{}) != nil {
+		ctx.Value(TTLScanPostScanHookForTest{}).(func())()
+	}
+
 	w.baseWorker.Lock()
 	w.curTaskResult = result
 	w.baseWorker.Unlock()
diff --git a/pkg/ttl/ttlworker/task_manager.go b/pkg/ttl/ttlworker/task_manager.go
index fafd97247f608..cf4e531008eed 100644
--- a/pkg/ttl/ttlworker/task_manager.go
+++ b/pkg/ttl/ttlworker/task_manager.go
@@ -160,6 +160,8 @@ func (m *taskManager) resizeScanWorkers(count int) error {
 			jobID = curTask.JobID
 			scanID = curTask.ScanID
 			scanErr = errors.New("timeout to cancel scan task")
+
+			result = curTask.result(scanErr)
 		}
 
 		task := findTaskWithID(m.runningTasks, jobID, scanID)
diff --git a/pkg/ttl/ttlworker/task_manager_integration_test.go b/pkg/ttl/ttlworker/task_manager_integration_test.go
index 93ab32d1d3875..e88332198187e 100644
--- a/pkg/ttl/ttlworker/task_manager_integration_test.go
+++ b/pkg/ttl/ttlworker/task_manager_integration_test.go
@@ -356,3 +356,77 @@ func TestMeetTTLRunningTasks(t *testing.T) {
 	require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusWaiting))
 	require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusRunning))
 }
+
+// TestShrinkScanWorkerTimeout verifies that when shrinking the scan workers times out on one
+// worker, its task is still finished and records "timeout to cancel scan task" as the error.
+func TestShrinkScanWorkerTimeout(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	pool := wrapPoolForTest(dom.SysSessionPool())
+	defer pool.AssertNoSessionInUse(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+	sessionFactory := sessionFactory(t, store)
+
+	tk.MustExec("set global tidb_ttl_running_tasks = 32")
+
+	tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
+	testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	for id := 0; id < 4; id++ {
+		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
+		tk.MustExec(sql)
+	}
+
+	se := sessionFactory()
+	now := se.Now()
+
+	isc := cache.NewInfoSchemaCache(time.Minute)
+	require.NoError(t, isc.Update(se))
+	m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store)
+	workers := make([]ttlworker.Worker, 0, 4)
+	for j := 0; j < 4; j++ {
+		scanWorker := ttlworker.NewMockScanWorker(t)
+		scanWorker.Start()
+		workers = append(workers, scanWorker)
+	}
+
+	// Park worker 0 inside the post-scan hook so that it cannot stop when the workers are shrunk.
+	// The notify channel is buffered and the cancel channel is closed by defer, so the worker
+	// goroutine is released even if an assertion below aborts the test early.
+	startBlockNotifyCh := make(chan struct{}, 1)
+	blockCancelCh := make(chan struct{})
+	defer close(blockCancelCh)
+	workers[0].(ttlworker.WorkerTestExt).SetCtx(func(ctx context.Context) context.Context {
+		return context.WithValue(ctx, ttlworker.TTLScanPostScanHookForTest{}, func() {
+			startBlockNotifyCh <- struct{}{}
+			<-blockCancelCh
+		})
+	})
+	m.SetScanWorkers4Test(workers)
+
+	m.RescheduleTasks(se, now)
+	require.Len(t, m.GetRunningTasks(), 4)
+	tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
+	<-startBlockNotifyCh
+
+	// shrink scan workers, one of them will time out
+	require.Error(t, m.ResizeScanWorkers(0))
+	require.Len(t, m.GetScanWorkers(), 0)
+
+	// all 4 tasks are still marked running in the table, but the 3 canceled ones and the timed-out one all have results, so after `CheckFinishedTask` they should be finished
+	tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
+	m.CheckFinishedTask(se, now)
+	require.Len(t, m.GetRunningTasks(), 0)
+	// now, the tasks should be finished
+	tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("0"))
+	// the first task will be finished with "timeout to cancel scan task"
+	// other tasks will finish with table not found because we didn't mock the table in this test.
+	tk.MustQuery("SELECT scan_id, json_extract(state, '$.scan_task_err') from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
+		"0 \"timeout to cancel scan task\"",
+		"1 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
+		"2 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
+		"3 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
+	))
+
+	require.NoError(t, m.ResizeDelWorkers(0))
+}
diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go
index 064c68f86ed40..2ff3bb212a043 100644
--- a/pkg/ttl/ttlworker/task_manager_test.go
+++ b/pkg/ttl/ttlworker/task_manager_test.go
@@ -56,6 +56,16 @@ func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) {
 	m.rescheduleTasks(se, now)
 }
 
+// ResizeScanWorkers is an exported version of resizeScanWorkers
+func (m *taskManager) ResizeScanWorkers(count int) error {
+	return m.resizeScanWorkers(count)
+}
+
+// ResizeDelWorkers is an exported version of resizeDelWorkers
+func (m *taskManager) ResizeDelWorkers(count int) error {
+	return m.resizeDelWorkers(count)
+}
+
 // ReportMetrics is an exported version of reportMetrics
 func (m *taskManager) ReportMetrics() {
 	m.reportMetrics()
@@ -81,6 +91,11 @@ func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task
 	return m.reportTaskFinished(se, now, task)
 }
 
+// GetScanWorkers returns the scan workers of the task manager.
+func (m *taskManager) GetScanWorkers() []worker {
+	return m.scanWorkers
+}
+
 // SetResult sets the result of the task
 func (t *runningScanTask) SetResult(err error) {
 	t.result = t.ttlScanTask.result(err)
diff --git a/pkg/ttl/ttlworker/timer_test.go b/pkg/ttl/ttlworker/timer_test.go
index 12bedc8e0cf41..b0400bc53f7a7 100644
--- a/pkg/ttl/ttlworker/timer_test.go
+++ b/pkg/ttl/ttlworker/timer_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	timerapi "github.com/pingcap/tidb/pkg/timer/api"
 	"github.com/pingcap/tidb/pkg/util/logutil"
+	"github.com/pingcap/tidb/pkg/util/timeutil"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 )
@@ -350,8 +351,9 @@ func TestTTLTimerHookOnEvent(t *testing.T) {
 	require.Equal(t, summaryData, timer.SummaryData)
 	adapter.AssertExpectations(t)
 
+	tz := timeutil.SystemLocation()
 	// job not exists but table ttl not enabled
-	watermark := time.Unix(3600*123, 0)
+	watermark := time.Unix(3600*123, 0).In(tz)
 	require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark)))
 	timer = triggerTestTimer(t, store, timer.ID)
 	adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID).
@@ -373,7 +375,7 @@ func TestTTLTimerHookOnEvent(t *testing.T) {
 	require.Equal(t, oldSummary, timer.SummaryData)
 
 	// job not exists but timer disabled
-	watermark = time.Unix(3600*456, 0)
+	watermark = time.Unix(3600*456, 0).In(tz)
 	require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark), timerapi.WithSetEnable(false)))
 	timer = triggerTestTimer(t, store, timer.ID)
 	adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID).
@@ -394,7 +396,7 @@ func TestTTLTimerHookOnEvent(t *testing.T) {
 	require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetEnable(true)))
 
 	// job not exists but event start too early
-	watermark = time.Unix(3600*789, 0)
+	watermark = time.Unix(3600*789, 0).In(tz)
 	require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark)))
 	timer = triggerTestTimer(t, store, timer.ID)
 	adapter.On("Now").Return(timer.EventStart.Add(11*time.Minute), nil).Once()
diff --git a/pkg/ttl/ttlworker/worker_test.go b/pkg/ttl/ttlworker/worker_test.go
new file mode 100644
index 0000000000000..0291cd538c24c
--- /dev/null
+++ b/pkg/ttl/ttlworker/worker_test.go
@@ -0,0 +1,35 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import "context"
+
+// WorkerTestExt is a test-only extension of a worker. It lets a test replace
+// the context stored in the worker.
+type WorkerTestExt interface {
+	// SetCtx replaces the worker's context with f(current context).
+	SetCtx(f func(ctx context.Context) context.Context)
+}
+
+// Compile-time check that *baseWorker implements WorkerTestExt.
+var _ WorkerTestExt = (*baseWorker)(nil)
+
+// SetCtx replaces the worker's context with the result of applying f to the
+// current one.
+// NOTE(review): w.ctx is assigned without taking the worker's lock; presumably
+// callers must not race this with goroutines reading w.ctx -- confirm.
+func (w *baseWorker) SetCtx(f func(ctx context.Context) context.Context) {
+	w.ctx = f(w.ctx)
+}