Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ttl/client/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ loop:
return ctx.Err()
case ch <- clientv3.WatchResponse{}:
default:
unsent = make([]chan clientv3.WatchResponse, len(watchers), 0)
unsent = make([]chan clientv3.WatchResponse, len(watchers))
copy(unsent, watchers[i:])
break loop
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,10 @@ func (m *JobManager) reportMetrics(se session.Session) {

// checkNotOwnJob removes every running job whose table status is missing or whose current job owner is no longer this manager
func (m *JobManager) checkNotOwnJob() {
for _, job := range m.runningJobs {
// iterate in reverse so that a job can be removed safely inside the loop
for i := len(m.runningJobs) - 1; i >= 0; i-- {
job := m.runningJobs[i]

tableStatus := m.tableStatusCache.Tables[job.tbl.ID]
if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id {
logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id))
Expand All @@ -531,8 +534,11 @@ func (m *JobManager) checkNotOwnJob() {
}

func (m *JobManager) checkFinishedJob(se session.Session) {
// iterate in reverse so that a job can be removed safely inside the loop
j:
for _, job := range m.runningJobs {
for i := len(m.runningJobs) - 1; i >= 0; i-- {
job := m.runningJobs[i]

timeoutJobCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)

sql, args := cache.SelectFromTTLTaskWithJobID(job.id)
Expand Down Expand Up @@ -612,7 +618,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {

if cancelJobs {
if len(m.runningJobs) > 0 {
for _, job := range m.runningJobs {
// iterate in reverse so that a job can be removed safely inside the loop
for i := len(m.runningJobs) - 1; i >= 0; i-- {
job := m.runningJobs[i]

logger := logutil.Logger(m.ctx).With(
zap.String("jobID", job.id),
zap.Int64("tableID", job.tbl.ID),
Expand All @@ -635,7 +644,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
}

// if the table of a running job disappears, also cancel it
for _, job := range m.runningJobs {
// iterate in reverse so that a job can be removed safely inside the loop
for i := len(m.runningJobs) - 1; i >= 0; i-- {
job := m.runningJobs[i]

_, ok := m.infoSchemaCache.Tables[job.tbl.ID]
if ok {
continue
Expand Down
31 changes: 31 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1916,3 +1916,34 @@ func TestTimerJobAfterDropTable(t *testing.T) {
require.NotNil(t, job)
require.True(t, job.Finished)
}

// TestIterationOfRunningJob locks a batch of TTL jobs on one manager, rewrites
// the owner of every job in the status table to a different id, and then
// verifies that a single CheckNotOwnJob call leaves no running job behind.
func TestIterationOfRunningJob(t *testing.T) {
	// number of tables (and therefore jobs) registered on the manager
	const jobCount = 100

	store, dom := testkit.CreateMockStoreAndDomain(t)
	waitAndStopTTLManager(t, dom)
	newSession := sessionFactory(t, dom)

	tk := testkit.NewTestKit(t, store)
	manager := ttlworker.NewJobManager(
		"test-job-manager",
		dom.SysSessionPool(),
		store,
		nil,
		func() bool { return true },
	)

	se := newSession()
	defer se.Close()

	ctx := context.Background()
	for id := int64(0); id < jobCount; id++ {
		ttlInfo := &model.TTLInfo{
			IntervalExprStr:  "1",
			IntervalTimeUnit: int(ast.TimeUnitDay),
			JobInterval:      "1h",
		}
		tbl := &cache.PhysicalTable{
			ID:        id,
			TableInfo: &model.TableInfo{ID: id, TTLInfo: ttlInfo},
		}
		manager.InfoSchemaCache().Tables[tbl.ID] = tbl

		// lock a fresh job for this table and confirm the manager owns it
		jobID := uuid.NewString()
		_, err := manager.LockJob(ctx, se, tbl, se.Now(), jobID, false)
		require.NoError(t, err)

		wantRow := fmt.Sprintf("%s %s", jobID, manager.ID())
		tk.MustQuery("SELECT current_job_id, current_job_owner_id FROM mysql.tidb_ttl_table_status WHERE table_id = ?", id).
			Check(testkit.Rows(wantRow))

		// hand the job over to a different owner directly in the status table
		tk.MustExec("UPDATE mysql.tidb_ttl_table_status SET current_job_owner_id = 'another-id' WHERE current_job_id = ?", jobID)
	}

	// refresh the cached table status so the manager sees the new owner ids
	require.NoError(t, manager.TableStatusCache().Update(ctx, se))

	require.Len(t, manager.RunningJobs(), jobCount)
	manager.CheckNotOwnJob()

	// every job now belongs to another owner, so none may remain
	require.Len(t, manager.RunningJobs(), 0)
}