Skip to content

Commit 02ccbad

Browse files
YangKeaoti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#59348
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 45ee207 commit 02ccbad

File tree

3 files changed

+58
-4
lines changed

3 files changed

+58
-4
lines changed

pkg/ttl/client/notification.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ loop:
7575
return ctx.Err()
7676
case ch <- clientv3.WatchResponse{}:
7777
default:
78-
unsent = make([]chan clientv3.WatchResponse, len(watchers), 0)
78+
unsent = make([]chan clientv3.WatchResponse, len(watchers))
7979
copy(unsent, watchers[i:])
8080
break loop
8181
}

pkg/ttl/ttlworker/job_manager.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,10 @@ func (m *JobManager) reportMetrics(se session.Session) {
503503

504504
// checkNotOwnJob removes the job whose current job owner is not yourself
505505
func (m *JobManager) checkNotOwnJob() {
506-
for _, job := range m.runningJobs {
506+
// reverse iteration so that we could remove the job safely in the loop
507+
for i := len(m.runningJobs) - 1; i >= 0; i-- {
508+
job := m.runningJobs[i]
509+
507510
tableStatus := m.tableStatusCache.Tables[job.tbl.ID]
508511
if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id {
509512
logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id))
@@ -517,8 +520,11 @@ func (m *JobManager) checkNotOwnJob() {
517520
}
518521

519522
func (m *JobManager) checkFinishedJob(se session.Session) {
523+
// reverse iteration so that we could remove the job safely in the loop
520524
j:
521-
for _, job := range m.runningJobs {
525+
for i := len(m.runningJobs) - 1; i >= 0; i-- {
526+
job := m.runningJobs[i]
527+
522528
timeoutJobCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
523529

524530
sql, args := cache.SelectFromTTLTaskWithJobID(job.id)
@@ -576,10 +582,24 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
576582

577583
if !variable.EnableTTLJob.Load() || !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) {
578584
if len(m.runningJobs) > 0 {
585+
<<<<<<< HEAD
579586
for _, job := range m.runningJobs {
580587
logutil.Logger(m.ctx).Info("cancel job because tidb_ttl_job_enable turned off", zap.String("jobID", job.id))
581588

582589
summary, err := summarizeErr(errors.New("ttl job is disabled"))
590+
=======
591+
// reverse iteration so that we could remove the job safely in the loop
592+
for i := len(m.runningJobs) - 1; i >= 0; i-- {
593+
job := m.runningJobs[i]
594+
595+
logger := logutil.Logger(m.ctx).With(
596+
zap.String("jobID", job.id),
597+
zap.Int64("tableID", job.tbl.ID),
598+
zap.String("table", job.tbl.FullName()),
599+
)
600+
logger.Info(fmt.Sprintf("cancel job because %s", cancelReason))
601+
summary, err := summarizeErr(errors.New(cancelReason))
602+
>>>>>>> b7aafa67ec2 (ttl: fix the issue that the TTL jobs are skipped or handled multiple times in one iteration (#59348))
583603
if err != nil {
584604
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
585605
}
@@ -595,7 +615,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
595615
}
596616

597617
// if the table of a running job disappears, also cancel it
598-
for _, job := range m.runningJobs {
618+
// reverse iteration so that we could remove the job safely in the loop
619+
for i := len(m.runningJobs) - 1; i >= 0; i-- {
620+
job := m.runningJobs[i]
621+
599622
_, ok := m.infoSchemaCache.Tables[job.tbl.ID]
600623
if ok {
601624
continue

pkg/ttl/ttlworker/job_manager_integration_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,3 +1468,34 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
14681468
// the job should have been cancelled
14691469
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("<nil>"))
14701470
}
1471+
1472+
func TestIterationOfRunningJob(t *testing.T) {
1473+
store, dom := testkit.CreateMockStoreAndDomain(t)
1474+
waitAndStopTTLManager(t, dom)
1475+
sessionFactory := sessionFactory(t, dom)
1476+
1477+
tk := testkit.NewTestKit(t, store)
1478+
m := ttlworker.NewJobManager("test-job-manager", dom.SysSessionPool(), store, nil, func() bool { return true })
1479+
1480+
se := sessionFactory()
1481+
defer se.Close()
1482+
for tableID := int64(0); tableID < 100; tableID++ {
1483+
testTable := &cache.PhysicalTable{ID: tableID, TableInfo: &model.TableInfo{ID: tableID, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay), JobInterval: "1h"}}}
1484+
m.InfoSchemaCache().Tables[testTable.ID] = testTable
1485+
1486+
jobID := uuid.NewString()
1487+
_, err := m.LockJob(context.Background(), se, testTable, se.Now(), jobID, false)
1488+
require.NoError(t, err)
1489+
tk.MustQuery("SELECT current_job_id, current_job_owner_id FROM mysql.tidb_ttl_table_status WHERE table_id = ?", tableID).Check(testkit.Rows(fmt.Sprintf("%s %s", jobID, m.ID())))
1490+
1491+
// update the owner id
1492+
tk.MustExec("UPDATE mysql.tidb_ttl_table_status SET current_job_owner_id = 'another-id' WHERE current_job_id = ?", jobID)
1493+
}
1494+
require.NoError(t, m.TableStatusCache().Update(context.Background(), se))
1495+
1496+
require.Len(t, m.RunningJobs(), 100)
1497+
m.CheckNotOwnJob()
1498+
1499+
// Now all the jobs should have been removed
1500+
require.Len(t, m.RunningJobs(), 0)
1501+
}

0 commit comments

Comments
 (0)