Skip to content

Commit 68dcf8a

Browse files
YangKeaoti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#57919
Signed-off-by: ti-chi-bot <[email protected]>
1 parent d91b100 commit 68dcf8a

File tree

6 files changed

+373
-29
lines changed

6 files changed

+373
-29
lines changed

pkg/ttl/ttlworker/job_manager.go

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,7 @@ func (m *JobManager) jobLoop() error {
222222
// Job Schedule loop:
223223
case <-updateJobHeartBeatTicker:
224224
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
225-
err = m.updateHeartBeat(updateHeartBeatCtx, se, now)
226-
if err != nil {
227-
logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err))
228-
}
225+
m.updateHeartBeat(updateHeartBeatCtx, se, now)
229226
cancel()
230227
case <-jobCheckTicker:
231228
m.checkFinishedJob(se)
@@ -268,10 +265,7 @@ func (m *JobManager) jobLoop() error {
268265
m.taskManager.resizeWorkersWithSysVar()
269266
case <-updateTaskHeartBeatTicker:
270267
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
271-
err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
272-
if err != nil {
273-
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err))
274-
}
268+
m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
275269
cancel()
276270
case <-checkScanTaskFinishedTicker:
277271
if m.taskManager.handleScanFinishedTask() {
@@ -878,29 +872,42 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t
878872
}
879873

880874
// updateHeartBeat updates the heartbeat for all task with current instance as owner
881-
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
875+
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
882876
for _, job := range m.localJobs() {
883-
if job.createTime.Add(ttlJobTimeout).Before(now) {
884-
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
885-
summary, err := summarizeErr(errors.New("job is timeout"))
886-
if err != nil {
887-
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
888-
}
889-
err = job.finish(se, now, summary)
890-
if err != nil {
891-
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
892-
continue
893-
}
894-
m.removeJob(job)
877+
err := m.updateHeartBeatForJob(ctx, se, now, job)
878+
if err != nil {
879+
logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id))
895880
}
881+
}
882+
}
896883

897-
intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
898-
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
899-
_, err := se.ExecuteSQL(ctx, sql, args...)
884+
func (m *JobManager) updateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
885+
if job.createTime.Add(ttlJobTimeout).Before(now) {
886+
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
887+
summary, err := summarizeErr(errors.New("job is timeout"))
900888
if err != nil {
901-
return errors.Wrapf(err, "execute sql: %s", sql)
889+
return errors.Wrapf(err, "fail to summarize job")
902890
}
891+
err = job.finish(se, now, summary)
892+
if err != nil {
893+
return errors.Wrapf(err, "fail to finish job")
894+
}
895+
m.removeJob(job)
896+
return nil
903897
}
898+
899+
intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
900+
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
901+
_, err := se.ExecuteSQL(ctx, sql, args...)
902+
if err != nil {
903+
return errors.Wrapf(err, "execute sql: %s", sql)
904+
}
905+
906+
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
907+
return errors.Errorf("fail to update job heartbeat, maybe the owner is not myself (%s), affected rows: %d",
908+
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
909+
}
910+
904911
return nil
905912
}
906913

pkg/ttl/ttlworker/job_manager_integration_test.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ func TestJobTimeout(t *testing.T) {
705705
require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix())
706706

707707
// the timeout will be checked while updating heartbeat
708-
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
708+
require.NoError(t, m2.UpdateHeartBeatForJob(ctx, se, now.Add(7*time.Hour), m2.RunningJobs()[0]))
709709
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
710710
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
711711
}
@@ -1530,3 +1530,46 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
15301530
// the job should have been cancelled
15311531
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
15321532
}
1533+
1534+
func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
1535+
store, dom := testkit.CreateMockStoreAndDomain(t)
1536+
waitAndStopTTLManager(t, dom)
1537+
tk := testkit.NewTestKit(t, store)
1538+
1539+
sessionFactory := sessionFactory(t, store)
1540+
se := sessionFactory()
1541+
1542+
tk.MustExec("use test")
1543+
tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1544+
tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1545+
testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
1546+
require.NoError(t, err)
1547+
testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2"))
1548+
require.NoError(t, err)
1549+
1550+
ctx := context.Background()
1551+
m := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
1552+
1553+
now := se.Now()
1554+
// acquire two jobs
1555+
require.NoError(t, m.InfoSchemaCache().Update(se))
1556+
require.NoError(t, m.TableStatusCache().Update(ctx, se))
1557+
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable1.Meta().ID], now, uuid.NewString(), false)
1558+
require.NoError(t, err)
1559+
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable2.Meta().ID], now, uuid.NewString(), false)
1560+
require.NoError(t, err)
1561+
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running", "running"))
1562+
1563+
// assign the first job to another manager
1564+
tk.MustExec("update mysql.tidb_ttl_table_status set current_job_owner_id = 'test-ttl-job-manager-2' where table_id = ?", testTable1.Meta().ID)
1565+
// the heartbeat of the first job will fail, but the second one will still success
1566+
now = now.Add(time.Hour)
1567+
require.Error(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[0]))
1568+
require.NoError(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[1]))
1569+
1570+
now = now.Add(time.Hour)
1571+
m.UpdateHeartBeat(ctx, se, now)
1572+
tk.MustQuery("select table_id, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Sort().Check(testkit.Rows(
1573+
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)),
1574+
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime))))
1575+
}

pkg/ttl/ttlworker/job_manager_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,12 @@ func (m *JobManager) TaskManager() *taskManager {
192192
}
193193

194194
// UpdateHeartBeat is an exported version of updateHeartBeat for test
195-
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
196-
return m.updateHeartBeat(ctx, se, now)
195+
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
196+
m.updateHeartBeat(ctx, se, now)
197+
}
198+
199+
func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
200+
return m.updateHeartBeatForJob(ctx, se, now, job)
197201
}
198202

199203
// ReportMetrics is an exported version of reportMetrics

pkg/ttl/ttlworker/task_manager.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,9 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
436436
}
437437

438438
// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
439-
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
439+
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
440440
for _, task := range m.runningTasks {
441+
<<<<<<< HEAD
441442
state := &cache.TTLTaskState{
442443
TotalRows: task.statistics.TotalRows.Load(),
443444
SuccessRows: task.statistics.SuccessRows.Load(),
@@ -455,8 +456,40 @@ func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, n
455456
_, err = se.ExecuteSQL(ctx, sql, args...)
456457
if err != nil {
457458
return errors.Wrapf(err, "execute sql: %s", sql)
459+
=======
460+
err := m.updateHeartBeatForTask(ctx, se, now, task)
461+
if err != nil {
462+
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
463+
>>>>>>> 0392cdda767 (ttl: fix the issue that one task losing heartbeat will block other tasks (#57919))
458464
}
459465
}
466+
}
467+
468+
func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
469+
state := &cache.TTLTaskState{
470+
TotalRows: task.statistics.TotalRows.Load(),
471+
SuccessRows: task.statistics.SuccessRows.Load(),
472+
ErrorRows: task.statistics.ErrorRows.Load(),
473+
}
474+
if task.result != nil && task.result.err != nil {
475+
state.ScanTaskErr = task.result.err.Error()
476+
}
477+
478+
intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
479+
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
480+
if err != nil {
481+
return err
482+
}
483+
_, err = se.ExecuteSQL(ctx, sql, args...)
484+
if err != nil {
485+
return errors.Wrapf(err, "execute sql: %s", sql)
486+
}
487+
488+
if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
489+
return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d",
490+
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
491+
}
492+
460493
return nil
461494
}
462495

0 commit comments

Comments
 (0)