Commit eaee3fe (parent: ba791ab)

ttl: reschedule task to other instances when shrinking worker

File tree: 3 files changed (+188, -45 lines)

pkg/ttl/ttlworker/del.go (39 additions, 5 deletions)
@@ -22,6 +22,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/ttl/cache"
 	"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 	leftRows := t.rows
 	defer func() {
 		if len(leftRows) > 0 {
-			t.statistics.IncErrorRows(len(leftRows))
+			retryRows = append(retryRows, leftRows...)
 		}
 	}()
 
 	se := newTableSession(rawSe, t.tbl, t.expire)
-	for len(leftRows) > 0 {
+	for len(leftRows) > 0 && ctx.Err() == nil {
 		maxBatch := variable.TTLDeleteBatchSize.Load()
 		var delBatch [][]types.Datum
 		if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 		sqlInterval := time.Since(sqlStart)
 		if err != nil {
 			metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
-			needRetry = needRetry && ctx.Err() == nil
 			logutil.BgLogger().Warn(
 				"delete SQL in TTL failed",
 				zap.Error(err),
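
Editor's note: the doDelete hunks above change the failure path. Rows that cannot be deleted before the context is canceled are now handed back to the caller as retryRows instead of being counted as error rows right away, and the delete loop stops as soon as the context is done. A minimal, self-contained sketch of that pattern, using illustrative names (Row, deleteBatch) rather than the actual TiDB types:

```go
package main

import (
	"context"
	"fmt"
)

// Row stands in for one TTL row to delete; the real code works on [][]types.Datum.
type Row int

// deleteBatch pretends to issue one DELETE statement for a batch; illustrative only.
func deleteBatch(batch []Row) error { return nil }

// doDelete deletes rows in batches and returns whatever it could not delete,
// so the caller can put those rows into a retry buffer instead of dropping them.
func doDelete(ctx context.Context, rows []Row, batchSize int) (retryRows []Row) {
	left := rows
	defer func() {
		// Whatever is still left (for example because ctx was canceled)
		// goes back to the caller for retry.
		if len(left) > 0 {
			retryRows = append(retryRows, left...)
		}
	}()

	for len(left) > 0 && ctx.Err() == nil {
		n := batchSize
		if len(left) < n {
			n = len(left)
		}
		batch := left[:n]
		if err := deleteBatch(batch); err != nil {
			// A failed batch is also kept for retry.
			retryRows = append(retryRows, batch...)
		}
		left = left[n:]
	}
	return retryRows
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a worker that is already stopping
	// All three rows come back for retry because the context is already done.
	fmt.Println(len(doDelete(ctx, []Row{1, 2, 3}, 2)))
}
```
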
@@ -214,6 +214,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
 	return b.retryInterval
 }
 
+// SetRetryInterval sets the retry interval of the buffer.
+func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
+	b.retryInterval = interval
+}
+
 // Drain drains a retry buffer.
 func (b *ttlDelRetryBuffer) Drain() {
 	for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +301,37 @@ func (w *ttlDeleteWorker) loop() error {
 	timer := time.NewTimer(w.retryBuffer.retryInterval)
 	defer timer.Stop()
 
-	// drain retry buffer to make sure the statistics are correct
-	defer w.retryBuffer.Drain()
+	defer func() {
+		// Have a final try to delete all rows in retry buffer while the worker stops
+		// to avoid leaving any TTL rows undeleted when shrinking the delete worker.
+		if w.retryBuffer.Len() > 0 {
+			start := time.Now()
+			log.Info(
+				"try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
+				zap.Int("bufferLen", w.retryBuffer.Len()),
+			)
+			retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+			w.retryBuffer.SetRetryInterval(0)
+			w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
+				return task.doDelete(retryCtx, se)
+			})
+			log.Info(
+				"delete TTL rows in del worker buffer finished",
+				zap.Duration("duration", time.Since(start)),
+			)
+		}
+
+		// drain retry buffer to make sure the statistics are correct
+		if w.retryBuffer.Len() > 0 {
+			log.Warn(
+				"some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
+				zap.Int("bufferLen", w.retryBuffer.Len()),
+			)
+			w.retryBuffer.Drain()
+		}
+	}()
+
 	for w.Status() == workerStatusRunning {
 		tracer.EnterPhase(metrics.PhaseIdle)
 		select {
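
Editor's note: the new deferred cleanup in loop() gives the retry buffer one final, time-bounded attempt (10 seconds) to delete its rows before the worker exits, and only falls back to Drain, which marks the rows as errors, if something is still left afterwards. A rough sketch of that shutdown flush, assuming a simplified buffer type (retryBuffer, flush, drain, and stopWorker are illustrative names, not the real API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryBuffer is a simplified stand-in for ttlDelRetryBuffer.
type retryBuffer struct {
	rows []int
}

func (b *retryBuffer) Len() int { return len(b.rows) }

// flush makes one pass over the buffer and deletes what it can before ctx expires;
// rows that could not be deleted stay in the buffer.
func (b *retryBuffer) flush(ctx context.Context, del func(int) error) {
	remaining := b.rows[:0]
	for _, r := range b.rows {
		if ctx.Err() != nil || del(r) != nil {
			remaining = append(remaining, r) // keep rows we could not delete
		}
	}
	b.rows = remaining
}

// drain gives up on the remaining rows and reports how many were dropped,
// mirroring the "mark them as error" step in the real worker.
func (b *retryBuffer) drain() int {
	n := len(b.rows)
	b.rows = nil
	return n
}

// stopWorker shows the shutdown sequence: a time-bounded final flush first,
// and only then draining whatever is still left.
func stopWorker(buf *retryBuffer, del func(int) error) {
	if buf.Len() > 0 {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		buf.flush(ctx, del)
	}
	if buf.Len() > 0 {
		fmt.Printf("marking %d row(s) as error\n", buf.drain())
	}
}

func main() {
	buf := &retryBuffer{rows: []int{1, 2, 3}}
	del := func(r int) error {
		if r == 2 {
			return errors.New("still locked") // one row keeps failing
		}
		return nil
	}
	stopWorker(buf, del) // prints: marking 1 row(s) as error
}
```
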

pkg/ttl/ttlworker/scan.go (15 additions, 7 deletions)
@@ -85,6 +85,9 @@ type ttlScanTaskExecResult struct {
 	time time.Time
 	task *ttlScanTask
 	err  error
+	// interruptedByWorkerStop indicates whether the task has to stop because the worker stops.
+	// When it is true, we should reschedule this task in another worker or TiDB again.
+	interruptedByWorkerStop bool
 }
 
 func (t *ttlScanTask) result(err error) *ttlScanTaskExecResult {
@@ -99,6 +102,17 @@ func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum {
 	return datums
 }
 
+func (t *ttlScanTask) taskLogger(l *zap.Logger) *zap.Logger {
+	return l.With(
+		zap.String("jobID", t.JobID),
+		zap.Int64("scanID", t.ScanID),
+		zap.Int64("tableID", t.TableID),
+		zap.String("db", t.tbl.Schema.O),
+		zap.String("table", t.tbl.Name.O),
+		zap.String("partition", t.tbl.Partition.O),
+	)
+}
+
 func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
 	// TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers
 	// now, the taskCtx is only checked at the beginning of every loop
@@ -121,13 +135,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
 		case <-doScanFinished.Done():
 			return
 		}
-		logger := logutil.BgLogger().With(
-			zap.Int64("tableID", t.TableID),
-			zap.String("table", t.tbl.Name.O),
-			zap.String("partition", t.tbl.Partition.O),
-			zap.String("jobID", t.JobID),
-			zap.Int64("scanID", t.ScanID),
-		)
+		logger := t.taskLogger(logutil.BgLogger())
 		logger.Info("kill the running statement in scan task because the task or worker cancelled")
 		rawSess.KillStmt()
 		ticker := time.NewTicker(time.Minute)
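
Editor's note: the interruptedByWorkerStop flag added above lets the caller tell "the scan worker was shut down" apart from a real failure, so the task can be rescheduled on another worker or TiDB instance, as the commit title says. A hedged sketch of how a consumer of such a result might react; the scheduler type and its handleResult method are made up for illustration and are not the actual TTL task manager API:

```go
package main

import "fmt"

// execResult mirrors the idea of ttlScanTaskExecResult in a much simplified form.
type execResult struct {
	taskID                  string
	err                     error
	interruptedByWorkerStop bool
}

// scheduler is a toy task queue; in TiDB the rescheduling is handled by the TTL task manager.
type scheduler struct {
	queue []string
}

func (s *scheduler) handleResult(r execResult) {
	switch {
	case r.interruptedByWorkerStop:
		// Not a real failure: the worker was stopped (e.g. the worker pool shrank),
		// so put the task back so another worker or TiDB instance can pick it up.
		s.queue = append(s.queue, r.taskID)
	case r.err != nil:
		fmt.Printf("task %s failed: %v\n", r.taskID, r.err)
	default:
		fmt.Printf("task %s finished\n", r.taskID)
	}
}

func main() {
	s := &scheduler{}
	s.handleResult(execResult{taskID: "scan-1", interruptedByWorkerStop: true})
	fmt.Println("rescheduled tasks:", s.queue) // rescheduled tasks: [scan-1]
}
```
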
