Skip to content

Commit 719f68e

Browse files
authored
ttl: cancel the heartbeat timeout job after disabling TTL (pingcap#57452) (pingcap#57488)
close pingcap#57404
1 parent fb15e5e commit 719f68e

File tree

2 files changed

+52
-10
lines changed

2 files changed

+52
-10
lines changed

pkg/ttl/ttlworker/job_manager.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,18 @@ j:
562562
}
563563

564564
func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
565+
// Try to lock HB timeout jobs first; otherwise, when `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will
566+
// never be cancelled.
567+
jobTables := m.readyForLockHBTimeoutJobTables(now)
568+
// TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take this job
569+
// when the heartbeat is not sent
570+
for _, table := range jobTables {
571+
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
572+
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
573+
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
574+
}
575+
}
576+
565577
if !variable.EnableTTLJob.Load() || !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) {
566578
if len(m.runningJobs) > 0 {
567579
for _, job := range m.runningJobs {
@@ -602,16 +614,6 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
602614
}
603615
m.removeJob(job)
604616
}
605-
606-
jobTables := m.readyForLockHBTimeoutJobTables(now)
607-
// TODO: also consider resuming tables, but it's fine to leave them there, as other nodes will take this job
608-
// when the heartbeat is not sent
609-
for _, table := range jobTables {
610-
logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
611-
if _, err := m.lockHBTimeoutJob(m.ctx, se, table, now); err != nil {
612-
logutil.Logger(m.ctx).Warn("failed to lock heartbeat timeout job", zap.Error(err))
613-
}
614-
}
615617
}
616618

617619
func (m *JobManager) localJobs() []*ttlJob {

pkg/ttl/ttlworker/job_manager_integration_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,3 +1393,43 @@ func TestFinishError(t *testing.T) {
13931393
m.UpdateHeartBeat(context.Background(), se, now)
13941394
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
13951395
}
1396+
1397+
func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
1398+
store, dom := testkit.CreateMockStoreAndDomain(t)
1399+
waitAndStopTTLManager(t, dom)
1400+
tk := testkit.NewTestKit(t, store)
1401+
1402+
sessionFactory := sessionFactory(t, store)
1403+
se := sessionFactory()
1404+
1405+
tk.MustExec("use test")
1406+
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
1407+
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
1408+
require.NoError(t, err)
1409+
1410+
ctx := context.Background()
1411+
m1 := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)
1412+
require.NoError(t, m1.InfoSchemaCache().Update(se))
1413+
require.NoError(t, m1.TableStatusCache().Update(ctx, se))
1414+
1415+
now := se.Now()
1416+
_, err = m1.LockJob(context.Background(), se, m1.InfoSchemaCache().Tables[testTable.Meta().ID], now, uuid.NewString(), false)
1417+
require.NoError(t, err)
1418+
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running"))
1419+
1420+
// Lose the heartbeat: simulate the situation where m1 doesn't update the heartbeat for 8 hours.
1421+
now = now.Add(time.Hour * 8)
1422+
1423+
// disable TTL jobs by turning off tidb_ttl_job_enable
1424+
tk.MustExec("set global tidb_ttl_job_enable = 'OFF'")
1425+
defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'")
1426+
1427+
// reschedule and try to get the job
1428+
m2 := ttlworker.NewJobManager("test-ttl-job-manager-2", nil, store, nil, nil)
1429+
require.NoError(t, m2.InfoSchemaCache().Update(se))
1430+
require.NoError(t, m2.TableStatusCache().Update(ctx, se))
1431+
m2.RescheduleJobs(se, now)
1432+
1433+
// the job should have been cancelled
1434+
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
1435+
}

0 commit comments

Comments
 (0)