
Commit 4d7425f

lcwangchao authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#57703
Signed-off-by: ti-chi-bot <[email protected]>
1 parent ea9c4ab commit 4d7425f

13 files changed: 606 additions, 132 deletions

pkg/ttl/cache/table.go

Lines changed: 15 additions & 3 deletions
@@ -40,7 +40,9 @@ import (
     "github.com/pingcap/tidb/pkg/util/codec"
     "github.com/pingcap/tidb/pkg/util/collate"
     "github.com/pingcap/tidb/pkg/util/intest"
+    "github.com/pingcap/tidb/pkg/util/logutil"
     "github.com/tikv/client-go/v2/tikv"
+    "go.uber.org/zap"
 )

 func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, error) {
@@ -448,9 +450,10 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag
         return nil, err
     }

-    regionsPerRange := len(regionIDs) / splitCnt
-    oversizeCnt := len(regionIDs) % splitCnt
-    ranges := make([]kv.KeyRange, 0, min(len(regionIDs), splitCnt))
+    regionsCnt := len(regionIDs)
+    regionsPerRange := regionsCnt / splitCnt
+    oversizeCnt := regionsCnt % splitCnt
+    ranges := make([]kv.KeyRange, 0, min(regionsCnt, splitCnt))
     for len(regionIDs) > 0 {
         startRegion, err := regionCache.LocateRegionByID(tikv.NewBackofferWithVars(ctx, 20000, nil),
             regionIDs[0])
@@ -483,6 +486,15 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag
         oversizeCnt--
         regionIDs = regionIDs[endRegionIdx+1:]
     }
+    logutil.BgLogger().Info("TTL table raw key ranges split",
+        zap.Int("regionsCnt", regionsCnt),
+        zap.Int("shouldSplitCnt", splitCnt),
+        zap.Int("actualSplitCnt", len(ranges)),
+        zap.Int64("tableID", t.ID),
+        zap.String("db", t.Schema.O),
+        zap.String("table", t.Name.O),
+        zap.String("partition", t.Partition.O),
+    )
     return ranges, nil
 }

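The splitRawKeyRanges hunk above spreads len(regionIDs) regions over at most splitCnt ranges, giving the first regionsCnt % splitCnt ranges one extra region so the split stays as even as possible. A minimal standalone sketch of that arithmetic (illustrative code, not the TiDB implementation):

package main

import "fmt"

// splitSizes mirrors the regionsPerRange/oversizeCnt arithmetic used by
// splitRawKeyRanges: regionsCnt regions are spread over at most splitCnt
// ranges, and the first regionsCnt%splitCnt ranges take one extra region.
func splitSizes(regionsCnt, splitCnt int) []int {
    regionsPerRange := regionsCnt / splitCnt
    oversizeCnt := regionsCnt % splitCnt
    sizes := make([]int, 0, min(regionsCnt, splitCnt))
    for remaining := regionsCnt; remaining > 0; {
        size := regionsPerRange
        if oversizeCnt > 0 {
            size++
            oversizeCnt--
        }
        sizes = append(sizes, size)
        remaining -= size
    }
    return sizes
}

func main() {
    fmt.Println(splitSizes(10, 4)) // [3 3 2 2]: range sizes differ by at most one
    fmt.Println(splitSizes(3, 5))  // [1 1 1]: never more ranges than regions
}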
pkg/ttl/cache/task.go

Lines changed: 5 additions & 2 deletions
@@ -87,9 +87,9 @@ const (
     // TaskStatusWaiting means the task hasn't started
     TaskStatusWaiting TaskStatus = "waiting"
     // TaskStatusRunning means this task is running
-    TaskStatusRunning = "running"
+    TaskStatusRunning TaskStatus = "running"
     // TaskStatusFinished means this task has finished
-    TaskStatusFinished = "finished"
+    TaskStatusFinished TaskStatus = "finished"
 )

 // TTLTask is a row recorded in mysql.tidb_ttl_task
@@ -116,6 +116,9 @@ type TTLTaskState struct {
     ErrorRows uint64 `json:"error_rows"`

     ScanTaskErr string `json:"scan_task_err"`
+
+    // When PreviousOwner != "", it means this task is resigned from another owner
+    PreviousOwner string `json:"prev_owner,omitempty"`
 }

 // RowToTTLTask converts a row into TTL task

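A note on the constant change above: in a Go const block, a constant declared with an explicit value but no type is an untyped constant, so before this change only TaskStatusWaiting actually had type TaskStatus. A small self-contained sketch (illustrative names, not the TiDB declarations) showing the difference:

package main

import "fmt"

type TaskStatus string

const (
    // Explicit type: the constant's type is TaskStatus.
    StatusWaiting TaskStatus = "waiting"
    // No explicit type: an untyped string constant whose default type is string.
    StatusRunning = "running"
    // Explicit type again, as in the fixed code.
    StatusFinished TaskStatus = "finished"
)

func main() {
    // Prints: main.TaskStatus string main.TaskStatus
    fmt.Printf("%T %T %T\n", StatusWaiting, StatusRunning, StatusFinished)
}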
pkg/ttl/ttlworker/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -44,6 +44,7 @@ go_library(
         "//pkg/util/timeutil",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_pingcap_log//:log",
         "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_client_go_v2//tikvrpc",
         "@io_etcd_go_etcd_client_v3//:client",
@@ -68,7 +69,6 @@ go_test(
         "task_manager_test.go",
         "timer_sync_test.go",
         "timer_test.go",
-        "worker_test.go",
     ],
     embed = [":ttlworker"],
     flaky = True,

pkg/ttl/ttlworker/del.go

Lines changed: 46 additions & 6 deletions
@@ -22,6 +22,7 @@ import (
     "sync/atomic"
     "time"

+    "github.com/pingcap/log"
     "github.com/pingcap/tidb/pkg/sessionctx/variable"
     "github.com/pingcap/tidb/pkg/ttl/cache"
     "github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
     leftRows := t.rows
     defer func() {
         if len(leftRows) > 0 {
-            t.statistics.IncErrorRows(len(leftRows))
+            retryRows = append(retryRows, leftRows...)
         }
     }()

     se := newTableSession(rawSe, t.tbl, t.expire)
-    for len(leftRows) > 0 {
+    for len(leftRows) > 0 && ctx.Err() == nil {
         maxBatch := variable.TTLDeleteBatchSize.Load()
         var delBatch [][]types.Datum
         if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
         sqlInterval := time.Since(sqlStart)
         if err != nil {
             metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
-            needRetry = needRetry && ctx.Err() == nil
             logutil.BgLogger().Warn(
                 "delete SQL in TTL failed",
                 zap.Error(err),
@@ -191,8 +191,14 @@ func (b *ttlDelRetryBuffer) RecordTaskResult(task *ttlDeleteTask, retryRows [][]
 }

 func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) time.Duration {
-    for b.list.Len() > 0 {
+    l := b.list.Len()
+    // When `retryInterval==0`, to avoid the infinite retries, limit the max loop to the buffer length.
+    // It means one item only has one chance to retry in one `DoRetry` invoking.
+    for i := 0; i < l; i++ {
         ele := b.list.Front()
+        if ele == nil {
+            break
+        }
         item, ok := ele.Value.(*ttlDelRetryItem)
         if !ok {
             logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele))
@@ -214,6 +220,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
     return b.retryInterval
 }

+// SetRetryInterval sets the retry interval of the buffer.
+func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
+    b.retryInterval = interval
+}
+
 // Drain drains a retry buffer.
 func (b *ttlDelRetryBuffer) Drain() {
     for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +307,37 @@ func (w *ttlDeleteWorker) loop() error {
     timer := time.NewTimer(w.retryBuffer.retryInterval)
     defer timer.Stop()

-    // drain retry buffer to make sure the statistics are correct
-    defer w.retryBuffer.Drain()
+    defer func() {
+        // Have a final try to delete all rows in retry buffer while the worker stops
+        // to avoid leaving any TTL rows undeleted when shrinking the delete worker.
+        if w.retryBuffer.Len() > 0 {
+            start := time.Now()
+            log.Info(
+                "try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
+                zap.Int("bufferLen", w.retryBuffer.Len()),
+            )
+            retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+            defer cancel()
+            w.retryBuffer.SetRetryInterval(0)
+            w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
+                return task.doDelete(retryCtx, se)
+            })
+            log.Info(
+                "delete TTL rows in del worker buffer finished",
+                zap.Duration("duration", time.Since(start)),
+            )
+        }
+
+        // drain retry buffer to make sure the statistics are correct
+        if w.retryBuffer.Len() > 0 {
+            log.Warn(
+                "some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
+                zap.Int("bufferLen", w.retryBuffer.Len()),
+            )
+            w.retryBuffer.Drain()
+        }
+    }()

     for w.Status() == workerStatusRunning {
         tracer.EnterPhase(metrics.PhaseIdle)
         select {

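The reworked DoRetry above snapshots the list length before looping, so when the retry interval is zero (as during the final flush in the worker's deferred shutdown path) each buffered item is attempted at most once per call instead of spinning forever on items that fail and get re-queued. A minimal standalone sketch of that pattern (simplified types, not the TiDB retry buffer):

package main

import (
    "container/list"
    "fmt"
)

// doRetryOnce processes at most the items that were already queued when the
// call started; items that fail again are pushed to the back, so with a zero
// retry interval nothing can be retried twice within a single invocation.
func doRetryOnce(buf *list.List, retry func(string) bool) {
    l := buf.Len() // snapshot the length before iterating
    for i := 0; i < l; i++ {
        ele := buf.Front()
        if ele == nil {
            break
        }
        buf.Remove(ele)
        item := ele.Value.(string)
        if !retry(item) {
            buf.PushBack(item) // failed again: keep it for a later invocation
        }
    }
}

func main() {
    buf := list.New()
    buf.PushBack("a")
    buf.PushBack("b")
    doRetryOnce(buf, func(s string) bool { return s == "a" })
    fmt.Println(buf.Len()) // 1: "b" failed once and stays queued, no infinite loop
}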
pkg/ttl/ttlworker/del_test.go

Lines changed: 64 additions & 16 deletions
@@ -179,6 +179,22 @@ func TestTTLDelRetryBuffer(t *testing.T) {
     require.Equal(t, 0, buffer.Len())
     require.Equal(t, uint64(0), statics6.SuccessRows.Load())
     require.Equal(t, uint64(7), statics6.ErrorRows.Load())
+
+    // test should only retry at most once for one item in a DoRetry call.
+    buffer2 := newTTLDelRetryBuffer()
+    buffer2.SetRetryInterval(0)
+    buffer2.maxRetry = math.MaxInt
+    task7, rows7, statics7 := createTask("t7")
+    buffer2.RecordTaskResult(task7, rows7[:8])
+    require.Equal(t, 1, buffer2.Len())
+    currentRetryFn := doRetryFail
+    buffer2.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
+        fn := currentRetryFn
+        currentRetryFn = shouldNotDoRetry
+        return fn(task)
+    })
+    require.Equal(t, uint64(1), statics7.SuccessRows.Load())
+    require.Equal(t, uint64(0), statics7.ErrorRows.Load())
 }

 func TestTTLDeleteTaskDoDelete(t *testing.T) {
@@ -269,7 +285,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
             retryErrBatches: []int{1, 2, 4},
         },
         {
-            // some retries and some not
+            // some retries and some not and some are executed when ctx canceled
             batchCnt: 10,
             noRetryErrBatches: []int{3, 8, 9},
             retryErrBatches: []int{1, 2, 4},
@@ -279,6 +295,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
     }

     for _, c := range cases {
+        require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
         ctx, cancel := context.WithCancel(context.Background())
         if c.cancelCtx && c.cancelCtxBatch == 0 {
             cancel()
@@ -298,16 +315,14 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
         retryErrBatches = c.retryErrBatches
         nonRetryBatches = c.noRetryErrBatches
         retryRows := task.doDelete(ctx, s)
-        realBatchCnt := c.batchCnt
-        if c.cancelCtx {
-            realBatchCnt = c.cancelCtxBatch
-        }
-        require.LessOrEqual(t, realBatchCnt, c.batchCnt)

         // check SQLs
-        require.Equal(t, realBatchCnt, len(sqls))
         expectedSQLs := make([]string, 0, len(sqls))
-        for i := 0; i < realBatchCnt; i++ {
+        for i := 0; i < c.batchCnt; i++ {
+            if c.cancelCtx && i >= c.cancelCtxBatch {
+                break
+            }
+
             batch := task.rows[i*delBatch : (i+1)*delBatch]
             idList := make([]string, 0, delBatch)
             for _, row := range batch {
@@ -324,8 +339,8 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {

         // check retry rows
         var expectedRetryRows [][]types.Datum
-        for i := 0; i < realBatchCnt; i++ {
-            if slices.Contains(c.retryErrBatches, i) {
+        for i := 0; i < c.batchCnt; i++ {
+            if slices.Contains(c.retryErrBatches, i) || (c.cancelCtx && i >= c.cancelCtxBatch) {
                 expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...)
             }
         }
@@ -334,7 +349,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
         // check statistics
         var expectedErrRows uint64
         for i := 0; i < c.batchCnt; i++ {
-            if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) {
+            if slices.Contains(c.noRetryErrBatches, i) && !(c.cancelCtx && i >= c.cancelCtxBatch) {
                 expectedErrRows += uint64(delBatch)
             }
         }
@@ -384,6 +399,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
     t2 := newMockTTLTbl(t, "t2")
     t3 := newMockTTLTbl(t, "t3")
     t4 := newMockTTLTbl(t, "t4")
+    t5 := newMockTTLTbl(t, "t5")
     s := newMockSession(t)
     pool := newMockSessionPool(t)
     pool.se = s
@@ -392,8 +408,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
     sqlMap := make(map[string]int)
     t3Retried := make(chan struct{})
     t4Retried := make(chan struct{})
+    t5Executed := make(chan struct{})
     s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
-        pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
+        pool.lastSession.sessionInfoSchema = newMockInfoSchema(
+            t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo, t5.TableInfo,
+        )
         if strings.Contains(sql, "`t1`") {
             // success
             return nil, nil
@@ -419,20 +438,35 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
             // error and retry still error
             // this is to test the retry buffer should be drained after the delete worker stopped
             i := sqlMap[sql]
-            if i >= 2 {
-                // i >= 2 means t4 has retried once and records in retry buffer
+            if i == 2 {
+                // i == 2 means t4 has retried once and records in retry buffer
                 close(t4Retried)
             }
             sqlMap[sql] = i + 1
             return nil, errors.New("mockErr")
         }

+        if strings.Contains(sql, "`t5`") {
+            // error when the worker is running,
+            // success when flushing retry buffer while the worker stopping.
+            i := sqlMap[sql]
+            sqlMap[sql] = i + 1
+            if ctx.Value("delWorker") != nil {
+                if i == 1 {
+                    close(t5Executed)
+                }
+                return nil, errors.New("mockErr")
+            }
+            return nil, nil
+        }
+
         require.FailNow(t, "")
         return nil, nil
     }

     delCh := make(chan *ttlDeleteTask)
     w := newDeleteWorker(delCh, pool)
+    w.ctx = context.WithValue(w.ctx, "delWorker", struct{}{})
     w.retryBuffer.retryInterval = time.Millisecond
     w.retryBuffer.maxRetry = math.MaxInt
     require.Equal(t, workerStatusCreated, w.Status())
@@ -444,7 +478,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
     }()

     tasks := make([]*ttlDeleteTask, 0)
-    for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} {
+    for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4, t5} {
         task := &ttlDeleteTask{
             tbl: tbl,
             expire: time.UnixMilli(0),
@@ -476,8 +510,17 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
         require.FailNow(t, "")
     }

-    // before stop, t4 should always retry without any error rows
+    select {
+    case <-t5Executed:
+    case <-time.After(time.Second):
+        require.FailNow(t, "")
+    }
+
+    // before stop, t4, t5 should always retry without any error rows
+    require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
     require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load())
+    require.Equal(t, uint64(0), tasks[4].statistics.SuccessRows.Load())
+    require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
     w.Stop()
     require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second))

@@ -490,6 +533,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
     require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load())
     require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load())

+    // t4 should be error because the buffer flush error while the worker stopping.
     require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
     require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())
+
+    // t5 should be success because the buffer flush success while the worker stopping.
+    require.Equal(t, uint64(3), tasks[4].statistics.SuccessRows.Load())
+    require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
 }
