Skip to content

Commit f4539b2

Browse files
YangKeao and ti-chi-bot
authored and committed
This is an automated cherry-pick of pingcap#57919
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 7362832 commit f4539b2

File tree

6 files changed

+373
-29
lines changed

6 files changed

+373
-29
lines changed

pkg/ttl/ttlworker/job_manager.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,7 @@ func (m *JobManager) jobLoop() error {
224224
// Job Schedule loop:
225225
case <-updateJobHeartBeatTicker:
226226
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
227-
err = m.updateHeartBeat(updateHeartBeatCtx, se, now)
228-
if err != nil {
229-
logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err))
230-
}
227+
m.updateHeartBeat(updateHeartBeatCtx, se, now)
231228
cancel()
232229
case <-jobCheckTicker:
233230
m.checkFinishedJob(se)
@@ -270,10 +267,7 @@ func (m *JobManager) jobLoop() error {
270267
m.taskManager.resizeWorkersWithSysVar()
271268
case <-updateTaskHeartBeatTicker:
272269
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
273-
err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
274-
if err != nil {
275-
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err))
276-
}
270+
m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
277271
cancel()
278272
case <-checkScanTaskFinishedTicker:
279273
if m.taskManager.handleScanFinishedTask() {
@@ -903,29 +897,42 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t
903897
}
904898

905899
// updateHeartBeat updates the heartbeat for all jobs with current instance as owner.
// In the added version each job is handled by updateHeartBeatForJob; an error for
// one job is logged together with its jobID and the loop moves on to the next job.
906-
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
900+
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
907901
for _, job := range m.localJobs() {
908-
if job.createTime.Add(ttlJobTimeout).Before(now) {
909-
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
910-
summary, err := summarizeErr(errors.New("job is timeout"))
911-
if err != nil {
912-
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
913-
}
914-
err = job.finish(se, now, summary)
915-
if err != nil {
916-
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
917-
continue
918-
}
919-
m.removeJob(job)
902+
err := m.updateHeartBeatForJob(ctx, se, now, job)
903+
if err != nil {
904+
logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id))
920905
}
906+
}
907+
}
921908

922-
intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
923-
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
924-
_, err := se.ExecuteSQL(ctx, sql, args...)
909+
// updateHeartBeatForJob updates the heartbeat of a single job.
//
// If the job was created more than ttlJobTimeout before now, it is finished
// with a "job is timeout" summary and removed from the manager instead of
// being refreshed. Otherwise the statement built by updateHeartBeatSQL is
// executed, and an error is returned unless exactly one row was affected.
func (m *JobManager) updateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
910+
if job.createTime.Add(ttlJobTimeout).Before(now) {
911+
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
912+
summary, err := summarizeErr(errors.New("job is timeout"))
925913
if err != nil {
926-
return errors.Wrapf(err, "execute sql: %s", sql)
914+
return errors.Wrapf(err, "fail to summarize job")
927915
}
916+
err = job.finish(se, now, summary)
917+
if err != nil {
918+
return errors.Wrapf(err, "fail to finish job")
919+
}
920+
m.removeJob(job)
921+
return nil
928922
}
923+
924+
// the session time zone and the location of now are asserted to match in tests
intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
925+
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
926+
_, err := se.ExecuteSQL(ctx, sql, args...)
927+
if err != nil {
928+
return errors.Wrapf(err, "execute sql: %s", sql)
929+
}
930+
931+
// anything other than exactly one affected row is reported as an error;
// the message points at a possible change of owner
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
932+
return errors.Errorf("fail to update job heartbeat, maybe the owner is not myself (%s), affected rows: %d",
933+
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
934+
}
935+
929936
return nil
930937
}
931938

pkg/ttl/ttlworker/job_manager_integration_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ func TestJobTimeout(t *testing.T) {
677677
require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix())
678678

679679
// the timeout will be checked while updating heartbeat
680-
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
680+
require.NoError(t, m2.UpdateHeartBeatForJob(ctx, se, now.Add(7*time.Hour), m2.RunningJobs()[0]))
681681
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
682682
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
683683
}
@@ -1499,3 +1499,46 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
14991499
// the job should have been cancelled
15001500
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
15011501
}
1502+
1503+
// TestJobHeartBeatFailNotBlockOthers locks two TTL jobs with one manager,
// reassigns the first job's owner to a different manager id, and checks that
// the failing heartbeat of the first job does not prevent the second job's
// heartbeat from being updated.
func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
1504+
store, dom := testkit.CreateMockStoreAndDomain(t)
1505+
waitAndStopTTLManager(t, dom)
1506+
tk := testkit.NewTestKit(t, store)
1507+
1508+
sessionFactory := sessionFactory(t, store)
1509+
se := sessionFactory()
1510+
1511+
tk.MustExec("use test")
1512+
tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1513+
tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1514+
testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
1515+
require.NoError(t, err)
1516+
testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2"))
1517+
require.NoError(t, err)
1518+
1519+
ctx := context.Background()
1520+
m := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
1521+
1522+
now := se.Now()
1523+
// acquire two jobs
1524+
require.NoError(t, m.InfoSchemaCache().Update(se))
1525+
require.NoError(t, m.TableStatusCache().Update(ctx, se))
1526+
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable1.Meta().ID], now, uuid.NewString(), false)
1527+
require.NoError(t, err)
1528+
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable2.Meta().ID], now, uuid.NewString(), false)
1529+
require.NoError(t, err)
1530+
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running", "running"))
1531+
1532+
// assign the first job to another manager
1533+
tk.MustExec("update mysql.tidb_ttl_table_status set current_job_owner_id = 'test-ttl-job-manager-2' where table_id = ?", testTable1.Meta().ID)
1534+
// the heartbeat of the first job will fail, but the second one will still success
1535+
now = now.Add(time.Hour)
1536+
require.Error(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[0]))
1537+
require.NoError(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[1]))
1538+
1539+
now = now.Add(time.Hour)
1540+
m.UpdateHeartBeat(ctx, se, now)
// expected: t1 still carries the heartbeat time from two hours before now (the
// initial lock time), while t2 has been refreshed to now
1541+
tk.MustQuery("select table_id, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Sort().Check(testkit.Rows(
1542+
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)),
1543+
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime))))
1544+
}

pkg/ttl/ttlworker/job_manager_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,12 @@ func (m *JobManager) TaskManager() *taskManager {
194194
}
195195

196196
// UpdateHeartBeat is an exported version of updateHeartBeat for test.
// It forwards ctx, se and now to updateHeartBeat unchanged.
197-
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
198-
return m.updateHeartBeat(ctx, se, now)
197+
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
198+
m.updateHeartBeat(ctx, se, now)
199+
}
200+
201+
// UpdateHeartBeatForJob is an exported version of updateHeartBeatForJob for test.
// It returns whatever error updateHeartBeatForJob returns for the given job.
func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
202+
return m.updateHeartBeatForJob(ctx, se, now, job)
199203
}
200204

201205
// ReportMetrics is an exported version of reportMetrics

pkg/ttl/ttlworker/task_manager.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,9 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
437437
}
438438

439439
// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
440-
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
440+
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
441441
for _, task := range m.runningTasks {
442+
<<<<<<< HEAD
442443
state := &cache.TTLTaskState{
443444
TotalRows: task.statistics.TotalRows.Load(),
444445
SuccessRows: task.statistics.SuccessRows.Load(),
@@ -456,8 +457,40 @@ func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, n
456457
_, err = se.ExecuteSQL(ctx, sql, args...)
457458
if err != nil {
458459
return errors.Wrapf(err, "execute sql: %s", sql)
460+
=======
461+
err := m.updateHeartBeatForTask(ctx, se, now, task)
462+
if err != nil {
463+
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
464+
>>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919))
459465
}
460466
}
467+
}
468+
469+
// updateHeartBeatForTask updates the heartbeat of a single running scan task.
//
// It copies the task's row counters (total, success, error) and, when the task
// already has a result carrying an error, that error's message into a
// cache.TTLTaskState, then executes the statement built by
// updateTTLTaskHeartBeatSQL. An error is returned if building or executing the
// statement fails, or unless exactly one row was affected.
func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
470+
state := &cache.TTLTaskState{
471+
TotalRows: task.statistics.TotalRows.Load(),
472+
SuccessRows: task.statistics.SuccessRows.Load(),
473+
ErrorRows: task.statistics.ErrorRows.Load(),
474+
}
475+
if task.result != nil && task.result.err != nil {
476+
state.ScanTaskErr = task.result.err.Error()
477+
}
478+
479+
// the session location and the location of now are asserted to match in tests
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
480+
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
481+
if err != nil {
482+
return err
483+
}
484+
_, err = se.ExecuteSQL(ctx, sql, args...)
485+
if err != nil {
486+
return errors.Wrapf(err, "execute sql: %s", sql)
487+
}
488+
489+
// anything other than exactly one affected row is reported as an error;
// the message points at a possible change of owner
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
490+
return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d",
491+
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
492+
}
493+
461494
return nil
462495
}
463496

0 commit comments

Comments
 (0)