Skip to content

Commit 1bf01f4

Browse files
authored
ttl: fix the issue that TTL job may hang some time when shrink the delete worker count (#55572)
close #55561
1 parent ebbe53c commit 1bf01f4

File tree

5 files changed

+299
-74
lines changed

5 files changed

+299
-74
lines changed

pkg/ttl/ttlworker/del.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
9292
tracer.EnterPhase(metrics.PhaseOther)
9393

9494
leftRows := t.rows
95+
defer func() {
96+
if len(leftRows) > 0 {
97+
t.statistics.IncErrorRows(len(leftRows))
98+
}
99+
}()
100+
95101
se := newTableSession(rawSe, t.tbl, t.expire)
96102
for len(leftRows) > 0 {
97103
maxBatch := variable.TTLDeleteBatchSize.Load()
@@ -208,6 +214,18 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
208214
return b.retryInterval
209215
}
210216

217+
// Drain drains a retry buffer.
218+
func (b *ttlDelRetryBuffer) Drain() {
219+
for ele := b.list.Front(); ele != nil; ele = ele.Next() {
220+
if item, ok := ele.Value.(*ttlDelRetryItem); ok {
221+
item.task.statistics.IncErrorRows(len(item.task.rows))
222+
} else {
223+
logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele))
224+
}
225+
}
226+
b.list = list.New()
227+
}
228+
211229
func (b *ttlDelRetryBuffer) recordRetryItem(task *ttlDeleteTask, retryRows [][]types.Datum, retryCnt int) bool {
212230
if len(retryRows) == 0 {
213231
return false
@@ -277,6 +295,8 @@ func (w *ttlDeleteWorker) loop() error {
277295
timer := time.NewTimer(w.retryBuffer.retryInterval)
278296
defer timer.Stop()
279297

298+
// drain retry buffer to make sure the statistics are correct
299+
defer w.retryBuffer.Drain()
280300
for w.Status() == workerStatusRunning {
281301
tracer.EnterPhase(metrics.PhaseIdle)
282302
select {

pkg/ttl/ttlworker/del_test.go

Lines changed: 169 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ package ttlworker
1717
import (
1818
"context"
1919
"errors"
20+
"fmt"
21+
"math"
22+
"slices"
23+
"strconv"
2024
"strings"
2125
"testing"
2226
"time"
@@ -163,48 +167,56 @@ func TestTTLDelRetryBuffer(t *testing.T) {
163167

164168
// test task should be immutable
165169
require.Equal(t, 10, len(task5.rows))
170+
171+
// test drain
172+
require.Equal(t, 0, buffer.Len())
173+
task6, rows6, statics6 := createTask("t6")
174+
buffer.RecordTaskResult(task6, rows6[:7])
175+
require.Equal(t, 1, buffer.Len())
176+
require.Equal(t, uint64(0), statics6.SuccessRows.Load())
177+
require.Equal(t, uint64(0), statics6.ErrorRows.Load())
178+
buffer.Drain()
179+
require.Equal(t, 0, buffer.Len())
180+
require.Equal(t, uint64(0), statics6.SuccessRows.Load())
181+
require.Equal(t, uint64(7), statics6.ErrorRows.Load())
166182
}
167183

168184
func TestTTLDeleteTaskDoDelete(t *testing.T) {
169185
origBatchSize := variable.TTLDeleteBatchSize.Load()
170-
variable.TTLDeleteBatchSize.Store(3)
186+
delBatch := 3
187+
variable.TTLDeleteBatchSize.Store(int64(delBatch))
171188
defer variable.TTLDeleteBatchSize.Store(origBatchSize)
172189

173190
t1 := newMockTTLTbl(t, "t1")
174-
t2 := newMockTTLTbl(t, "t2")
175-
t3 := newMockTTLTbl(t, "t3")
176-
t4 := newMockTTLTbl(t, "t4")
177191
s := newMockSession(t)
178-
invokes := 0
192+
var sqls []string
193+
var retryErrBatches []int
194+
var nonRetryBatches []int
195+
var afterExecuteSQL func()
179196
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
180-
invokes++
181-
s.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
182-
if strings.Contains(sql, "`t1`") {
183-
return nil, nil
197+
s.sessionInfoSchema = newMockInfoSchema(t1.TableInfo)
198+
sqls = append(sqls, sql)
199+
200+
if !strings.Contains(sql, "`t1`") {
201+
require.FailNow(t, "")
184202
}
185203

186-
if strings.Contains(sql, "`t2`") {
204+
defer func() {
205+
if afterExecuteSQL != nil {
206+
afterExecuteSQL()
207+
}
208+
}()
209+
210+
if slices.Contains(retryErrBatches, len(sqls)-1) {
187211
return nil, errors.New("mockErr")
188212
}
189213

190-
if strings.Contains(sql, "`t3`") {
214+
if slices.Contains(nonRetryBatches, len(sqls)-1) {
215+
// set an infoschema that contains no table to make an error that cannot retry
191216
s.sessionInfoSchema = newMockInfoSchema()
192217
return nil, nil
193218
}
194219

195-
if strings.Contains(sql, "`t4`") {
196-
switch invokes {
197-
case 1:
198-
return nil, nil
199-
case 2, 4:
200-
return nil, errors.New("mockErr")
201-
case 3:
202-
s.sessionInfoSchema = newMockInfoSchema()
203-
return nil, nil
204-
}
205-
}
206-
207-
require.FailNow(t, "")
208220
return nil, nil
209221
}
210222

@@ -218,63 +230,117 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
218230
return rows
219231
}
220232

221-
delTask := func(t *cache.PhysicalTable) *ttlDeleteTask {
233+
delTask := func(batchCnt int) *ttlDeleteTask {
222234
task := &ttlDeleteTask{
223-
tbl: t,
235+
tbl: t1,
224236
expire: time.UnixMilli(0),
225-
rows: nRows(10),
237+
rows: nRows(batchCnt * delBatch),
226238
statistics: &ttlStatistics{},
227239
}
228-
task.statistics.TotalRows.Add(10)
240+
task.statistics.TotalRows.Add(uint64(batchCnt * delBatch))
229241
return task
230242
}
231243

232244
cases := []struct {
233-
task *ttlDeleteTask
234-
retryRows []int
235-
successRows int
236-
errorRows int
245+
batchCnt int
246+
retryErrBatches []int
247+
noRetryErrBatches []int
248+
cancelCtx bool
249+
cancelCtxBatch int
237250
}{
238251
{
239-
task: delTask(t1),
240-
retryRows: nil,
241-
successRows: 10,
242-
errorRows: 0,
252+
// all success
253+
batchCnt: 10,
243254
},
244255
{
245-
task: delTask(t2),
246-
retryRows: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
247-
successRows: 0,
248-
errorRows: 0,
256+
// all retries
257+
batchCnt: 10,
258+
retryErrBatches: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
249259
},
250260
{
251-
task: delTask(t3),
252-
retryRows: nil,
253-
successRows: 0,
254-
errorRows: 10,
261+
// all errors without retry
262+
batchCnt: 10,
263+
noRetryErrBatches: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
255264
},
256265
{
257-
task: delTask(t4),
258-
retryRows: []int{3, 4, 5, 9},
259-
successRows: 3,
260-
errorRows: 3,
266+
// some retries and some not
267+
batchCnt: 10,
268+
noRetryErrBatches: []int{3, 8, 9},
269+
retryErrBatches: []int{1, 2, 4},
270+
},
271+
{
272+
// some retries and some not
273+
batchCnt: 10,
274+
noRetryErrBatches: []int{3, 8, 9},
275+
retryErrBatches: []int{1, 2, 4},
276+
cancelCtx: true,
277+
cancelCtxBatch: 6,
261278
},
262279
}
263280

264281
for _, c := range cases {
265-
invokes = 0
266-
retryRows := c.task.doDelete(context.Background(), s)
267-
require.Equal(t, 4, invokes)
268-
if c.retryRows == nil {
269-
require.Nil(t, retryRows)
282+
ctx, cancel := context.WithCancel(context.Background())
283+
if c.cancelCtx && c.cancelCtxBatch == 0 {
284+
cancel()
270285
}
271-
require.Equal(t, len(c.retryRows), len(retryRows))
272-
for i, row := range retryRows {
273-
require.Equal(t, int64(c.retryRows[i]), row[0].GetInt64())
286+
287+
afterExecuteSQL = func() {
288+
if c.cancelCtx {
289+
if len(sqls) == c.cancelCtxBatch {
290+
cancel()
291+
}
292+
}
293+
}
294+
295+
task := delTask(c.batchCnt)
296+
require.Equal(t, len(task.rows), c.batchCnt*delBatch)
297+
sqls = make([]string, 0, c.batchCnt)
298+
retryErrBatches = c.retryErrBatches
299+
nonRetryBatches = c.noRetryErrBatches
300+
retryRows := task.doDelete(ctx, s)
301+
realBatchCnt := c.batchCnt
302+
if c.cancelCtx {
303+
realBatchCnt = c.cancelCtxBatch
274304
}
275-
require.Equal(t, uint64(10), c.task.statistics.TotalRows.Load())
276-
require.Equal(t, uint64(c.successRows), c.task.statistics.SuccessRows.Load())
277-
require.Equal(t, uint64(c.errorRows), c.task.statistics.ErrorRows.Load())
305+
require.LessOrEqual(t, realBatchCnt, c.batchCnt)
306+
307+
// check SQLs
308+
require.Equal(t, realBatchCnt, len(sqls))
309+
expectedSQLs := make([]string, 0, len(sqls))
310+
for i := 0; i < realBatchCnt; i++ {
311+
batch := task.rows[i*delBatch : (i+1)*delBatch]
312+
idList := make([]string, 0, delBatch)
313+
for _, row := range batch {
314+
idList = append(idList, strconv.FormatInt(row[0].GetInt64(), 10))
315+
}
316+
sql := fmt.Sprintf("DELETE LOW_PRIORITY FROM `test`.`t1` "+
317+
"WHERE `_tidb_rowid` IN (%s) AND `time` < FROM_UNIXTIME(0) LIMIT %d",
318+
strings.Join(idList, ", "),
319+
delBatch,
320+
)
321+
expectedSQLs = append(expectedSQLs, sql)
322+
}
323+
require.Equal(t, strings.Join(expectedSQLs, "\n"), strings.Join(sqls, "\n"))
324+
325+
// check retry rows
326+
var expectedRetryRows [][]types.Datum
327+
for i := 0; i < realBatchCnt; i++ {
328+
if slices.Contains(c.retryErrBatches, i) {
329+
expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...)
330+
}
331+
}
332+
require.Equal(t, expectedRetryRows, retryRows)
333+
334+
// check statistics
335+
var expectedErrRows uint64
336+
for i := 0; i < c.batchCnt; i++ {
337+
if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) {
338+
expectedErrRows += uint64(delBatch)
339+
}
340+
}
341+
expectedSuccessRows := uint64(len(task.rows)) - expectedErrRows - uint64(len(expectedRetryRows))
342+
require.Equal(t, expectedSuccessRows, task.statistics.SuccessRows.Load())
343+
require.Equal(t, expectedErrRows, task.statistics.ErrorRows.Load())
278344
}
279345
}
280346

@@ -317,37 +383,57 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
317383
t1 := newMockTTLTbl(t, "t1")
318384
t2 := newMockTTLTbl(t, "t2")
319385
t3 := newMockTTLTbl(t, "t3")
386+
t4 := newMockTTLTbl(t, "t4")
320387
s := newMockSession(t)
321388
pool := newMockSessionPool(t)
322389
pool.se = s
323390

324-
sqlMap := make(map[string]struct{})
391+
sqlMap := make(map[string]int)
392+
t3Retried := make(chan struct{})
393+
t4Retried := make(chan struct{})
325394
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
326-
pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo)
395+
pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
327396
if strings.Contains(sql, "`t1`") {
397+
// success
328398
return nil, nil
329399
}
330400

331401
if strings.Contains(sql, "`t2`") {
402+
// first error, retry success
332403
if _, ok := sqlMap[sql]; ok {
404+
close(t3Retried)
333405
return nil, nil
334406
}
335-
sqlMap[sql] = struct{}{}
407+
sqlMap[sql] = 1
336408
return nil, errors.New("mockErr")
337409
}
338410

339411
if strings.Contains(sql, "`t3`") {
412+
// error no retry
340413
pool.lastSession.sessionInfoSchema = newMockInfoSchema()
341414
return nil, nil
342415
}
343416

417+
if strings.Contains(sql, "`t4`") {
418+
// error and retry still error
419+
// this is to test the retry buffer should be drained after the delete worker stopped
420+
i := sqlMap[sql]
421+
if i >= 2 {
422+
// i >= 2 means t4 has retried once and records in retry buffer
423+
close(t4Retried)
424+
}
425+
sqlMap[sql] = i + 1
426+
return nil, errors.New("mockErr")
427+
}
428+
344429
require.FailNow(t, "")
345430
return nil, nil
346431
}
347432

348433
delCh := make(chan *ttlDeleteTask)
349434
w := newDeleteWorker(delCh, pool)
350435
w.retryBuffer.retryInterval = time.Millisecond
436+
w.retryBuffer.maxRetry = math.MaxInt
351437
require.Equal(t, workerStatusCreated, w.Status())
352438
w.Start()
353439
require.Equal(t, workerStatusRunning, w.Status())
@@ -357,7 +443,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
357443
}()
358444

359445
tasks := make([]*ttlDeleteTask, 0)
360-
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3} {
446+
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} {
361447
task := &ttlDeleteTask{
362448
tbl: tbl,
363449
expire: time.UnixMilli(0),
@@ -377,7 +463,23 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
377463
}
378464
}
379465

380-
time.Sleep(time.Millisecond * 100)
466+
select {
467+
case <-t3Retried:
468+
case <-time.After(time.Second):
469+
require.FailNow(t, "")
470+
}
471+
472+
select {
473+
case <-t4Retried:
474+
case <-time.After(time.Second):
475+
require.FailNow(t, "")
476+
}
477+
478+
// before stop, t4 should always retry without any error rows
479+
require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load())
480+
w.Stop()
481+
require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second))
482+
381483
require.Equal(t, uint64(3), tasks[0].statistics.SuccessRows.Load())
382484
require.Equal(t, uint64(0), tasks[0].statistics.ErrorRows.Load())
383485

@@ -386,4 +488,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
386488

387489
require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load())
388490
require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load())
491+
492+
require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
493+
require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())
389494
}

0 commit comments

Comments
 (0)