18 changes: 15 additions & 3 deletions pkg/ttl/cache/table.go
@@ -40,7 +40,9 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, error) {
@@ -448,9 +450,10 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag
return nil, err
}

regionsPerRange := len(regionIDs) / splitCnt
oversizeCnt := len(regionIDs) % splitCnt
ranges := make([]kv.KeyRange, 0, min(len(regionIDs), splitCnt))
regionsCnt := len(regionIDs)
regionsPerRange := regionsCnt / splitCnt
oversizeCnt := regionsCnt % splitCnt
ranges := make([]kv.KeyRange, 0, min(regionsCnt, splitCnt))
for len(regionIDs) > 0 {
startRegion, err := regionCache.LocateRegionByID(tikv.NewBackofferWithVars(ctx, 20000, nil),
regionIDs[0])
@@ -483,6 +486,15 @@ func (t *PhysicalTable) splitRawKeyRanges(ctx context.Context, store tikv.Storag
oversizeCnt--
regionIDs = regionIDs[endRegionIdx+1:]
}
logutil.BgLogger().Info("TTL table raw key ranges split",
zap.Int("regionsCnt", regionsCnt),
zap.Int("shouldSplitCnt", splitCnt),
zap.Int("actualSplitCnt", len(ranges)),
zap.Int64("tableID", t.ID),
zap.String("db", t.Schema.O),
zap.String("table", t.Name.O),
zap.String("partition", t.Partition.O),
)
return ranges, nil
}
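A note on the arithmetic above: `regionsCnt / splitCnt` is the base number of regions per key range, and the remainder `oversizeCnt` is spread over the first ranges, one extra region each, so range sizes differ by at most one. A minimal standalone sketch of that distribution, assuming Go 1.21+ for the builtin `min`; the `splitSizes` helper is illustrative and not part of the patch, which walks region IDs and builds `kv.KeyRange` values instead:

```go
package main

import "fmt"

// splitSizes distributes regionsCnt regions into at most splitCnt ranges so
// that the range sizes differ by at most one region.
func splitSizes(regionsCnt, splitCnt int) []int {
	if regionsCnt <= 0 || splitCnt <= 0 {
		return nil
	}
	regionsPerRange := regionsCnt / splitCnt
	oversizeCnt := regionsCnt % splitCnt
	sizes := make([]int, 0, min(regionsCnt, splitCnt))
	for remaining := regionsCnt; remaining > 0; {
		size := regionsPerRange
		if oversizeCnt > 0 {
			// The first oversizeCnt ranges absorb one extra region each.
			size++
			oversizeCnt--
		}
		sizes = append(sizes, size)
		remaining -= size
	}
	return sizes
}

func main() {
	fmt.Println(splitSizes(10, 4)) // [3 3 2 2]
	fmt.Println(splitSizes(3, 5))  // [1 1 1]: never more ranges than regions
}
```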

7 changes: 5 additions & 2 deletions pkg/ttl/cache/task.go
@@ -87,9 +87,9 @@ const (
// TaskStatusWaiting means the task hasn't started
TaskStatusWaiting TaskStatus = "waiting"
// TaskStatusRunning means this task is running
TaskStatusRunning = "running"
TaskStatusRunning TaskStatus = "running"
// TaskStatusFinished means this task has finished
TaskStatusFinished = "finished"
TaskStatusFinished TaskStatus = "finished"
)

// TTLTask is a row recorded in mysql.tidb_ttl_task
@@ -116,6 +116,9 @@ type TTLTaskState struct {
ErrorRows uint64 `json:"error_rows"`

ScanTaskErr string `json:"scan_task_err"`

// When PreviousOwner != "", it means this task was resigned by another owner
PreviousOwner string `json:"prev_owner,omitempty"`
}

// RowToTTLTask converts a row into TTL task
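The TaskStatus change above fixes a subtle Go detail: inside a const block, a constant declared with an explicit value but no type (the old `TaskStatusRunning = "running"`) is an untyped string constant, so its default type is plain `string` rather than `TaskStatus`. A minimal sketch of the difference; the names here are illustrative, not the real package:

```go
package main

import "fmt"

type TaskStatus string

const (
	// Explicitly typed: the constant's type is TaskStatus.
	StatusWaiting TaskStatus = "waiting"
	// Explicit value but no type: an untyped string constant whose
	// default type is plain string, not TaskStatus.
	StatusRunning = "running"
)

func main() {
	var a any = StatusWaiting
	var b any = StatusRunning
	fmt.Printf("%T %T\n", a, b) // main.TaskStatus string
}
```

Giving every status constant the explicit `TaskStatus` type keeps type switches and reflection-based code from seeing a bare string.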
2 changes: 1 addition & 1 deletion pkg/ttl/ttlworker/BUILD.bazel
@@ -44,6 +44,7 @@ go_library(
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_etcd_go_etcd_client_v3//:client",
@@ -67,7 +68,6 @@ go_test(
"task_manager_test.go",
"timer_sync_test.go",
"timer_test.go",
"worker_test.go",
],
embed = [":ttlworker"],
flaky = True,
52 changes: 46 additions & 6 deletions pkg/ttl/ttlworker/del.go
@@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
leftRows := t.rows
defer func() {
if len(leftRows) > 0 {
t.statistics.IncErrorRows(len(leftRows))
retryRows = append(retryRows, leftRows...)
}
}()

se := newTableSession(rawSe, t.tbl, t.expire)
for len(leftRows) > 0 {
for len(leftRows) > 0 && ctx.Err() == nil {
maxBatch := variable.TTLDeleteBatchSize.Load()
var delBatch [][]types.Datum
if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
sqlInterval := time.Since(sqlStart)
if err != nil {
metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
needRetry = needRetry && ctx.Err() == nil
logutil.BgLogger().Warn(
"delete SQL in TTL failed",
zap.Error(err),
@@ -191,8 +191,14 @@ func (b *ttlDelRetryBuffer) RecordTaskResult(task *ttlDeleteTask, retryRows [][]
}

func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) time.Duration {
for b.list.Len() > 0 {
l := b.list.Len()
// When `retryInterval==0`, limit the loop count to the buffer length to avoid infinite retries.
// This means each item gets at most one chance to retry per `DoRetry` invocation.
for i := 0; i < l; i++ {
ele := b.list.Front()
if ele == nil {
break
}
item, ok := ele.Value.(*ttlDelRetryItem)
if !ok {
logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele))
@@ -214,6 +220,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
return b.retryInterval
}

// SetRetryInterval sets the retry interval of the buffer.
func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
b.retryInterval = interval
}

// Drain drains a retry buffer.
func (b *ttlDelRetryBuffer) Drain() {
for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +307,37 @@ func (w *ttlDeleteWorker) loop() error {
timer := time.NewTimer(w.retryBuffer.retryInterval)
defer timer.Stop()

// drain retry buffer to make sure the statistics are correct
defer w.retryBuffer.Drain()
defer func() {
// Have a final try to delete all rows in retry buffer while the worker stops
// to avoid leaving any TTL rows undeleted when shrinking the delete worker.
if w.retryBuffer.Len() > 0 {
start := time.Now()
log.Info(
"try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
zap.Int("bufferLen", w.retryBuffer.Len()),
)
retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
w.retryBuffer.SetRetryInterval(0)
w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
return task.doDelete(retryCtx, se)
})
log.Info(
"delete TTL rows in del worker buffer finished",
zap.Duration("duration", time.Since(start)),
)
}

// drain retry buffer to make sure the statistics are correct
if w.retryBuffer.Len() > 0 {
log.Warn(
"some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
zap.Int("bufferLen", w.retryBuffer.Len()),
)
w.retryBuffer.Drain()
}
}()

for w.Status() == workerStatusRunning {
tracer.EnterPhase(metrics.PhaseIdle)
select {
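Two related ideas in del.go are worth spelling out: `DoRetry` now iterates at most the buffer length observed at entry, so a zero retry interval cannot spin forever on items that keep failing, and the worker's deferred shutdown path relies on exactly that by calling `SetRetryInterval(0)` and giving buffered rows one last bounded delete pass under a 10-second timeout before draining. A stripped-down sketch of the pattern, using a simplified stand-in rather than the real ttlDelRetryBuffer:

```go
package main

import (
	"container/list"
	"context"
	"fmt"
	"time"
)

type item struct{ rows []int }

type retryBuffer struct {
	list          *list.List
	retryInterval time.Duration
}

// DoRetry gives each buffered item at most one attempt per call: the loop is
// bounded by the length observed up front, so re-queued items wait for the
// next invocation even when retryInterval is zero.
func (b *retryBuffer) DoRetry(do func(*item) []int) time.Duration {
	l := b.list.Len()
	for i := 0; i < l; i++ {
		ele := b.list.Front()
		if ele == nil {
			break
		}
		it := ele.Value.(*item)
		b.list.Remove(ele)
		if left := do(it); len(left) > 0 {
			// Re-queue the rows that still failed; they will only be
			// tried again on the next DoRetry call.
			b.list.PushBack(&item{rows: left})
		}
	}
	return b.retryInterval
}

func main() {
	b := &retryBuffer{list: list.New(), retryInterval: 0}
	b.list.PushBack(&item{rows: []int{1, 2, 3}})

	// Shutdown-style flush: one bounded pass under a short timeout,
	// mirroring the deferred final delete in the worker loop.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	b.DoRetry(func(it *item) []int {
		if ctx.Err() != nil {
			return it.rows // keep the rows if we ran out of time
		}
		fmt.Println("deleting", it.rows)
		return nil
	})
	fmt.Println("left in buffer:", b.list.Len())
}
```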
80 changes: 64 additions & 16 deletions pkg/ttl/ttlworker/del_test.go
@@ -179,6 +179,22 @@ func TestTTLDelRetryBuffer(t *testing.T) {
require.Equal(t, 0, buffer.Len())
require.Equal(t, uint64(0), statics6.SuccessRows.Load())
require.Equal(t, uint64(7), statics6.ErrorRows.Load())

// test that each item is retried at most once within a single DoRetry call.
buffer2 := newTTLDelRetryBuffer()
buffer2.SetRetryInterval(0)
buffer2.maxRetry = math.MaxInt
task7, rows7, statics7 := createTask("t7")
buffer2.RecordTaskResult(task7, rows7[:8])
require.Equal(t, 1, buffer2.Len())
currentRetryFn := doRetryFail
buffer2.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
fn := currentRetryFn
currentRetryFn = shouldNotDoRetry
return fn(task)
})
require.Equal(t, uint64(1), statics7.SuccessRows.Load())
require.Equal(t, uint64(0), statics7.ErrorRows.Load())
}

func TestTTLDeleteTaskDoDelete(t *testing.T) {
@@ -269,7 +285,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
retryErrBatches: []int{1, 2, 4},
},
{
// some retries and some not
// some batches retry, some do not, and the rest are skipped because the ctx is canceled
batchCnt: 10,
noRetryErrBatches: []int{3, 8, 9},
retryErrBatches: []int{1, 2, 4},
@@ -279,6 +295,7 @@ }
}

for _, c := range cases {
require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
ctx, cancel := context.WithCancel(context.Background())
if c.cancelCtx && c.cancelCtxBatch == 0 {
cancel()
@@ -298,16 +315,14 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
retryErrBatches = c.retryErrBatches
nonRetryBatches = c.noRetryErrBatches
retryRows := task.doDelete(ctx, s)
realBatchCnt := c.batchCnt
if c.cancelCtx {
realBatchCnt = c.cancelCtxBatch
}
require.LessOrEqual(t, realBatchCnt, c.batchCnt)

// check SQLs
require.Equal(t, realBatchCnt, len(sqls))
expectedSQLs := make([]string, 0, len(sqls))
for i := 0; i < realBatchCnt; i++ {
for i := 0; i < c.batchCnt; i++ {
if c.cancelCtx && i >= c.cancelCtxBatch {
break
}

batch := task.rows[i*delBatch : (i+1)*delBatch]
idList := make([]string, 0, delBatch)
for _, row := range batch {
@@ -324,8 +339,8 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {

// check retry rows
var expectedRetryRows [][]types.Datum
for i := 0; i < realBatchCnt; i++ {
if slices.Contains(c.retryErrBatches, i) {
for i := 0; i < c.batchCnt; i++ {
if slices.Contains(c.retryErrBatches, i) || (c.cancelCtx && i >= c.cancelCtxBatch) {
expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...)
}
}
@@ -334,7 +349,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
// check statistics
var expectedErrRows uint64
for i := 0; i < c.batchCnt; i++ {
if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) {
if slices.Contains(c.noRetryErrBatches, i) && !(c.cancelCtx && i >= c.cancelCtxBatch) {
expectedErrRows += uint64(delBatch)
}
}
@@ -384,6 +399,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
t2 := newMockTTLTbl(t, "t2")
t3 := newMockTTLTbl(t, "t3")
t4 := newMockTTLTbl(t, "t4")
t5 := newMockTTLTbl(t, "t5")
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s
@@ -392,8 +408,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
sqlMap := make(map[string]int)
t3Retried := make(chan struct{})
t4Retried := make(chan struct{})
t5Executed := make(chan struct{})
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo)
pool.lastSession.sessionInfoSchema = newMockInfoSchema(
t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo, t5.TableInfo,
)
if strings.Contains(sql, "`t1`") {
// success
return nil, nil
@@ -419,20 +438,35 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
// error and retry still error
// this is to test the retry buffer should be drained after the delete worker stopped
i := sqlMap[sql]
if i >= 2 {
// i >= 2 means t4 has retried once and records in retry buffer
if i == 2 {
// i == 2 means t4 has retried once and its rows are recorded in the retry buffer
close(t4Retried)
}
sqlMap[sql] = i + 1
return nil, errors.New("mockErr")
}

if strings.Contains(sql, "`t5`") {
// returns an error while the worker is running,
// succeeds when the retry buffer is flushed while the worker is stopping.
i := sqlMap[sql]
sqlMap[sql] = i + 1
if ctx.Value("delWorker") != nil {
if i == 1 {
close(t5Executed)
}
return nil, errors.New("mockErr")
}
return nil, nil
}

require.FailNow(t, "")
return nil, nil
}

delCh := make(chan *ttlDeleteTask)
w := newDeleteWorker(delCh, pool)
w.ctx = context.WithValue(w.ctx, "delWorker", struct{}{})
w.retryBuffer.retryInterval = time.Millisecond
w.retryBuffer.maxRetry = math.MaxInt
require.Equal(t, workerStatusCreated, w.Status())
@@ -444,7 +478,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
}()

tasks := make([]*ttlDeleteTask, 0)
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} {
for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4, t5} {
task := &ttlDeleteTask{
tbl: tbl,
expire: time.UnixMilli(0),
@@ -476,8 +510,17 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
require.FailNow(t, "")
}

// before stop, t4 should always retry without any error rows
select {
case <-t5Executed:
case <-time.After(time.Second):
require.FailNow(t, "")
}

// before stopping, t4 and t5 should keep retrying without recording any error rows
require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
w.Stop()
require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second))

@@ -490,6 +533,11 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load())
require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load())

// t4 should end up as error rows because the buffer flush still fails while the worker is stopping.
require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())

// t5 should end up as success rows because the buffer flush succeeds while the worker is stopping.
require.Equal(t, uint64(3), tasks[4].statistics.SuccessRows.Load())
require.Equal(t, uint64(0), tasks[4].statistics.ErrorRows.Load())
}
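One detail of the t5 case: the test tags the worker's context with a value ("delWorker"), so the mock session can tell deletes issued from the running worker (which it fails) apart from the final flush, which runs on a fresh `context.Background()` with a timeout and therefore carries no such value. A small sketch of that sentinel mechanism, using a typed key instead of the test's plain string key:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

// isWorkerCtx reports whether a delete was issued from the worker's own
// context (tagged with a sentinel value) or from a fresh context such as the
// shutdown flush, which starts from context.Background().
func isWorkerCtx(ctx context.Context) bool {
	return ctx.Value(ctxKey{}) != nil
}

func main() {
	workerCtx := context.WithValue(context.Background(), ctxKey{}, struct{}{})
	flushCtx := context.Background()

	fmt.Println(isWorkerCtx(workerCtx)) // true  -> the mock returns an error
	fmt.Println(isWorkerCtx(flushCtx))  // false -> the mock lets the delete succeed
}
```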