
Commit 695c2c7

ttl: reschedule task to other instances when shrinking worker
1 parent d0de86b commit 695c2c7

File tree

3 files changed: +131 -32 lines changed

pkg/ttl/ttlworker/del.go
pkg/ttl/ttlworker/scan.go
pkg/ttl/ttlworker/task_manager.go

pkg/ttl/ttlworker/del.go

Lines changed: 39 additions & 5 deletions
@@ -22,6 +22,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/ttl/cache"
 	"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 	leftRows := t.rows
 	defer func() {
 		if len(leftRows) > 0 {
-			t.statistics.IncErrorRows(len(leftRows))
+			retryRows = append(retryRows, leftRows...)
 		}
 	}()
 
 	se := newTableSession(rawSe, t.tbl, t.expire)
-	for len(leftRows) > 0 {
+	for len(leftRows) > 0 && ctx.Err() == nil {
 		maxBatch := variable.TTLDeleteBatchSize.Load()
 		var delBatch [][]types.Datum
 		if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 		sqlInterval := time.Since(sqlStart)
 		if err != nil {
 			metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
-			needRetry = needRetry && ctx.Err() == nil
 			logutil.BgLogger().Warn(
 				"delete SQL in TTL failed",
 				zap.Error(err),
@@ -214,6 +214,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
 	return b.retryInterval
 }
 
+// SetRetryInterval sets the retry interval of the buffer.
+func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
+	b.retryInterval = interval
+}
+
 // Drain drains a retry buffer.
 func (b *ttlDelRetryBuffer) Drain() {
 	for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +301,37 @@ func (w *ttlDeleteWorker) loop() error {
 	timer := time.NewTimer(w.retryBuffer.retryInterval)
 	defer timer.Stop()
 
-	// drain retry buffer to make sure the statistics are correct
-	defer w.retryBuffer.Drain()
+	defer func() {
+		// Have a final try to delete all rows in retry buffer while the worker stops
+		// to avoid leaving any TTL rows undeleted when shrinking the delete worker.
+		if w.retryBuffer.Len() > 0 {
+			start := time.Now()
+			log.Info(
+				"try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
+				zap.Int("bufferLen", w.retryBuffer.Len()),
+			)
+			retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+			w.retryBuffer.SetRetryInterval(0)
+			w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
+				return task.doDelete(retryCtx, se)
+			})
+			log.Info(
+				"delete TTL rows in del worker buffer finished",
+				zap.Duration("duration", time.Since(start)),
+			)
+		}
+
+		// drain retry buffer to make sure the statistics are correct
+		if w.retryBuffer.Len() > 0 {
+			log.Warn(
+				"some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
+				zap.Int("bufferLen", w.retryBuffer.Len()),
+			)
+			w.retryBuffer.Drain()
+		}
+	}()
+
 	for w.Status() == workerStatusRunning {
 		tracer.EnterPhase(metrics.PhaseIdle)
 		select {

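Note on the new deferred block in ttlDeleteWorker.loop: before the worker exits it retries everything still sitting in the retry buffer once, with the retry interval forced to zero and a bounded context, and only drains (counting the leftovers as errors) whatever still fails. The stand-alone Go sketch below illustrates that shutdown pattern under simplified assumptions; the retryBuffer type, the deleteRows callback, and the 10-second budget are hypothetical stand-ins for ttlDelRetryBuffer and doDelete, not the real TiDB implementation.

package main

import (
	"container/list"
	"context"
	"fmt"
	"time"
)

// retryBuffer is a simplified stand-in for ttlDelRetryBuffer: it queues
// batches of rows whose DELETE failed and retries them after retryInterval.
type retryBuffer struct {
	list          *list.List
	retryInterval time.Duration
}

type bufferedBatch struct {
	rows     []int // stand-in for [][]types.Datum
	queuedAt time.Time
}

func (b *retryBuffer) Add(rows []int) {
	b.list.PushBack(&bufferedBatch{rows: rows, queuedAt: time.Now()})
}
func (b *retryBuffer) Len() int                         { return b.list.Len() }
func (b *retryBuffer) SetRetryInterval(d time.Duration) { b.retryInterval = d }

// DoRetry retries every batch that has waited at least retryInterval.
// Rows returned by do are re-queued for a later retry.
func (b *retryBuffer) DoRetry(do func(rows []int) (failed []int)) {
	for e := b.list.Front(); e != nil; {
		next := e.Next()
		batch := e.Value.(*bufferedBatch)
		if time.Since(batch.queuedAt) >= b.retryInterval {
			b.list.Remove(e)
			if failed := do(batch.rows); len(failed) > 0 {
				b.Add(failed)
			}
		}
		e = next
	}
}

// finalFlush mirrors the shape of the deferred block added to the worker loop:
// retry everything immediately under a 10s budget, then give up on leftovers.
func finalFlush(b *retryBuffer, deleteRows func(ctx context.Context, rows []int) (failed []int)) {
	if b.Len() == 0 {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	b.SetRetryInterval(0) // make every queued batch eligible right now
	b.DoRetry(func(rows []int) []int { return deleteRows(ctx, rows) })
	if b.Len() > 0 {
		fmt.Printf("%d batches still pending, counted as errors\n", b.Len())
		b.list.Init() // drain: discard the rest, as Drain does in the real code
	}
}

func main() {
	b := &retryBuffer{list: list.New(), retryInterval: time.Minute}
	b.Add([]int{1, 2, 3})
	finalFlush(b, func(ctx context.Context, rows []int) []int {
		if ctx.Err() != nil {
			return rows // out of time: report the rows back as failed
		}
		return nil // pretend the DELETE succeeded
	})
}
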
pkg/ttl/ttlworker/scan.go

Lines changed: 15 additions & 7 deletions
@@ -85,6 +85,9 @@ type ttlScanTaskExecResult struct {
 	time time.Time
 	task *ttlScanTask
 	err  error
+	// interruptByWorkerStop indicates whether the task has to stop for the worker stops.
+	// when it is true, we should reschedule this task in another worker or TiDB again.
+	interruptByWorkerStop bool
 }
 
 func (t *ttlScanTask) result(err error) *ttlScanTaskExecResult {
@@ -99,6 +102,17 @@ func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum {
 	return datums
 }
 
+func (t *ttlScanTask) taskLogger(l *zap.Logger) *zap.Logger {
+	return l.With(
+		zap.String("jobID", t.JobID),
+		zap.Int64("scanID", t.ScanID),
+		zap.Int64("tableID", t.TableID),
+		zap.String("db", t.tbl.Schema.O),
+		zap.String("table", t.tbl.Name.O),
+		zap.String("partition", t.tbl.Partition.O),
+	)
+}
+
 func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
 	// TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers
 	// now, the taskCtx is only check at the beginning of every loop
@@ -121,13 +135,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
 		case <-doScanFinished.Done():
 			return
 		}
-		logger := logutil.BgLogger().With(
-			zap.Int64("tableID", t.TableID),
-			zap.String("table", t.tbl.Name.O),
-			zap.String("partition", t.tbl.Partition.O),
-			zap.String("jobID", t.JobID),
-			zap.Int64("scanID", t.ScanID),
-		)
+		logger := t.taskLogger(logutil.BgLogger())
 		logger.Info("kill the running statement in scan task because the task or worker cancelled")
 		rawSess.KillStmt()
 		ticker := time.NewTicker(time.Minute)

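Note on taskLogger: the helper only centralizes the zap fields that identify a scan task, so the scan worker and the task manager log with the same keys. A minimal sketch of the same pattern, using a hypothetical scanTask struct instead of the real ttlScanTask:

package main

import "go.uber.org/zap"

// scanTask is a hypothetical stand-in for ttlScanTask, holding only the
// identifiers that the real taskLogger attaches to every log entry.
type scanTask struct {
	JobID     string
	ScanID    int64
	TableID   int64
	DB        string
	Table     string
	Partition string
}

// taskLogger mirrors the helper added in scan.go: derive a child logger that
// carries the task identity instead of repeating the fields at every call site.
func (t *scanTask) taskLogger(l *zap.Logger) *zap.Logger {
	return l.With(
		zap.String("jobID", t.JobID),
		zap.Int64("scanID", t.ScanID),
		zap.Int64("tableID", t.TableID),
		zap.String("db", t.DB),
		zap.String("table", t.Table),
		zap.String("partition", t.Partition),
	)
}

func main() {
	task := &scanTask{JobID: "job-1", ScanID: 7, TableID: 42, DB: "test", Table: "t", Partition: "p0"}
	logger := task.taskLogger(zap.NewExample())
	// Every log line now carries the same identity fields.
	logger.Info("kill the running statement in scan task because the task or worker cancelled")
	logger.Warn("fail to heartbeat task")
}
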
pkg/ttl/ttlworker/task_manager.go

Lines changed: 77 additions & 20 deletions
@@ -60,7 +60,7 @@ func setTTLTaskFinishedSQL(jobID string, scanID int64, state *cache.TTLTaskState
 	return setTTLTaskFinishedTemplate, []any{now.Format(timeFormat), string(stateStr), jobID, scanID}, nil
 }
 
-const updateTTLTaskHeartBeatTempalte = `UPDATE mysql.tidb_ttl_task
+const updateTTLTaskHeartBeatTemplate = `UPDATE mysql.tidb_ttl_task
 	SET state = %?,
 		owner_hb_time = %?
 	WHERE job_id = %? AND scan_id = %?`
@@ -70,7 +70,18 @@ func updateTTLTaskHeartBeatSQL(jobID string, scanID int64, now time.Time, state
 	if err != nil {
 		return "", nil, err
 	}
-	return updateTTLTaskHeartBeatTempalte, []any{string(stateStr), now.Format(timeFormat), jobID, scanID}, nil
+	return updateTTLTaskHeartBeatTemplate, []any{string(stateStr), now.Format(timeFormat), jobID, scanID}, nil
+}
+
+const updateTTLTaskStateTemplate = `UPDATE mysql.tidb_ttl_task
+	SET state = %? WHERE job_id = %? AND scan_id = %?`
+
+func updateTTLTaskStateSQL(jobID string, scanID int64, state *cache.TTLTaskState) (string, []any, error) {
+	stateStr, err := json.Marshal(state)
+	if err != nil {
+		return "", nil, err
+	}
+	return updateTTLTaskStateTemplate, []any{string(stateStr), jobID, scanID}, nil
 }
 
 const countRunningTasks = "SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running'"
@@ -160,6 +171,8 @@ func (m *taskManager) resizeScanWorkers(count int) error {
 			jobID = curTask.JobID
 			scanID = curTask.ScanID
 			scanErr = errors.New("timeout to cancel scan task")
+
+			result = curTask.result(scanErr)
 		}
 
 		task := findTaskWithID(m.runningTasks, jobID, scanID)
@@ -169,6 +182,7 @@ func (m *taskManager) resizeScanWorkers(count int) error {
 		}
 		logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", task.JobID), zap.Int64("taskID", task.ScanID), zap.Error(scanErr))
 
+		result.interruptByWorkerStop = true
 		task.result = result
 	}
 	return err
@@ -439,40 +453,83 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
 // updateHeartBeat updates the heartbeat for all tasks with current instance as owner
 func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
 	for _, task := range m.runningTasks {
-		state := &cache.TTLTaskState{
-			TotalRows:   task.statistics.TotalRows.Load(),
-			SuccessRows: task.statistics.SuccessRows.Load(),
-			ErrorRows:   task.statistics.ErrorRows.Load(),
-		}
-		if task.result != nil && task.result.err != nil {
-			state.ScanTaskErr = task.result.err.Error()
+		if err := m.updateTaskState(ctx, se, task, true, now); err != nil {
+			task.taskLogger(logutil.Logger(m.ctx)).Warn("fail to heartbeat task", zap.Error(err))
 		}
+	}
+	return nil
+}
 
-		intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
-		sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state)
-		if err != nil {
-			return err
-		}
-		_, err = se.ExecuteSQL(ctx, sql, args...)
-		if err != nil {
-			return errors.Wrapf(err, "execute sql: %s", sql)
-		}
+func (m *taskManager) updateTaskState(ctx context.Context, se session.Session, task *runningScanTask, heartbeat bool, now time.Time) error {
+	state := &cache.TTLTaskState{
+		TotalRows:   task.statistics.TotalRows.Load(),
+		SuccessRows: task.statistics.SuccessRows.Load(),
+		ErrorRows:   task.statistics.ErrorRows.Load(),
+	}
+
+	if prevState := task.TTLTask.State; prevState != nil {
+		// If a task was timeout and taken over by the current instance,
+		// adding the previous state to the current state to make the statistics more accurate.
+		state.TotalRows += prevState.SuccessRows + prevState.ErrorRows
+		state.SuccessRows += prevState.SuccessRows
+		state.ErrorRows += prevState.ErrorRows
+	}
+
+	if task.result != nil && task.result.err != nil {
+		state.ScanTaskErr = task.result.err.Error()
+	}
+
+	intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
+	var sql string
+	var args []any
+	var err error
+	if heartbeat {
+		sql, args, err = updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state)
+	} else {
+		sql, args, err = updateTTLTaskStateSQL(task.JobID, task.ScanID, state)
+	}
+	if err != nil {
+		return err
+	}
+	_, err = se.ExecuteSQL(ctx, sql, args...)
+	if err != nil {
+		return errors.Wrapf(err, "execute sql: %s", sql)
 	}
 	return nil
 }
 
 func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) {
 	stillRunningTasks := make([]*runningScanTask, 0, len(m.runningTasks))
 	for _, task := range m.runningTasks {
-		if !task.finished(logutil.Logger(m.ctx)) {
+		interruptByWorkerStop := task.result != nil && task.result.interruptByWorkerStop
+		// If the task is interrupted or finished, we should remove it from memory.
+		// Otherwise, we should keep it in memory and wait for the next round to check.
+		if !interruptByWorkerStop && !task.finished(logutil.Logger(m.ctx)) {
			stillRunningTasks = append(stillRunningTasks, task)
 			continue
 		}
 		// we should cancel task to release inner context and avoid memory leak
 		task.cancel()
+
+		logger := task.taskLogger(logutil.Logger(m.ctx))
+		// When a task cannot continue running because there is no worker to run it,
+		// we should only remove it from memory without reporting its final status.
+		// Then some other instance will take over this task when its heartbeat timeout.
+		if interruptByWorkerStop {
+			logger.Info(
+				"remove a task from memory without mark it as finished because it is from a stopped worker",
+			)
+			if err := m.updateTaskState(m.ctx, se, task, false, now); err != nil {
+				logger.Warn("fail to report a interrupted task", zap.Error(err))
+			}
+			continue
+		}
+
+		// Update the meta of a task to mark it as finished.
 		err := m.reportTaskFinished(se, now, task)
 		if err != nil {
-			logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err))
+			logger.Error(
+				"fail to report finished task", zap.Error(err))
 			stillRunningTasks = append(stillRunningTasks, task)
 			continue
 		}

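Note on the task_manager.go changes as a whole: a task whose worker stopped is persisted with its partial counters (merged with any counters left by a previous owner) but is not marked finished, so another instance can claim it once its heartbeat times out. The sketch below mirrors that decision and the counter merge; taskState and runningTask are hypothetical stand-ins for cache.TTLTaskState and runningScanTask, not the real types.

package main

import "fmt"

// taskState is a hypothetical stand-in for cache.TTLTaskState.
type taskState struct {
	TotalRows, SuccessRows, ErrorRows uint64
	ScanTaskErr                       string
}

// runningTask is a hypothetical stand-in for runningScanTask.
type runningTask struct {
	prevState             *taskState // state written by a previous owner, if the task was taken over
	total, success, errs  uint64     // counters collected by this instance
	finished              bool
	interruptByWorkerStop bool
	scanErr               error
}

// buildState mirrors updateTaskState's counter handling: start from this
// instance's statistics and fold in whatever a previous owner already reported.
func buildState(t *runningTask) *taskState {
	s := &taskState{TotalRows: t.total, SuccessRows: t.success, ErrorRows: t.errs}
	if prev := t.prevState; prev != nil {
		s.TotalRows += prev.SuccessRows + prev.ErrorRows
		s.SuccessRows += prev.SuccessRows
		s.ErrorRows += prev.ErrorRows
	}
	if t.scanErr != nil {
		s.ScanTaskErr = t.scanErr.Error()
	}
	return s
}

// checkTask mirrors the decision added to checkFinishedTask: an interrupted task
// only persists its partial state and is dropped from memory, so another instance
// can claim it after the heartbeat timeout; a finished task is reported as
// finished; anything else stays in memory for the next round.
func checkTask(t *runningTask) string {
	switch {
	case t.interruptByWorkerStop:
		_ = buildState(t) // would be written via updateTTLTaskStateSQL (state only, no owner_hb_time)
		return "released: another instance takes over after heartbeat timeout"
	case t.finished:
		_ = buildState(t) // would be written by reportTaskFinished
		return "reported as finished"
	default:
		return "still running"
	}
}

func main() {
	t := &runningTask{
		prevState:             &taskState{SuccessRows: 100, ErrorRows: 2},
		total:                 50,
		success:               40,
		errs:                  1,
		interruptByWorkerStop: true,
	}
	fmt.Println(checkTask(t))
	fmt.Printf("%+v\n", *buildState(t)) // TotalRows:152 SuccessRows:140 ErrorRows:3
}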