Skip to content

Commit b454755

Browse files
authored
ttl: set a result for timeout scan task during shrinking scan worker (#57718) (#57828)
close #57708
1 parent 3ac9806 commit b454755

File tree

7 files changed

+130
-3
lines changed

7 files changed

+130
-3
lines changed

pkg/ttl/ttlworker/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ go_test(
6767
"task_manager_test.go",
6868
"timer_sync_test.go",
6969
"timer_test.go",
70+
"worker_test.go",
7071
],
7172
embed = [":ttlworker"],
7273
flaky = True,
@@ -100,6 +101,7 @@ go_test(
100101
"//pkg/util/chunk",
101102
"//pkg/util/logutil",
102103
"//pkg/util/mock",
104+
"//pkg/util/timeutil",
103105
"@com_github_google_uuid//:uuid",
104106
"@com_github_ngaut_pools//:pools",
105107
"@com_github_pingcap_errors//:errors",

pkg/ttl/ttlworker/scan.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/pingcap/tidb/pkg/types"
3131
"github.com/pingcap/tidb/pkg/util"
3232
"github.com/pingcap/tidb/pkg/util/chunk"
33+
"github.com/pingcap/tidb/pkg/util/intest"
3334
"github.com/pingcap/tidb/pkg/util/logutil"
3435
"go.uber.org/zap"
3536
)
@@ -41,6 +42,9 @@ var (
4142
taskMaxErrorRate = 0.4
4243
)
4344

45+
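// TTLScanPostScanHookForTest is the context key for a test-only hook (a `func()`). When set, handleScanTask calls it after the scan result is computed and before the result is stored as the worker's current task result. It's only used in tests.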
46+
type TTLScanPostScanHookForTest struct{}
47+
4448
type ttlStatistics struct {
4549
TotalRows atomic.Uint64
4650
SuccessRows atomic.Uint64
@@ -380,6 +384,10 @@ func (w *ttlScanWorker) handleScanTask(tracer *metrics.PhaseTracer, task *ttlSca
380384
result = task.result(nil)
381385
}
382386

387+
if intest.InTest && ctx.Value(TTLScanPostScanHookForTest{}) != nil {
388+
ctx.Value(TTLScanPostScanHookForTest{}).(func())()
389+
}
390+
383391
w.baseWorker.Lock()
384392
w.curTaskResult = result
385393
w.baseWorker.Unlock()

pkg/ttl/ttlworker/task_manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ func (m *taskManager) resizeScanWorkers(count int) error {
160160
jobID = curTask.JobID
161161
scanID = curTask.ScanID
162162
scanErr = errors.New("timeout to cancel scan task")
163+
164+
result = curTask.result(scanErr)
163165
}
164166

165167
task := findTaskWithID(m.runningTasks, jobID, scanID)

pkg/ttl/ttlworker/task_manager_integration_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,3 +356,72 @@ func TestMeetTTLRunningTasks(t *testing.T) {
356356
require.False(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusWaiting))
357357
require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusRunning))
358358
}
359+
360+
func TestShrinkScanWorkerTimeout(t *testing.T) {
361+
store, dom := testkit.CreateMockStoreAndDomain(t)
362+
pool := wrapPoolForTest(dom.SysSessionPool())
363+
defer pool.AssertNoSessionInUse(t)
364+
waitAndStopTTLManager(t, dom)
365+
tk := testkit.NewTestKit(t, store)
366+
sessionFactory := sessionFactory(t, store)
367+
368+
tk.MustExec("set global tidb_ttl_running_tasks = 32")
369+
370+
tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
371+
testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
372+
require.NoError(t, err)
373+
for id := 0; id < 4; id++ {
374+
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
375+
tk.MustExec(sql)
376+
}
377+
378+
se := sessionFactory()
379+
now := se.Now()
380+
381+
isc := cache.NewInfoSchemaCache(time.Minute)
382+
require.NoError(t, isc.Update(se))
383+
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-1", store)
384+
workers := []ttlworker.Worker{}
385+
for j := 0; j < 4; j++ {
386+
scanWorker := ttlworker.NewMockScanWorker(t)
387+
scanWorker.Start()
388+
workers = append(workers, scanWorker)
389+
}
390+
391+
startBlockNotifyCh := make(chan struct{})
392+
blockCancelCh := make(chan struct{})
393+
workers[0].(ttlworker.WorkerTestExt).SetCtx(func(ctx context.Context) context.Context {
394+
return context.WithValue(ctx, ttlworker.TTLScanPostScanHookForTest{}, func() {
395+
startBlockNotifyCh <- struct{}{}
396+
<-blockCancelCh
397+
})
398+
})
399+
m.SetScanWorkers4Test(workers)
400+
401+
m.RescheduleTasks(se, now)
402+
require.Len(t, m.GetRunningTasks(), 4)
403+
tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
404+
<-startBlockNotifyCh
405+
406+
// shrink scan workers; one of them will time out while being canceled, so the resize returns an error
407+
require.Error(t, m.ResizeScanWorkers(0))
408+
require.Len(t, m.GetScanWorkers(), 0)
409+
410+
// all 4 tasks (3 canceled and 1 timed out) are still marked as running in the table, but they all have results, so after `CheckFinishedTask` they should be finished
411+
tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4"))
412+
m.CheckFinishedTask(se, now)
413+
require.Len(t, m.GetRunningTasks(), 0)
414+
// now, the task should be finished
415+
tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("0"))
416+
// the first task will be finished with "timeout to cancel scan task"
417+
// other tasks will finish with table not found because we didn't mock the table in this test.
418+
tk.MustQuery("SELECT scan_id, json_extract(state, '$.scan_task_err') from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
419+
"0 \"timeout to cancel scan task\"",
420+
"1 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
421+
"2 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
422+
"3 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"",
423+
))
424+
425+
require.NoError(t, m.ResizeDelWorkers(0))
426+
close(blockCancelCh)
427+
}

pkg/ttl/ttlworker/task_manager_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) {
5656
m.rescheduleTasks(se, now)
5757
}
5858

59+
// ResizeScanWorkers is an exported version of resizeScanWorkers
60+
func (m *taskManager) ResizeScanWorkers(count int) error {
61+
return m.resizeScanWorkers(count)
62+
}
63+
64+
// ResizeDelWorkers is an exported version of resizeDelWorkers
65+
func (m *taskManager) ResizeDelWorkers(count int) error {
66+
return m.resizeDelWorkers(count)
67+
}
68+
5969
// ReportMetrics is an exported version of reportMetrics
6070
func (m *taskManager) ReportMetrics() {
6171
m.reportMetrics()
@@ -81,6 +91,11 @@ func (m *taskManager) ReportTaskFinished(se session.Session, now time.Time, task
8191
return m.reportTaskFinished(se, now, task)
8292
}
8393

94+
// GetScanWorkers returns the scan workers of the task manager.
95+
func (m *taskManager) GetScanWorkers() []worker {
96+
return m.scanWorkers
97+
}
98+
8499
// SetResult sets the result of the task
85100
func (t *runningScanTask) SetResult(err error) {
86101
t.result = t.ttlScanTask.result(err)

pkg/ttl/ttlworker/timer_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/tidb/pkg/sessionctx/variable"
2828
timerapi "github.com/pingcap/tidb/pkg/timer/api"
2929
"github.com/pingcap/tidb/pkg/util/logutil"
30+
"github.com/pingcap/tidb/pkg/util/timeutil"
3031
"github.com/stretchr/testify/mock"
3132
"github.com/stretchr/testify/require"
3233
)
@@ -350,8 +351,9 @@ func TestTTLTimerHookOnEvent(t *testing.T) {
350351
require.Equal(t, summaryData, timer.SummaryData)
351352
adapter.AssertExpectations(t)
352353

354+
tz := timeutil.SystemLocation()
353355
// job not exists but table ttl not enabled
354-
watermark := time.Unix(3600*123, 0)
356+
watermark := time.Unix(3600*123, 0).In(tz)
355357
require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark)))
356358
timer = triggerTestTimer(t, store, timer.ID)
357359
adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID).
@@ -373,7 +375,7 @@ func TestTTLTimerHookOnEvent(t *testing.T) {
373375
require.Equal(t, oldSummary, timer.SummaryData)
374376

375377
// job not exists but timer disabled
376-
watermark = time.Unix(3600*456, 0)
378+
watermark = time.Unix(3600*456, 0).In(tz)
377379
require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark), timerapi.WithSetEnable(false)))
378380
timer = triggerTestTimer(t, store, timer.ID)
379381
adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID).
@@ -394,7 +396,7 @@ func TestTTLTimerHookOnEvent(t *testing.T) {
394396
require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetEnable(true)))
395397

396398
// job not exists but event start too early
397-
watermark = time.Unix(3600*789, 0)
399+
watermark = time.Unix(3600*789, 0).In(tz)
398400
require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark)))
399401
timer = triggerTestTimer(t, store, timer.ID)
400402
adapter.On("Now").Return(timer.EventStart.Add(11*time.Minute), nil).Once()

pkg/ttl/ttlworker/worker_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2024 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ttlworker
16+
17+
import "context"
18+
19+
// WorkerTestExt is the extension interface for worker in test.
20+
type WorkerTestExt interface {
21+
SetCtx(f func(ctx context.Context) context.Context)
22+
}
23+
24+
var _ WorkerTestExt = &baseWorker{}
25+
26+
// SetCtx modifies the context of the worker.
27+
func (w *baseWorker) SetCtx(f func(ctx context.Context) context.Context) {
28+
w.ctx = f(w.ctx)
29+
}

0 commit comments

Comments
 (0)