diff --git a/pkg/ttl/cache/table.go b/pkg/ttl/cache/table.go index e9183185b4e4e..99d6d0d5cf957 100644 --- a/pkg/ttl/cache/table.go +++ b/pkg/ttl/cache/table.go @@ -40,7 +40,9 @@ import ( "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tikv/client-go/v2/tikv" + "go.uber.org/zap" ) func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, error) { @@ -448,9 +450,10 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag return nil, err } - regionsPerRange := len(regionIDs) / splitCnt - oversizeCnt := len(regionIDs) % splitCnt - ranges := make([]kv.KeyRange, 0, min(len(regionIDs), splitCnt)) + regionsCnt := len(regionIDs) + regionsPerRange := regionsCnt / splitCnt + oversizeCnt := regionsCnt % splitCnt + ranges := make([]kv.KeyRange, 0, min(regionsCnt, splitCnt)) for len(regionIDs) > 0 { startRegion, err := regionCache.LocateRegionByID(tikv.NewBackofferWithVars(ctx, 20000, nil), regionIDs[0]) @@ -483,6 +486,15 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag oversizeCnt-- regionIDs = regionIDs[endRegionIdx+1:] } + logutil.BgLogger().Info("TTL table raw key ranges split", + zap.Int("regionsCnt", regionsCnt), + zap.Int("shouldSplitCnt", splitCnt), + zap.Int("actualSplitCnt", len(ranges)), + zap.Int64("tableID", t.ID), + zap.String("db", t.Schema.O), + zap.String("table", t.Name.O), + zap.String("partition", t.Partition.O), + ) return ranges, nil } diff --git a/pkg/ttl/cache/task.go b/pkg/ttl/cache/task.go index 8ddb4c457593e..4ee6abbe83b91 100644 --- a/pkg/ttl/cache/task.go +++ b/pkg/ttl/cache/task.go @@ -87,9 +87,9 @@ const ( // TaskStatusWaiting means the task hasn't started TaskStatusWaiting TaskStatus = "waiting" // TaskStatusRunning means this task is running - TaskStatusRunning = "running" + TaskStatusRunning TaskStatus = "running" // TaskStatusFinished means this task has finished - TaskStatusFinished = "finished" + TaskStatusFinished TaskStatus = "finished" ) // TTLTask is a row recorded in mysql.tidb_ttl_task @@ -116,6 +116,9 @@ type TTLTaskState struct { ErrorRows uint64 `json:"error_rows"` ScanTaskErr string `json:"scan_task_err"` + + // When PreviousOwner != "", it means this task is resigned from another owner + PreviousOwner string `json:"prev_owner,omitempty"` } // RowToTTLTask converts a row into TTL task diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel index 59aa1033e5e5c..7fb9c4052e680 100644 --- a/pkg/ttl/ttlworker/BUILD.bazel +++ b/pkg/ttl/ttlworker/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "//pkg/util/timeutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@io_etcd_go_etcd_client_v3//:client", @@ -67,7 +68,6 @@ go_test( "task_manager_test.go", "timer_sync_test.go", "timer_test.go", - "worker_test.go", ], embed = [":ttlworker"], flaky = True, diff --git a/pkg/ttl/ttlworker/del.go b/pkg/ttl/ttlworker/del.go index 6fc4c93af5068..671560fc3a3b4 100644 --- a/pkg/ttl/ttlworker/del.go +++ b/pkg/ttl/ttlworker/del.go @@ -22,6 +22,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/ttl/cache" 
"github.com/pingcap/tidb/pkg/ttl/metrics" @@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re leftRows := t.rows defer func() { if len(leftRows) > 0 { - t.statistics.IncErrorRows(len(leftRows)) + retryRows = append(retryRows, leftRows...) } }() se := newTableSession(rawSe, t.tbl, t.expire) - for len(leftRows) > 0 { + for len(leftRows) > 0 && ctx.Err() == nil { maxBatch := variable.TTLDeleteBatchSize.Load() var delBatch [][]types.Datum if int64(len(leftRows)) < maxBatch { @@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re sqlInterval := time.Since(sqlStart) if err != nil { metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds()) - needRetry = needRetry && ctx.Err() == nil logutil.BgLogger().Warn( "delete SQL in TTL failed", zap.Error(err), @@ -191,8 +191,14 @@ func (b *ttlDelRetryBuffer) RecordTaskResult(task *ttlDeleteTask, retryRows [][] } func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) time.Duration { - for b.list.Len() > 0 { + l := b.list.Len() + // When `retryInterval==0`, to avoid the infinite retries, limit the max loop to the buffer length. + // It means one item only has one chance to retry in one `DoRetry` invoking. + for i := 0; i < l; i++ { ele := b.list.Front() + if ele == nil { + break + } item, ok := ele.Value.(*ttlDelRetryItem) if !ok { logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele)) @@ -214,6 +220,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim return b.retryInterval } +// SetRetryInterval sets the retry interval of the buffer. +func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) { + b.retryInterval = interval +} + // Drain drains a retry buffer. func (b *ttlDelRetryBuffer) Drain() { for ele := b.list.Front(); ele != nil; ele = ele.Next() { @@ -296,8 +307,37 @@ func (w *ttlDeleteWorker) loop() error { timer := time.NewTimer(w.retryBuffer.retryInterval) defer timer.Stop() - // drain retry buffer to make sure the statistics are correct - defer w.retryBuffer.Drain() + defer func() { + // Have a final try to delete all rows in retry buffer while the worker stops + // to avoid leaving any TTL rows undeleted when shrinking the delete worker. 
+ if w.retryBuffer.Len() > 0 { + start := time.Now() + log.Info( + "try to delete TTL rows in del worker buffer immediately because the worker is going to stop", + zap.Int("bufferLen", w.retryBuffer.Len()), + ) + retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + w.retryBuffer.SetRetryInterval(0) + w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum { + return task.doDelete(retryCtx, se) + }) + log.Info( + "delete TTL rows in del worker buffer finished", + zap.Duration("duration", time.Since(start)), + ) + } + + // drain retry buffer to make sure the statistics are correct + if w.retryBuffer.Len() > 0 { + log.Warn( + "some TTL rows are still in the buffer while the worker is going to stop, mark them as error", + zap.Int("bufferLen", w.retryBuffer.Len()), + ) + w.retryBuffer.Drain() + } + }() + for w.Status() == workerStatusRunning { tracer.EnterPhase(metrics.PhaseIdle) select { diff --git a/pkg/ttl/ttlworker/del_test.go b/pkg/ttl/ttlworker/del_test.go index 87b335b72fa5e..1c9f604ebecb2 100644 --- a/pkg/ttl/ttlworker/del_test.go +++ b/pkg/ttl/ttlworker/del_test.go @@ -179,6 +179,22 @@ func TestTTLDelRetryBuffer(t *testing.T) { require.Equal(t, 0, buffer.Len()) require.Equal(t, uint64(0), statics6.SuccessRows.Load()) require.Equal(t, uint64(7), statics6.ErrorRows.Load()) + + // test should only retry at most once for one item in a DoRetry call. + buffer2 := newTTLDelRetryBuffer() + buffer2.SetRetryInterval(0) + buffer2.maxRetry = math.MaxInt + task7, rows7, statics7 := createTask("t7") + buffer2.RecordTaskResult(task7, rows7[:8]) + require.Equal(t, 1, buffer2.Len()) + currentRetryFn := doRetryFail + buffer2.DoRetry(func(task *ttlDeleteTask) [][]types.Datum { + fn := currentRetryFn + currentRetryFn = shouldNotDoRetry + return fn(task) + }) + require.Equal(t, uint64(1), statics7.SuccessRows.Load()) + require.Equal(t, uint64(0), statics7.ErrorRows.Load()) } func TestTTLDeleteTaskDoDelete(t *testing.T) { @@ -269,7 +285,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { retryErrBatches: []int{1, 2, 4}, }, { - // some retries and some not + // some retries and some not and some are executed when ctx canceled batchCnt: 10, noRetryErrBatches: []int{3, 8, 9}, retryErrBatches: []int{1, 2, 4}, @@ -279,6 +295,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { } for _, c := range cases { + require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt) ctx, cancel := context.WithCancel(context.Background()) if c.cancelCtx && c.cancelCtxBatch == 0 { cancel() @@ -298,16 +315,14 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { retryErrBatches = c.retryErrBatches nonRetryBatches = c.noRetryErrBatches retryRows := task.doDelete(ctx, s) - realBatchCnt := c.batchCnt - if c.cancelCtx { - realBatchCnt = c.cancelCtxBatch - } - require.LessOrEqual(t, realBatchCnt, c.batchCnt) // check SQLs - require.Equal(t, realBatchCnt, len(sqls)) expectedSQLs := make([]string, 0, len(sqls)) - for i := 0; i < realBatchCnt; i++ { + for i := 0; i < c.batchCnt; i++ { + if c.cancelCtx && i >= c.cancelCtxBatch { + break + } + batch := task.rows[i*delBatch : (i+1)*delBatch] idList := make([]string, 0, delBatch) for _, row := range batch { @@ -324,8 +339,8 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { // check retry rows var expectedRetryRows [][]types.Datum - for i := 0; i < realBatchCnt; i++ { - if slices.Contains(c.retryErrBatches, i) { + for i := 0; i < c.batchCnt; i++ { + if slices.Contains(c.retryErrBatches, i) || (c.cancelCtx && i >= c.cancelCtxBatch) { 
expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...) } } @@ -334,7 +349,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { // check statistics var expectedErrRows uint64 for i := 0; i < c.batchCnt; i++ { - if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) { + if slices.Contains(c.noRetryErrBatches, i) && !(c.cancelCtx && i >= c.cancelCtxBatch) { expectedErrRows += uint64(delBatch) } } @@ -384,6 +399,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) { t2 := newMockTTLTbl(t, "t2") t3 := newMockTTLTbl(t, "t3") t4 := newMockTTLTbl(t, "t4") + t5 := newMockTTLTbl(t, "t5") s := newMockSession(t) pool := newMockSessionPool(t) pool.se = s @@ -392,8 +408,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) { sqlMap := make(map[string]int) t3Retried := make(chan struct{}) t4Retried := make(chan struct{}) + t5Executed := make(chan struct{}) s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { - pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo) + pool.lastSession.sessionInfoSchema = newMockInfoSchema( + t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo, t5.TableInfo, + ) if strings.Contains(sql, "`t1`") { // success return nil, nil @@ -419,20 +438,35 @@ func TestTTLDeleteTaskWorker(t *testing.T) { // error and retry still error // this is to test the retry buffer should be drained after the delete worker stopped i := sqlMap[sql] - if i >= 2 { - // i >= 2 means t4 has retried once and records in retry buffer + if i == 2 { + // i == 2 means t4 has retried once and records in retry buffer close(t4Retried) } sqlMap[sql] = i + 1 return nil, errors.New("mockErr") } + if strings.Contains(sql, "`t5`") { + // error when the worker is running, + // success when flushing retry buffer while the worker stopping. 
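+			// The worker loop's context carries the "delWorker" value, while the final buffer flush uses a fresh context, so only the flush succeeds.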
+			i := sqlMap[sql]
+			sqlMap[sql] = i + 1
+			if ctx.Value("delWorker") != nil {
+				if i == 1 {
+					close(t5Executed)
+				}
+				return nil, errors.New("mockErr")
+			}
+			return nil, nil
+		}
+
 		require.FailNow(t, "")
 		return nil, nil
 	}
 
 	delCh := make(chan *ttlDeleteTask)
 	w := newDeleteWorker(delCh, pool)
+	w.ctx = context.WithValue(w.ctx, "delWorker", struct{}{})
 	w.retryBuffer.retryInterval = time.Millisecond
 	w.retryBuffer.maxRetry = math.MaxInt
 	require.Equal(t, workerStatusCreated, w.Status())
@@ -444,7 +478,7 @@
 	}()
 
 	tasks := make([]*ttlDeleteTask, 0)
-	for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} {
+	for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4, t5} {
 		task := &ttlDeleteTask{
 			tbl:    tbl,
 			expire: time.UnixMilli(0),
@@ -476,8 +510,17 @@
 		require.FailNow(t, "")
 	}
 
-	// before stop, t4 should always retry without any error rows
+	select {
+	case <-t5Executed:
+	case <-time.After(time.Second):
+		require.FailNow(t, "")
+	}
+
+	// before stopping, t4 and t5 should keep retrying without reporting any error rows
+	require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
 	require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load())
+	require.Equal(t, uint64(0), tasks[4].statistics.SuccessRows.Load())
+	require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
 
 	w.Stop()
 	require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second))
@@ -490,6 +533,11 @@
 	require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load())
 	require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load())
 
+	// t4 should end up with error rows because its buffer flush fails while the worker is stopping.
 	require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
 	require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())
+
+	// t5 should end up with success rows because its buffer flush succeeds while the worker is stopping.
+ require.Equal(t, uint64(3), tasks[4].statistics.SuccessRows.Load()) + require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load()) } diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go index c3b07f8eb66a6..49335f02980e9 100644 --- a/pkg/ttl/ttlworker/job_manager_integration_test.go +++ b/pkg/ttl/ttlworker/job_manager_integration_test.go @@ -516,9 +516,10 @@ func TestRescheduleJobs(t *testing.T) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) se := sessionFactory() - m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + m := ttlworker.NewJobManager("manager-1", dom.SysSessionPool(), store, nil, func() bool { return true }) + defer m.TaskManager().ResizeWorkersToZero(t) m.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, m.InfoSchemaCache().Update(se)) require.NoError(t, m.TableStatusCache().Update(context.Background(), se)) @@ -537,7 +538,8 @@ func TestRescheduleJobs(t *testing.T) { tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1")) // another manager should get this job, if the heart beat is not updated - anotherManager := ttlworker.NewJobManager("manager-2", nil, store, nil, nil) + anotherManager := ttlworker.NewJobManager("manager-2", dom.SysSessionPool(), store, nil, nil) + defer anotherManager.TaskManager().ResizeWorkersToZero(t) anotherManager.TaskManager().ResizeWorkersWithSysVar() require.NoError(t, anotherManager.InfoSchemaCache().Update(se)) require.NoError(t, anotherManager.TableStatusCache().Update(context.Background(), se)) @@ -584,7 +586,7 @@ func TestRescheduleJobsAfterTableDropped(t *testing.T) { } for i, rb := range removeBehaviors { se := sessionFactory() - m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + m := ttlworker.NewJobManager("manager-1", dom.SysSessionPool(), store, nil, func() bool { return true }) m.TaskManager().ResizeWorkersWithSysVar() @@ -613,6 +615,8 @@ func TestRescheduleJobsAfterTableDropped(t *testing.T) { table, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) m.DoGC(context.TODO(), se, now) + + m.TaskManager().ResizeWorkersToZero(t) } } @@ -631,7 +635,7 @@ func TestJobTimeout(t *testing.T) { se := sessionFactory() now := se.Now() - m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool { + m := ttlworker.NewJobManager("manager-1", dom.SysSessionPool(), store, nil, func() bool { return true }) m.TaskManager().ResizeWorkersWithSysVar() @@ -640,7 +644,7 @@ func TestJobTimeout(t *testing.T) { // submit job require.NoError(t, m.SubmitJob(se, tableID, tableID, "request1")) // set the worker to be empty, so none of the tasks will be scheduled - m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{}) + m.TaskManager().ResizeWorkersToZero(t) sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID) rows, err := se.ExecuteSQL(ctx, sql, args...) 
@@ -653,8 +657,10 @@
 	// there is already a task
 	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
-	m2 := ttlworker.NewJobManager("manager-2", nil, store, nil, nil)
+	m2 := ttlworker.NewJobManager("manager-2", dom.SysSessionPool(), store, nil, nil)
 	m2.TaskManager().ResizeWorkersWithSysVar()
+	defer m2.TaskManager().ResizeWorkersToZero(t)
+
 	require.NoError(t, m2.InfoSchemaCache().Update(se))
 	require.NoError(t, m2.TableStatusCache().Update(context.Background(), se))
 	// schedule jobs
@@ -872,7 +878,7 @@ func TestJobMetrics(t *testing.T) {
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)
 	se := sessionFactory()
-	m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
+	m := ttlworker.NewJobManager("manager-1", dom.SysSessionPool(), store, nil, func() bool {
 		return true
 	})
 	m.TaskManager().ResizeWorkersWithSysVar()
@@ -880,7 +886,7 @@ func TestJobMetrics(t *testing.T) {
 	// submit job
 	require.NoError(t, m.SubmitJob(se, table.Meta().ID, table.Meta().ID, "request1"))
 	// set the worker to be empty, so none of the tasks will be scheduled
-	m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{})
+	m.TaskManager().ResizeWorkersToZero(t)
 	sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
 	rows, err := se.ExecuteSQL(ctx, sql, args...)
diff --git a/pkg/ttl/ttlworker/scan.go b/pkg/ttl/ttlworker/scan.go
index d28f62b8734a3..a4a9f67248b35 100644
--- a/pkg/ttl/ttlworker/scan.go
+++ b/pkg/ttl/ttlworker/scan.go
@@ -30,7 +30,6 @@ import (
 	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/chunk"
-	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"go.uber.org/zap"
 )
@@ -42,9 +41,6 @@ var (
 	taskMaxErrorRate = 0.4
 )
 
-// TTLScanPostScanHookForTest is used to hook the cancel of the TTL scan task. It's only used in tests.
-type TTLScanPostScanHookForTest struct{}
-
 type ttlStatistics struct {
 	TotalRows   atomic.Uint64
 	SuccessRows atomic.Uint64
@@ -85,14 +81,33 @@ type ttlScanTask struct {
 	statistics *ttlStatistics
 }
 
+// TaskTerminateReason indicates the reason why the task is terminated.
+type TaskTerminateReason string
+
+const (
+	// ReasonTaskFinished indicates the task is finished.
+	ReasonTaskFinished TaskTerminateReason = "finished"
+	// ReasonError indicates the task is terminated because of an error.
+	ReasonError TaskTerminateReason = "error"
+	// ReasonWorkerStop indicates the task is terminated because the scan worker stops.
+	// Such a task should be rescheduled to another worker or TiDB instance.
+	ReasonWorkerStop TaskTerminateReason = "workerStop"
+)
+
 type ttlScanTaskExecResult struct {
 	time time.Time
 	task *ttlScanTask
 	err  error
+	// reason indicates why the task is terminated.
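+	// A ReasonWorkerStop result lets the task manager resign the task's ownership so another instance can take it over.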
+ reason TaskTerminateReason } func (t *ttlScanTask) result(err error) *ttlScanTaskExecResult { - return &ttlScanTaskExecResult{time: time.Now(), task: t, err: err} + reason := ReasonTaskFinished + if err != nil { + reason = ReasonError + } + return &ttlScanTaskExecResult{time: time.Now(), task: t, err: err, reason: reason} } func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum { @@ -103,6 +118,17 @@ func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum { return datums } +func (t *ttlScanTask) taskLogger(l *zap.Logger) *zap.Logger { + return l.With( + zap.String("jobID", t.JobID), + zap.Int64("scanID", t.ScanID), + zap.Int64("tableID", t.TableID), + zap.String("db", t.tbl.Schema.O), + zap.String("table", t.tbl.Name.O), + zap.String("partition", t.tbl.Partition.O), + ) +} + func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult { // TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers // now, the taskCtx is only check at the beginning of every loop @@ -125,13 +151,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s case <-doScanFinished.Done(): return } - logger := logutil.BgLogger().With( - zap.Int64("tableID", t.TableID), - zap.String("table", t.tbl.Name.O), - zap.String("partition", t.tbl.Partition.O), - zap.String("jobID", t.JobID), - zap.Int64("scanID", t.ScanID), - ) + logger := t.taskLogger(logutil.BgLogger()) logger.Info("kill the running statement in scan task because the task or worker cancelled") rawSess.KillStmt() ticker := time.NewTicker(time.Minute) @@ -384,8 +404,8 @@ func (w *ttlScanWorker) handleScanTask(tracer *metrics.PhaseTracer, task *ttlSca result = task.result(nil) } - if intest.InTest && ctx.Value(TTLScanPostScanHookForTest{}) != nil { - ctx.Value(TTLScanPostScanHookForTest{}).(func())() + if result.reason == ReasonError && w.baseWorker.ctx.Err() != nil { + result.reason = ReasonWorkerStop } w.baseWorker.Lock() diff --git a/pkg/ttl/ttlworker/scan_test.go b/pkg/ttl/ttlworker/scan_test.go index 88223b5231066..700da4c4e0f4d 100644 --- a/pkg/ttl/ttlworker/scan_test.go +++ b/pkg/ttl/ttlworker/scan_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/ttl/cache" @@ -72,8 +73,14 @@ func (w *mockScanWorker) checkPollResult(exist bool, err string) { require.Same(w.t, curTask, r.task) if err == "" { require.NoError(w.t, r.err) + require.Equal(w.t, ReasonTaskFinished, r.reason) } else { require.EqualError(w.t, r.err, err) + if w.ctx.Err() != nil { + require.Equal(w.t, ReasonWorkerStop, r.reason) + } else { + require.Equal(w.t, ReasonError, r.reason) + } } } } @@ -125,6 +132,14 @@ func (w *mockScanWorker) stopWithWait() { require.NoError(w.t, w.WaitStopped(context.TODO(), 10*time.Second)) } +func (w *mockScanWorker) SetInfoSchema(is infoschema.InfoSchema) { + w.sessPoll.se.sessionInfoSchema = is +} + +func (w *mockScanWorker) SetExecuteSQL(fn func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error)) { + w.sessPoll.se.executeSQL = fn +} + func TestScanWorkerSchedule(t *testing.T) { origLimit := variable.TTLScanBatchSize.Load() variable.TTLScanBatchSize.Store(5) @@ -209,6 +224,43 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) { 
w.checkWorkerStatus(workerStatusRunning, true, nil) } +func TestScanResultWhenWorkerStop(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + w := NewMockScanWorker(t) + defer w.sessPoll.AssertNoSessionInUse() + executeCh := make(chan struct{}) + w.sessPoll.se.sessionInfoSchema = newMockInfoSchema(tbl.TableInfo) + w.sessPoll.se.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { + close(executeCh) + select { + case <-ctx.Done(): + case <-time.After(10 * time.Second): + require.FailNow(t, "wait scan worker stop timeout") + } + return nil, nil + } + + w.Start() + task := &ttlScanTask{ + ctx: context.Background(), + tbl: tbl, + TTLTask: &cache.TTLTask{}, + statistics: &ttlStatistics{}, + } + require.NoError(t, w.Schedule(task)) + select { + case <-executeCh: + case <-time.After(time.Second): + require.FailNow(t, "wait executeSQL timeout") + } + w.stopWithWait() + w.checkWorkerStatus(workerStatusStopped, false, task) + msg := w.waitNotifyScanTaskEnd() + require.Equal(t, ReasonWorkerStop, msg.result.reason) + w.checkPollResult(true, msg.result.err.Error()) + w.checkWorkerStatus(workerStatusStopped, false, nil) +} + type mockScanTask struct { *ttlScanTask t *testing.T @@ -278,8 +330,10 @@ func (t *mockScanTask) runDoScanForTest(delTaskCnt int, errString string) *ttlSc require.Same(t.t, t.ttlScanTask, r.task) if errString == "" { require.NoError(t.t, r.err) + require.Equal(t.t, ReasonTaskFinished, r.reason) } else { require.EqualError(t.t, r.err, errString) + require.Equal(t.t, ReasonError, r.reason) } previousIdx := delTaskCnt diff --git a/pkg/ttl/ttlworker/task_manager.go b/pkg/ttl/ttlworker/task_manager.go index d8dfd3e608ed6..a7bd2addc83fd 100644 --- a/pkg/ttl/ttlworker/task_manager.go +++ b/pkg/ttl/ttlworker/task_manager.go @@ -60,7 +60,7 @@ func setTTLTaskFinishedSQL(jobID string, scanID int64, state *cache.TTLTaskState return setTTLTaskFinishedTemplate, []any{now.Format(timeFormat), string(stateStr), jobID, scanID, ownerID}, nil } -const updateTTLTaskHeartBeatTempalte = `UPDATE mysql.tidb_ttl_task +const updateTTLTaskHeartBeatTemplate = `UPDATE mysql.tidb_ttl_task SET state = %?, owner_hb_time = %? WHERE job_id = %? AND scan_id = %? AND owner_id = %?` @@ -70,7 +70,23 @@ func updateTTLTaskHeartBeatSQL(jobID string, scanID int64, now time.Time, state if err != nil { return "", nil, err } - return updateTTLTaskHeartBeatTempalte, []any{string(stateStr), now.Format(timeFormat), jobID, scanID, ownerID}, nil + return updateTTLTaskHeartBeatTemplate, []any{string(stateStr), now.Format(timeFormat), jobID, scanID, ownerID}, nil +} + +const resignOwnerSQLTemplate = `UPDATE mysql.tidb_ttl_task + SET state = %?, + status = 'waiting', + owner_id = NULL, + status_update_time = %?, + owner_hb_time = NULL + WHERE job_id = %? AND scan_id = %? 
AND owner_id = %?` + +func resignOwnerSQL(jobID string, scanID int64, now time.Time, state *cache.TTLTaskState, ownerID string) (string, []any, error) { + stateStr, err := json.Marshal(state) + if err != nil { + return "", nil, err + } + return resignOwnerSQLTemplate, []any{string(stateStr), now.Format(timeFormat), jobID, scanID, ownerID}, nil } const countRunningTasks = "SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running'" @@ -162,6 +178,7 @@ func (m *taskManager) resizeScanWorkers(count int) error { scanErr = errors.New("timeout to cancel scan task") result = curTask.result(scanErr) + result.reason = ReasonWorkerStop } task := findTaskWithID(m.runningTasks, jobID, scanID) @@ -194,6 +211,8 @@ func (m *taskManager) resizeDelWorkers(count int) error { return err } +var waitWorkerStopTimeout = 30 * time.Second + // resizeWorkers scales the worker, and returns the full set of workers as the first return value. If there are workers // stopped, return the stopped worker in the second return value func (m *taskManager) resizeWorkers(workers []worker, count int, factory func() worker) ([]worker, []worker, error) { @@ -206,9 +225,9 @@ func (m *taskManager) resizeWorkers(workers []worker, count int, factory func() var errs error // don't use `m.ctx` here, because when shutdown the server, `m.ctx` has already been cancelled - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), waitWorkerStopTimeout) for _, w := range workers[count:] { - err := w.WaitStopped(ctx, 30*time.Second) + err := w.WaitStopped(ctx, waitWorkerStopTimeout) if err != nil { logutil.Logger(m.ctx).Warn("fail to stop ttl worker", zap.Error(err)) errs = multierr.Append(errs, err) @@ -292,7 +311,11 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) { loop: for _, t := range tasks { - logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID)) + logger := logutil.Logger(m.ctx).With( + zap.String("jobID", t.JobID), + zap.Int64("scanID", t.ScanID), + zap.Int64("tableID", t.TableID), + ) task, err := m.lockScanTask(se, t, now) if err != nil { @@ -305,7 +328,7 @@ loop: // don't step into the next step to avoid exceeding the limit break loop } - logutil.Logger(m.ctx).Warn("fail to lock scan task", zap.Error(err)) + logger.Warn("fail to lock scan task", zap.Error(err)) continue } @@ -319,7 +342,19 @@ loop: continue } - logger.Info("scheduled ttl task") + var prevTotalRows, prevSuccessRows, prevErrorRows uint64 + if state := task.TTLTask.State; state != nil { + prevTotalRows = state.TotalRows + prevSuccessRows = state.SuccessRows + prevErrorRows = state.ErrorRows + } + + logger.Info( + "scheduled ttl task", + zap.Uint64("prevTotalRows", prevTotalRows), + zap.Uint64("prevSuccessRows", prevSuccessRows), + zap.Uint64("prevErrorRows", prevErrorRows), + ) m.runningTasks = append(m.runningTasks, task) if len(idleScanWorkers) == 0 { @@ -381,6 +416,23 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now return errors.WithStack(errTooManyRunningTasks) } + if task.OwnerID != "" { + logutil.Logger(m.ctx).Info( + "try to lock a heartbeat timeout task", + zap.String("jobID", task.JobID), + zap.Int64("scanID", task.ScanID), + zap.String("prevOwner", task.OwnerID), + zap.Time("lastHeartbeat", task.OwnerHBTime), + ) + } else if task.State != nil && task.State.PreviousOwner != "" { + logutil.Logger(m.ctx).Info( + "try to lock a task resigned from another instance", + 
zap.String("jobID", task.JobID), + zap.Int64("scanID", task.ScanID), + zap.String("prevOwner", task.State.PreviousOwner), + ) + } + intest.Assert(se.GetSessionVars().Location().String() == now.Location().String()) sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now) _, err = se.ExecuteSQL(ctx, sql, args...) @@ -441,25 +493,24 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID // updateHeartBeat updates the heartbeat for all tasks with current instance as owner func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) { for _, task := range m.runningTasks { - err := m.updateHeartBeatForTask(ctx, se, now, task) + err := m.taskHeartbeatOrResignOwner(ctx, se, now, task, false) if err != nil { logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID)) } } } -func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error { - state := &cache.TTLTaskState{ - TotalRows: task.statistics.TotalRows.Load(), - SuccessRows: task.statistics.SuccessRows.Load(), - ErrorRows: task.statistics.ErrorRows.Load(), - } - if task.result != nil && task.result.err != nil { - state.ScanTaskErr = task.result.err.Error() - } +func (m *taskManager) taskHeartbeatOrResignOwner(ctx context.Context, se session.Session, now time.Time, task *runningScanTask, isResignOwner bool) error { + state := task.dumpNewTaskState() intest.Assert(se.GetSessionVars().Location().String() == now.Location().String()) - sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id) + buildSQLFunc := updateTTLTaskHeartBeatSQL + if isResignOwner { + state.PreviousOwner = m.id + buildSQLFunc = resignOwnerSQL + } + + sql, args, err := buildSQLFunc(task.JobID, task.ScanID, now, state, m.id) if err != nil { return err } @@ -469,16 +520,85 @@ func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Ses } if se.GetSessionVars().StmtCtx.AffectedRows() != 1 { - return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d", + return errors.Errorf("fail to update task status, maybe the owner is not myself (%s), affected rows: %d", m.id, se.GetSessionVars().StmtCtx.AffectedRows()) } return nil } +func shouldRunningTaskResignOwner(task *runningScanTask) (string, bool) { + if result := task.result; result != nil { + switch result.reason { + case ReasonWorkerStop: + return string(result.reason), true + } + } + return "", false +} + +func (m *taskManager) tryResignTaskOwner(se session.Session, task *runningScanTask, reason string, now time.Time) bool { + var totalRows, successRows, errRows, processedRows uint64 + if stats := task.statistics; stats != nil { + totalRows = stats.TotalRows.Load() + successRows = stats.SuccessRows.Load() + errRows = stats.ErrorRows.Load() + processedRows = successRows + errRows + } + + var taskEndTime time.Time + if r := task.result; r != nil { + taskEndTime = r.time + } + + logger := task.taskLogger(logutil.Logger(m.ctx)).With( + zap.Time("taskEndTime", taskEndTime), + zap.String("reason", reason), + zap.Uint64("totalRows", totalRows), + zap.Uint64("processedRows", processedRows), + zap.Uint64("successRows", successRows), + zap.Uint64("errRows", errRows), + ) + + sinceTaskEnd := now.Sub(taskEndTime) + if sinceTaskEnd < 0 { + logger.Warn("task end time is after current time, something may goes wrong") + } + + if 
totalRows > processedRows && sinceTaskEnd < 10*time.Second && sinceTaskEnd > -10*time.Second { + logger.Info("wait all rows processed before resign the owner of a TTL task") + return false + } + + if totalRows > processedRows { + logger.Info("wait all rows processed timeout, force to resign the owner of a TTL task") + } else { + logger.Info("resign the owner of a TTL task") + } + + task.cancel() + // Update the task state with heartbeatTask for the last time. + // The task will be taken over by another instance after timeout. + if err := m.taskHeartbeatOrResignOwner(m.ctx, se, now, task, true); err != nil { + logger.Warn("fail to update the state before resign the task owner", zap.Error(err)) + } + + return true +} + func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) { + if len(m.runningTasks) == 0 { + return + } stillRunningTasks := make([]*runningScanTask, 0, len(m.runningTasks)) for _, task := range m.runningTasks { + if reason, resign := shouldRunningTaskResignOwner(task); resign { + if !m.tryResignTaskOwner(se, task, reason, now) { + stillRunningTasks = append(stillRunningTasks, task) + } + continue + } + if !task.finished(logutil.Logger(m.ctx)) { stillRunningTasks = append(stillRunningTasks, task) continue @@ -487,7 +607,7 @@ func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) { task.cancel() err := m.reportTaskFinished(se, now, task) if err != nil { - logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err)) + task.taskLogger(logutil.Logger(m.ctx)).Error("fail to report finished task", zap.Error(err)) stillRunningTasks = append(stillRunningTasks, task) continue } @@ -497,14 +617,7 @@ func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) { } func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error { - state := &cache.TTLTaskState{ - TotalRows: task.statistics.TotalRows.Load(), - SuccessRows: task.statistics.SuccessRows.Load(), - ErrorRows: task.statistics.ErrorRows.Load(), - } - if task.result.err != nil { - state.ScanTaskErr = task.result.err.Error() - } + state := task.dumpNewTaskState() intest.Assert(se.GetSessionVars().Location().String() == now.Location().String()) sql, args, err := setTTLTaskFinishedSQL(task.JobID, task.ScanID, state, now, m.id) @@ -524,11 +637,21 @@ func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task m.id, se.GetSessionVars().StmtCtx.AffectedRows()) } + task.taskLogger(logutil.Logger(m.ctx)).Info( + "TTL task finished", + zap.Uint64("finalTotalRows", state.TotalRows), + zap.Uint64("finalSuccessRows", state.SuccessRows), + zap.Uint64("finalErrorRows", state.ErrorRows), + ) + return nil } // checkInvalidTask removes the task whose owner is not myself or which has disappeared func (m *taskManager) checkInvalidTask(se session.Session) { + if len(m.runningTasks) == 0 { + return + } // TODO: optimize this function through cache or something else ownRunningTask := make([]*runningScanTask, 0, len(m.runningTasks)) @@ -626,6 +749,29 @@ func (t *runningScanTask) Context() context.Context { return t.ctx } +// dumpNewTaskState dumps a new TTLTaskState which is used to update the task meta in the storage +func (t *runningScanTask) dumpNewTaskState() *cache.TTLTaskState { + state := &cache.TTLTaskState{ + TotalRows: t.statistics.TotalRows.Load(), + SuccessRows: t.statistics.SuccessRows.Load(), + ErrorRows: t.statistics.ErrorRows.Load(), + } + + if prevState := t.TTLTask.State; prevState != nil { + // If a task was 
timeout and taken over by the current instance, + // adding the previous state to the current state to make the statistics more accurate. + state.TotalRows += prevState.SuccessRows + prevState.ErrorRows + state.SuccessRows += prevState.SuccessRows + state.ErrorRows += prevState.ErrorRows + } + + if r := t.result; r != nil && r.err != nil { + state.ScanTaskErr = r.err.Error() + } + + return state +} + func (t *runningScanTask) finished(logger *zap.Logger) bool { if t.result == nil { // Scan task isn't finished @@ -645,7 +791,7 @@ func (t *runningScanTask) finished(logger *zap.Logger) bool { if processedRows == totalRows { // All rows are processed. logger.Info( - "mark TTL task finished because all scanned rows are processed", + "will mark TTL task finished because all scanned rows are processed", zap.Uint64("totalRows", totalRows), zap.Uint64("successRows", successRows), zap.Uint64("errorRows", errRows), @@ -657,7 +803,7 @@ func (t *runningScanTask) finished(logger *zap.Logger) bool { // All rows are processed but processed rows are more than total rows. // We still think it is finished. logger.Warn( - "mark TTL task finished but processed rows are more than total rows", + "will mark TTL task finished but processed rows are more than total rows", zap.Uint64("totalRows", totalRows), zap.Uint64("successRows", successRows), zap.Uint64("errorRows", errRows), @@ -670,7 +816,7 @@ func (t *runningScanTask) finished(logger *zap.Logger) bool { // After a certain time, if the rows are still not processed, we need to mark the task finished anyway to make // sure the TTL job does not hang. logger.Info( - "mark TTL task finished because timeout for waiting all scanned rows processed after scan task done", + "will mark TTL task finished because timeout for waiting all scanned rows processed after scan task done", zap.Uint64("totalRows", totalRows), zap.Uint64("successRows", successRows), zap.Uint64("errorRows", errRows), diff --git a/pkg/ttl/ttlworker/task_manager_integration_test.go b/pkg/ttl/ttlworker/task_manager_integration_test.go index c84dd1f61f5b2..01ec9332dab91 100644 --- a/pkg/ttl/ttlworker/task_manager_integration_test.go +++ b/pkg/ttl/ttlworker/task_manager_integration_test.go @@ -17,10 +17,12 @@ package ttlworker_test import ( "context" "fmt" + "strconv" "sync" "testing" "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" @@ -29,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/ttl/cache" "github.com/pingcap/tidb/pkg/ttl/metrics" "github.com/pingcap/tidb/pkg/ttl/ttlworker" + "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/logutil" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -357,7 +360,9 @@ func TestMeetTTLRunningTasks(t *testing.T) { require.True(t, dom.TTLJobManager().TaskManager().MeetTTLRunningTasks(3, cache.TaskStatusRunning)) } -func TestShrinkScanWorkerTimeout(t *testing.T) { +func TestShrinkScanWorkerAndResignOwner(t *testing.T) { + defer ttlworker.SetWaitWorkerStopTimeoutForTest(time.Second)() + store, dom := testkit.CreateMockStoreAndDomain(t) pool := wrapPoolForTest(dom.SysSessionPool()) defer pool.AssertNoSessionInUse(t) @@ -370,8 +375,9 @@ func TestShrinkScanWorkerTimeout(t *testing.T) { tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") testTable, err := 
dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - for id := 0; id < 4; id++ { - sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id) + taskCnt := 8 + for id := 0; id < taskCnt; id++ { + sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW() - interval %d second)", testTable.Meta().ID, id, taskCnt-id) tk.MustExec(sql) } @@ -384,50 +390,157 @@ func TestShrinkScanWorkerTimeout(t *testing.T) { startBlockNotifyCh := make(chan struct{}) blockCancelCh := make(chan struct{}) + workers := make([]ttlworker.Worker, 0, taskCnt) + defer func() { + close(blockCancelCh) + for _, w := range workers { + w.Stop() + require.NoError(t, w.WaitStopped(context.Background(), time.Minute)) + } + }() - workers := []ttlworker.Worker{} - for j := 0; j < 4; j++ { + for j := 0; j < taskCnt; j++ { scanWorker := ttlworker.NewMockScanWorker(t) - if j == 0 { - scanWorker.SetCtx(func(ctx context.Context) context.Context { - return context.WithValue(ctx, ttlworker.TTLScanPostScanHookForTest{}, func() { - startBlockNotifyCh <- struct{}{} - <-blockCancelCh - }) + scanWorker.SetInfoSchema(dom.InfoSchema()) + switch j { + case 0: + scanWorker.SetExecuteSQL(func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { + // test for shrink scan worker timeout + startBlockNotifyCh <- struct{}{} + <-blockCancelCh + return nil, nil + }) + case 1: + scanWorker.SetExecuteSQL(func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { + return nil, errors.New("mockErr") + }) + default: + scanWorker.SetExecuteSQL(func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { + <-ctx.Done() + return nil, nil }) } scanWorker.Start() workers = append(workers, scanWorker) } - m.SetScanWorkers4Test(workers) m.RescheduleTasks(se, now) - require.Len(t, m.GetRunningTasks(), 4) - tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4")) + require.Len(t, m.GetRunningTasks(), len(workers)) + tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'"). + Check(testkit.Rows(strconv.Itoa(taskCnt))) <-startBlockNotifyCh - // shrink scan workers, one of them will timeout + // shrink scan workers, and one of them will be a timeout require.Error(t, m.ResizeScanWorkers(0)) require.Len(t, m.GetScanWorkers(), 0) - // the canceled 3 tasks are still running, but they have results, so after `CheckFinishedTask`, it should be finished - tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("4")) - m.CheckFinishedTask(se, now) - require.Len(t, m.GetRunningTasks(), 0) - // now, the task should be finished - tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("0")) - // the first task will be finished with "timeout to cancel scan task" - // other tasks will finish with table not found because we didn't mock the table in this test. 
- tk.MustQuery("SELECT scan_id, json_extract(state, '$.scan_task_err') from mysql.tidb_ttl_task").Sort().Check(testkit.Rows( - "0 \"timeout to cancel scan task\"", - "1 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"", - "2 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"", - "3 \"table 'test.t' meta changed, should abort current job: [schema:1146]Table 'test.t' doesn't exist\"", - )) + // mock running task's statistics and end time + tk.MustQuery("SELECT count(1) from mysql.tidb_ttl_task where status = 'running'"). + Check(testkit.Rows(strconv.Itoa(taskCnt))) + tasks := m.GetRunningTasks() + require.Len(t, tasks, len(workers)) + for j, task := range tasks { + terminated, reason, endTime := task.GetTerminateInfo() + require.True(t, terminated, j) + require.Equal(t, ttlworker.ReasonWorkerStop, reason, j) + stat := task.GetStatistics() + require.True(t, !endTime.IsZero(), j) + switch j { + case 0: + // some rows error + stat.TotalRows.Store(128) + stat.SuccessRows.Store(100) + stat.ErrorRows.Store(28) + case 1: + // no rows + stat.TotalRows.Store(0) + stat.SuccessRows.Store(0) + stat.ErrorRows.Store(0) + case 2: + // all rows processed + stat.TotalRows.Store(128) + stat.SuccessRows.Store(128) + stat.ErrorRows.Store(0) + case 3: + // no enough rows processed, not timeout + task.ResetEndTimeForTest(t, now.Add(9*time.Second)) + stat.TotalRows.Store(128) + stat.SuccessRows.Store(64) + stat.ErrorRows.Store(63) + case 4: + // no enough rows processed, but timed out + task.ResetEndTimeForTest(t, now.Add(10*time.Second)) + // also, test report rows should be accumulated + task.TTLTask.State = &cache.TTLTaskState{ + TotalRows: 5, + SuccessRows: 2, + ErrorRows: 1, + } + stat.TotalRows.Store(128) + stat.SuccessRows.Store(64) + stat.ErrorRows.Store(63) + case 5: + // no enough rows processed, but no negative time + task.ResetEndTimeForTest(t, now.Add(-9*time.Second)) + stat.TotalRows.Store(128) + stat.SuccessRows.Store(64) + stat.ErrorRows.Store(63) + case 6: + // no enough rows processed, but negative timed out + task.ResetEndTimeForTest(t, now.Add(-10*time.Second)) + stat.TotalRows.Store(128) + stat.SuccessRows.Store(64) + stat.ErrorRows.Store(63) + case 7: + // some unexpected data + stat.TotalRows.Store(128) + stat.SuccessRows.Store(129) + stat.ErrorRows.Store(0) + } + } - require.NoError(t, m.ResizeDelWorkers(0)) - close(blockCancelCh) + // After CheckFinishedTask, tasks should be "waiting" state except some are waiting for statistics collecting. + m.CheckFinishedTask(se, now) + tk.MustQuery( + "SELECT scan_id, status, owner_id," + + " json_extract(state, '$.total_rows')," + + " json_extract(state, '$.success_rows')," + + " json_extract(state, '$.error_rows')," + + " json_extract(state, '$.prev_owner')," + + " json_extract(state, '$.scan_task_err')" + + " from mysql.tidb_ttl_task order by scan_id"). 
+ Check(testkit.Rows( + "0 waiting 128 100 28 \"scan-manager-1\" \"timeout to cancel scan task\"", + "1 waiting 0 0 0 \"scan-manager-1\" \"context canceled\"", + "2 waiting 128 128 0 \"scan-manager-1\" \"context canceled\"", + "3 running scan-manager-1 ", + "4 waiting 131 66 64 \"scan-manager-1\" \"context canceled\"", + "5 running scan-manager-1 ", + "6 waiting 128 64 63 \"scan-manager-1\" \"context canceled\"", + "7 waiting 128 129 0 \"scan-manager-1\" \"context canceled\"", + )) + + // A resigned task can be obtained by other task managers + m2 := ttlworker.NewTaskManager(context.Background(), pool, isc, "scan-manager-2", store) + worker2 := ttlworker.NewMockScanWorker(t) + worker2.Start() + defer func() { + worker2.Stop() + require.NoError(t, worker2.WaitStopped(context.Background(), time.Minute)) + }() + m2.SetScanWorkers4Test([]ttlworker.Worker{worker2}) + m2.RescheduleTasks(se, now) + require.Len(t, m2.GetRunningTasks(), 1) + task := m2.GetRunningTasks()[0] + require.Equal(t, int64(0), task.ScanID) + require.Equal(t, cache.TaskStatusRunning, task.Status) + require.Equal(t, "scan-manager-2", task.OwnerID) + require.Equal(t, uint64(128), task.State.TotalRows) + require.Equal(t, uint64(100), task.State.SuccessRows) + require.Equal(t, uint64(28), task.State.ErrorRows) + tk.MustQuery("select status, owner_id from mysql.tidb_ttl_task where scan_id=0"). + Check(testkit.Rows("running scan-manager-2")) } func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) { diff --git a/pkg/ttl/ttlworker/task_manager_test.go b/pkg/ttl/ttlworker/task_manager_test.go index 2827fb8846147..5372d8c298418 100644 --- a/pkg/ttl/ttlworker/task_manager_test.go +++ b/pkg/ttl/ttlworker/task_manager_test.go @@ -51,6 +51,12 @@ func (m *taskManager) ResizeWorkersWithSysVar() { m.resizeWorkersWithSysVar() } +// ResizeWorkersToZero resize workers to zero +func (m *taskManager) ResizeWorkersToZero(t *testing.T) { + require.NoError(t, m.resizeScanWorkers(0)) + require.NoError(t, m.resizeDelWorkers(0)) +} + // RescheduleTasks is an exported version of rescheduleTasks func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) { m.rescheduleTasks(se, now) @@ -118,7 +124,35 @@ func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, n // UpdateHeartBeatForTask is an exported version of updateHeartBeatForTask func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error { - return m.updateHeartBeatForTask(ctx, se, now, task) + return m.taskHeartbeatOrResignOwner(ctx, se, now, task, false) +} + +// SetWaitWorkerStopTimeoutForTest sets the waitWorkerStopTimeout for testing +func SetWaitWorkerStopTimeoutForTest(timeout time.Duration) func() { + original := waitWorkerStopTimeout + waitWorkerStopTimeout = timeout + return func() { + waitWorkerStopTimeout = original + } +} + +// GetTerminateInfo returns the task terminates info +func (t *runningScanTask) GetTerminateInfo() (bool, TaskTerminateReason, time.Time) { + if t.result == nil { + return false, "", time.Time{} + } + return true, t.result.reason, t.result.time +} + +// GetStatistics returns the ttlStatistics +func (t *runningScanTask) GetStatistics() *ttlStatistics { + return t.statistics +} + +// ResetEndTime resets the end time +func (t *runningScanTask) ResetEndTimeForTest(tb *testing.T, tm time.Time) { + require.NotNil(tb, t.result) + t.result.time = tm } func TestResizeWorkers(t *testing.T) { diff --git a/pkg/ttl/ttlworker/worker.go b/pkg/ttl/ttlworker/worker.go index 
9ff607acbdc17..13e5a46fbed9c 100644 --- a/pkg/ttl/ttlworker/worker.go +++ b/pkg/ttl/ttlworker/worker.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) @@ -126,6 +127,7 @@ func (w *baseWorker) loop() { defer func() { if r := recover(); r != nil { logutil.BgLogger().Info("ttl worker panic", zap.Any("recover", r), zap.Stack("stack")) + intest.Assert(false, "ttl worker panic") } w.Lock() w.toStopped(err) diff --git a/pkg/ttl/ttlworker/worker_test.go b/pkg/ttl/ttlworker/worker_test.go deleted file mode 100644 index 0291cd538c24c..0000000000000 --- a/pkg/ttl/ttlworker/worker_test.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ttlworker - -import "context" - -// WorkerTestExt is the extension interface for worker in test. -type WorkerTestExt interface { - SetCtx(f func(ctx context.Context) context.Context) -} - -var _ WorkerTestExt = &baseWorker{} - -// SetCtx modifies the context of the worker. -func (w *baseWorker) SetCtx(f func(ctx context.Context) context.Context) { - w.ctx = f(w.ctx) -}