
Commit 6a30db5

ttl: reschedule task to other instances when shrinking worker
1 parent: d0de86b

3 files changed: +55 -13 lines

pkg/ttl/ttlworker/del.go

Lines changed: 38 additions & 5 deletions
@@ -22,6 +22,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/ttl/cache"
 	"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 	leftRows := t.rows
 	defer func() {
 		if len(leftRows) > 0 {
-			t.statistics.IncErrorRows(len(leftRows))
+			retryRows = append(retryRows, leftRows...)
 		}
 	}()
 
 	se := newTableSession(rawSe, t.tbl, t.expire)
-	for len(leftRows) > 0 {
+	for len(leftRows) > 0 && ctx.Err() == nil {
 		maxBatch := variable.TTLDeleteBatchSize.Load()
 		var delBatch [][]types.Datum
 		if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 		sqlInterval := time.Since(sqlStart)
 		if err != nil {
 			metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
-			needRetry = needRetry && ctx.Err() == nil
 			logutil.BgLogger().Warn(
 				"delete SQL in TTL failed",
 				zap.Error(err),
@@ -214,6 +214,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
 	return b.retryInterval
 }
 
+// SetRetryInterval sets the retry interval of the buffer.
+func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
+	b.retryInterval = interval
+}
+
 // Drain drains a retry buffer.
 func (b *ttlDelRetryBuffer) Drain() {
 	for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +301,36 @@ func (w *ttlDeleteWorker) loop() error {
 	timer := time.NewTimer(w.retryBuffer.retryInterval)
 	defer timer.Stop()
 
-	// drain retry buffer to make sure the statistics are correct
-	defer w.retryBuffer.Drain()
+	defer func() {
+		// have a final try for all rows in retry buffer before the worker stops
+		if w.retryBuffer.Len() > 0 {
+			start := time.Now()
+			log.Info(
+				"try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
+				zap.Int("bufferLen", w.retryBuffer.Len()),
+			)
+			retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+			w.retryBuffer.SetRetryInterval(0)
+			w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
+				return task.doDelete(retryCtx, se)
+			})
+			log.Info(
+				"delete TTL rows in del worker buffer finished",
+				zap.Duration("duration", time.Since(start)),
+			)
+		}
+
+		// drain retry buffer to make sure the statistics are correct
+		if w.retryBuffer.Len() > 0 {
+			log.Warn(
+				"some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
+				zap.Int("bufferLen", w.retryBuffer.Len()),
+			)
+			w.retryBuffer.Drain()
+		}
+	}()
+
 	for w.Status() == workerStatusRunning {
 		tracer.EnterPhase(metrics.PhaseIdle)
 		select {
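With this change, ttlDeleteWorker.loop no longer drains its retry buffer unconditionally on shutdown: the deferred block first makes one immediate retry pass bounded by a 10-second context, and only rows that still fail are drained (and counted as error rows). Below is a minimal, self-contained Go sketch of that shutdown pattern; retryBuffer, stopWorker, and the string rows are simplified hypothetical stand-ins, not the real ttlworker types.

package main

import (
	"context"
	"fmt"
	"time"
)

// retryBuffer is a simplified stand-in for ttlDelRetryBuffer: it only keeps
// rows whose earlier delete attempt failed.
type retryBuffer struct{ rows []string }

func (b *retryBuffer) Len() int { return len(b.rows) }

// DoRetry runs one pass over the buffered rows; rows returned by the callback
// stay buffered, mirroring how the real DoRetry re-buffers rows that doDelete
// returns for retry.
func (b *retryBuffer) DoRetry(do func([]string) []string) { b.rows = do(b.rows) }

// Drain drops whatever is left; the real worker counts these rows as errors.
func (b *retryBuffer) Drain() { b.rows = nil }

// stopWorker mirrors the new deferred block: one bounded best-effort pass,
// then drain whatever could not be deleted in time.
func stopWorker(buf *retryBuffer, del func(context.Context, []string) []string) {
	if buf.Len() > 0 {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		buf.DoRetry(func(rows []string) []string { return del(ctx, rows) })
	}
	if buf.Len() > 0 {
		fmt.Println("rows still undeleted at stop:", buf.Len())
		buf.Drain()
	}
}

func main() {
	buf := &retryBuffer{rows: []string{"r1", "r2"}}
	// A delete function that succeeds for every row, so nothing is left to drain.
	stopWorker(buf, func(ctx context.Context, rows []string) []string { return nil })
	fmt.Println("left in buffer:", buf.Len()) // prints 0
}

In the real worker the same ordering keeps the statistics honest: rows are only marked as errors if even the final bounded pass could not delete them.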

pkg/ttl/ttlworker/scan.go

Lines changed: 4 additions & 3 deletions
@@ -82,9 +82,10 @@ type ttlScanTask struct {
 }
 
 type ttlScanTaskExecResult struct {
-	time time.Time
-	task *ttlScanTask
-	err  error
+	time       time.Time
+	task       *ttlScanTask
+	reschedule bool
+	err        error
 }
 
 func (t *ttlScanTask) result(err error) *ttlScanTaskExecResult {

pkg/ttl/ttlworker/task_manager.go

Lines changed: 13 additions & 5 deletions
@@ -146,6 +146,7 @@ func (m *taskManager) resizeScanWorkers(count int) error {
 		var scanErr error
 		result := s.PollTaskResult()
 		if result != nil {
+			result.reschedule = true
 			jobID = result.task.JobID
 			scanID = result.task.ScanID
 
@@ -470,11 +471,13 @@ func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) {
 		}
 		// we should cancel task to release inner context and avoid memory leak
 		task.cancel()
-		err := m.reportTaskFinished(se, now, task)
-		if err != nil {
-			logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err))
-			stillRunningTasks = append(stillRunningTasks, task)
-			continue
+		if !task.result.reschedule {
+			err := m.reportTaskFinished(se, now, task)
+			if err != nil {
+				logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err))
+				stillRunningTasks = append(stillRunningTasks, task)
+				continue
+			}
 		}
 	}
 
@@ -613,6 +616,11 @@ func (t *runningScanTask) finished(logger *zap.Logger) bool {
 		zap.String("table", t.tbl.Name.O),
 	)
 
+	if t.result.reschedule {
+		logger.Info("task should be rescheduled in other instances")
+		return true
+	}
+
 	totalRows := t.statistics.TotalRows.Load()
 	errRows := t.statistics.ErrorRows.Load()
 	successRows := t.statistics.SuccessRows.Load()
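Put together, the task_manager.go changes form the rescheduling path: resizeScanWorkers marks any result polled from a worker being removed with reschedule = true, runningScanTask.finished then treats that task as done locally, and checkFinishedTask skips reportTaskFinished for it, so the task is released by this instance and left for others to pick up. A condensed Go sketch of that decision flow follows; scanResult, runningTask, and checkFinished are hypothetical stand-ins for the real ttlworker types, with the usual row-count check elided.

package main

import "fmt"

// scanResult keeps only the field relevant to the reschedule decision.
type scanResult struct{ reschedule bool }

type runningTask struct {
	id     string
	result *scanResult
}

// finished mirrors runningScanTask.finished after this commit: a result polled
// from a worker being shut down is treated as finished locally so the manager
// releases it, regardless of the usual row-count comparison.
func (t *runningTask) finished() bool {
	if t.result == nil {
		return false
	}
	if t.result.reschedule {
		fmt.Println("task", t.id, "should be rescheduled in other instances")
		return true
	}
	// ... the usual check comparing scanned rows with processed rows ...
	return true
}

// checkFinished mirrors taskManager.checkFinishedTask: finished state is only
// reported back when the task was not marked for rescheduling.
func checkFinished(t *runningTask, report func(*runningTask) error) {
	if !t.finished() {
		return
	}
	if !t.result.reschedule {
		if err := report(t); err != nil {
			fmt.Println("fail to report finished task:", err)
		}
	}
}

func main() {
	task := &runningTask{id: "scan-task-1", result: &scanResult{reschedule: true}}
	checkFinished(task, func(*runningTask) error {
		fmt.Println("reporting finished task") // not reached for rescheduled tasks
		return nil
	})
}

The key design choice visible in the diff is that a rescheduled task is "finished" from the local manager's point of view but never reported finished, which is what leaves it available to other instances.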
