Skip to content

Commit eaf7090

Browse files
authored
ttl: fix the issue that one task losing heartbeat will block other tasks (#57919) (#59392)
close #57915
1 parent c526e52 commit eaf7090

File tree

6 files changed

+179
-52
lines changed

6 files changed

+179
-52
lines changed

pkg/ttl/ttlworker/job_manager.go

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,7 @@ func (m *JobManager) jobLoop() error {
221221
// Job Schedule loop:
222222
case <-updateJobHeartBeatTicker:
223223
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
224-
err = m.updateHeartBeat(updateHeartBeatCtx, se, now)
225-
if err != nil {
226-
logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err))
227-
}
224+
m.updateHeartBeat(updateHeartBeatCtx, se, now)
228225
cancel()
229226
case <-jobCheckTicker:
230227
m.checkFinishedJob(se)
@@ -267,10 +264,7 @@ func (m *JobManager) jobLoop() error {
267264
m.taskManager.resizeWorkersWithSysVar()
268265
case <-updateTaskHeartBeatTicker:
269266
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
270-
err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
271-
if err != nil {
272-
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err))
273-
}
267+
m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
274268
cancel()
275269
case <-checkScanTaskFinishedTicker:
276270
if m.taskManager.handleScanFinishedTask() {
@@ -879,28 +873,41 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t
879873
}
880874

881875
// updateHeartBeat updates the heartbeat for all jobs with the current instance as owner
882-
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
876+
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
883877
for _, job := range m.localJobs() {
884-
if job.createTime.Add(ttlJobTimeout).Before(now) {
885-
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
886-
summary, err := summarizeErr(errors.New("job is timeout"))
887-
if err != nil {
888-
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
889-
}
890-
err = job.finish(se, now, summary)
891-
if err != nil {
892-
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
893-
continue
894-
}
895-
m.removeJob(job)
878+
err := m.updateHeartBeatForJob(ctx, se, now, job)
879+
if err != nil {
880+
logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id))
896881
}
882+
}
883+
}
897884

898-
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
899-
_, err := se.ExecuteSQL(ctx, sql, args...)
885+
func (m *JobManager) updateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
886+
if job.createTime.Add(ttlJobTimeout).Before(now) {
887+
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
888+
summary, err := summarizeErr(errors.New("job is timeout"))
900889
if err != nil {
901-
return errors.Wrapf(err, "execute sql: %s", sql)
890+
return errors.Wrapf(err, "fail to summarize job")
902891
}
892+
err = job.finish(se, now, summary)
893+
if err != nil {
894+
return errors.Wrapf(err, "fail to finish job")
895+
}
896+
m.removeJob(job)
897+
return nil
903898
}
899+
900+
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
901+
_, err := se.ExecuteSQL(ctx, sql, args...)
902+
if err != nil {
903+
return errors.Wrapf(err, "execute sql: %s", sql)
904+
}
905+
906+
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
907+
return errors.Errorf("fail to update job heartbeat, maybe the owner is not myself (%s), affected rows: %d",
908+
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
909+
}
910+
904911
return nil
905912
}
906913

pkg/ttl/ttlworker/job_manager_integration_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ func TestJobTimeout(t *testing.T) {
701701
require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix())
702702

703703
// the timeout will be checked while updating heartbeat
704-
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
704+
require.NoError(t, m2.UpdateHeartBeatForJob(ctx, se, now.Add(7*time.Hour), m2.RunningJobs()[0]))
705705
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
706706
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
707707
}
@@ -1533,6 +1533,49 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
15331533
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
15341534
}
15351535

1536+
func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
1537+
store, dom := testkit.CreateMockStoreAndDomain(t)
1538+
waitAndStopTTLManager(t, dom)
1539+
tk := testkit.NewTestKit(t, store)
1540+
1541+
sessionFactory := sessionFactory(t, store)
1542+
se := sessionFactory()
1543+
1544+
tk.MustExec("use test")
1545+
tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1546+
tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1547+
testTable1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
1548+
require.NoError(t, err)
1549+
testTable2, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
1550+
require.NoError(t, err)
1551+
1552+
ctx := context.Background()
1553+
m := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
1554+
1555+
now := se.Now()
1556+
// acquire two jobs
1557+
require.NoError(t, m.InfoSchemaCache().Update(se))
1558+
require.NoError(t, m.TableStatusCache().Update(ctx, se))
1559+
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable1.Meta().ID], now, uuid.NewString(), false)
1560+
require.NoError(t, err)
1561+
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable2.Meta().ID], now, uuid.NewString(), false)
1562+
require.NoError(t, err)
1563+
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running", "running"))
1564+
1565+
// assign the first job to another manager
1566+
tk.MustExec("update mysql.tidb_ttl_table_status set current_job_owner_id = 'test-ttl-job-manager-2' where table_id = ?", testTable1.Meta().ID)
1567+
// the heartbeat of the first job will fail, but the second one will still succeed
1568+
now = now.Add(time.Hour)
1569+
require.Error(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[0]))
1570+
require.NoError(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[1]))
1571+
1572+
now = now.Add(time.Hour)
1573+
m.UpdateHeartBeat(ctx, se, now)
1574+
tk.MustQuery("select table_id, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Sort().Check(testkit.Rows(
1575+
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)),
1576+
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime))))
1577+
}
1578+
15361579
func TestIterationOfRunningJob(t *testing.T) {
15371580
store, dom := testkit.CreateMockStoreAndDomain(t)
15381581
waitAndStopTTLManager(t, dom)

pkg/ttl/ttlworker/job_manager_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,12 @@ func (m *JobManager) TaskManager() *taskManager {
186186
}
187187

188188
// UpdateHeartBeat is an exported version of updateHeartBeat for test
189-
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
190-
return m.updateHeartBeat(ctx, se, now)
189+
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
190+
m.updateHeartBeat(ctx, se, now)
191+
}
192+
193+
func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
194+
return m.updateHeartBeatForJob(ctx, se, now, job)
191195
}
192196

193197
// SetLastReportDelayMetricsTime sets the lastReportDelayMetricsTime for test

pkg/ttl/ttlworker/task_manager.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -434,32 +434,40 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
434434
}
435435

436436
// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
437-
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
437+
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
438438
for _, task := range m.runningTasks {
439-
state := &cache.TTLTaskState{
440-
TotalRows: task.statistics.TotalRows.Load(),
441-
SuccessRows: task.statistics.SuccessRows.Load(),
442-
ErrorRows: task.statistics.ErrorRows.Load(),
443-
}
444-
if task.result != nil && task.result.err != nil {
445-
state.ScanTaskErr = task.result.err.Error()
446-
}
447-
448-
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
449-
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
450-
if err != nil {
451-
return err
452-
}
453-
_, err = se.ExecuteSQL(ctx, sql, args...)
439+
err := m.updateHeartBeatForTask(ctx, se, now, task)
454440
if err != nil {
455-
return errors.Wrapf(err, "execute sql: %s", sql)
441+
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
456442
}
443+
}
444+
}
457445

458-
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
459-
return errors.Errorf("fail to update task status, maybe the owner is not myself (%s), affected rows: %d",
460-
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
461-
}
446+
func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
447+
state := &cache.TTLTaskState{
448+
TotalRows: task.statistics.TotalRows.Load(),
449+
SuccessRows: task.statistics.SuccessRows.Load(),
450+
ErrorRows: task.statistics.ErrorRows.Load(),
462451
}
452+
if task.result != nil && task.result.err != nil {
453+
state.ScanTaskErr = task.result.err.Error()
454+
}
455+
456+
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
457+
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
458+
if err != nil {
459+
return err
460+
}
461+
_, err = se.ExecuteSQL(ctx, sql, args...)
462+
if err != nil {
463+
return errors.Wrapf(err, "execute sql: %s", sql)
464+
}
465+
466+
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
467+
return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d",
468+
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
469+
}
470+
463471
return nil
464472
}
465473

pkg/ttl/ttlworker/task_manager_integration_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
411411
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-2'").Check(testkit.Rows("4"))
412412

413413
// Then m1 cannot update the heartbeat of its task
414-
require.Error(t, m1.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour)))
414+
for i := 0; i < 4; i++ {
415+
require.Error(t, m1.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m1.GetRunningTasks()[i]))
416+
}
415417
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
416418
now.Format(time.DateTime),
417419
now.Format(time.DateTime),
@@ -420,7 +422,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
420422
))
421423

422424
// m2 can successfully update the heartbeat
423-
require.NoError(t, m2.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour)))
425+
for i := 0; i < 4; i++ {
426+
require.NoError(t, m2.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m2.GetRunningTasks()[i]))
427+
}
424428
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
425429
now.Add(time.Hour).Format(time.DateTime),
426430
now.Add(time.Hour).Format(time.DateTime),
@@ -452,3 +456,59 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
452456
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
453457
))
454458
}
459+
460+
func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
461+
store, dom := testkit.CreateMockStoreAndDomain(t)
462+
pool := dom.SysSessionPool()
463+
waitAndStopTTLManager(t, dom)
464+
tk := testkit.NewTestKit(t, store)
465+
sessionFactory := sessionFactory(t, store)
466+
467+
tk.MustExec("set global tidb_ttl_running_tasks = 32")
468+
469+
tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
470+
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
471+
require.NoError(t, err)
472+
for id := 0; id < 4; id++ {
473+
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
474+
tk.MustExec(sql)
475+
}
476+
477+
se := sessionFactory()
478+
now := se.Now()
479+
480+
isc := cache.NewInfoSchemaCache(time.Minute)
481+
require.NoError(t, isc.Update(se))
482+
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
483+
workers := []ttlworker.Worker{}
484+
for j := 0; j < 4; j++ {
485+
scanWorker := ttlworker.NewMockScanWorker(t)
486+
scanWorker.Start()
487+
workers = append(workers, scanWorker)
488+
}
489+
m.SetScanWorkers4Test(workers)
490+
m.RescheduleTasks(se, now)
491+
492+
// All tasks should be scheduled to task-manager-1 and running
493+
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("4"))
494+
495+
// Mock the situation that the owner of task 0 has changed
496+
tk.MustExec("update mysql.tidb_ttl_task set owner_id = 'task-manager-2' where scan_id = 0")
497+
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3"))
498+
499+
now = now.Add(time.Hour)
500+
require.Error(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[0]))
501+
for i := 1; i < 4; i++ {
502+
require.NoError(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[i]))
503+
}
504+
505+
now = now.Add(time.Hour)
506+
m.UpdateHeartBeat(context.Background(), se, now)
507+
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3"))
508+
tk.MustQuery("select scan_id, owner_hb_time from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
509+
fmt.Sprintf("0 %s", now.Add(-2*time.Hour).Format(time.DateTime)),
510+
fmt.Sprintf("1 %s", now.Format(time.DateTime)),
511+
fmt.Sprintf("2 %s", now.Format(time.DateTime)),
512+
fmt.Sprintf("3 %s", now.Format(time.DateTime)),
513+
))
514+
}

pkg/ttl/ttlworker/task_manager_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,13 @@ func (m *taskManager) CheckInvalidTask(se session.Session) {
9797
}
9898

9999
// UpdateHeartBeat is an exported version of updateHeartBeat
100-
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
101-
return m.updateHeartBeat(ctx, se, now)
100+
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
101+
m.updateHeartBeat(ctx, se, now)
102+
}
103+
104+
// UpdateHeartBeatForTask is an exported version of updateHeartBeatForTask
105+
func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
106+
return m.updateHeartBeatForTask(ctx, se, now, task)
102107
}
103108

104109
func TestResizeWorkers(t *testing.T) {

0 commit comments

Comments
 (0)