Skip to content

Commit 53459e3

Browse files
lcwangchaoti-chi-bot
authored andcommitted
This is an automated cherry-pick of #58206
Signed-off-by: ti-chi-bot <[email protected]>
1 parent ea9c4ab commit 53459e3

File tree

2 files changed

+74
-23
lines changed

2 files changed

+74
-23
lines changed

pkg/ttl/ttlworker/del.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,26 @@ const (
4040
delRetryInterval = time.Second * 5
4141
)
4242

43+
type delRateLimiter interface {
44+
WaitDelToken(ctx context.Context) error
45+
}
46+
4347
var globalDelRateLimiter = newDelRateLimiter()
4448

45-
type delRateLimiter struct {
49+
type defaultDelRateLimiter struct {
4650
sync.Mutex
4751
limiter *rate.Limiter
4852
limit atomic.Int64
4953
}
5054

51-
func newDelRateLimiter() *delRateLimiter {
52-
limiter := &delRateLimiter{}
55+
func newDelRateLimiter() delRateLimiter {
56+
limiter := &defaultDelRateLimiter{}
5357
limiter.limiter = rate.NewLimiter(0, 1)
5458
limiter.limit.Store(0)
5559
return limiter
5660
}
5761

58-
func (l *delRateLimiter) Wait(ctx context.Context) error {
62+
func (l *defaultDelRateLimiter) WaitDelToken(ctx context.Context) error {
5963
limit := l.limit.Load()
6064
if variable.TTLDeleteRateLimit.Load() != limit {
6165
limit = l.reset()
@@ -68,7 +72,7 @@ func (l *delRateLimiter) Wait(ctx context.Context) error {
6872
return l.limiter.Wait(ctx)
6973
}
7074

71-
func (l *delRateLimiter) reset() (newLimit int64) {
75+
func (l *defaultDelRateLimiter) reset() (newLimit int64) {
7276
l.Lock()
7377
defer l.Unlock()
7478
newLimit = variable.TTLDeleteRateLimit.Load()
@@ -122,9 +126,15 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
122126
}
123127

124128
tracer.EnterPhase(metrics.PhaseWaitToken)
125-
if err = globalDelRateLimiter.Wait(ctx); err != nil {
126-
t.statistics.IncErrorRows(len(delBatch))
127-
return
129+
if err = globalDelRateLimiter.WaitDelToken(ctx); err != nil {
130+
tracer.EnterPhase(metrics.PhaseOther)
131+
logutil.BgLogger().Info(
132+
"wait TTL delete rate limiter interrupted",
133+
zap.Error(err),
134+
zap.Int("waitDelRowCnt", len(delBatch)),
135+
)
136+
retryRows = append(retryRows, delBatch...)
137+
continue
128138
}
129139
tracer.EnterPhase(metrics.PhaseOther)
130140

pkg/ttl/ttlworker/del_test.go

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,14 @@ func TestTTLDelRetryBuffer(t *testing.T) {
181181
require.Equal(t, uint64(7), statics6.ErrorRows.Load())
182182
}
183183

184+
type mockDelRateLimiter struct {
185+
waitFn func(context.Context) error
186+
}
187+
188+
func (m *mockDelRateLimiter) WaitDelToken(ctx context.Context) error {
189+
return m.waitFn(ctx)
190+
}
191+
184192
func TestTTLDeleteTaskDoDelete(t *testing.T) {
185193
origBatchSize := variable.TTLDeleteBatchSize.Load()
186194
delBatch := 3
@@ -242,11 +250,12 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
242250
}
243251

244252
cases := []struct {
245-
batchCnt int
246-
retryErrBatches []int
247-
noRetryErrBatches []int
248-
cancelCtx bool
249-
cancelCtxBatch int
253+
batchCnt int
254+
retryErrBatches []int
255+
noRetryErrBatches []int
256+
cancelCtx bool
257+
cancelCtxBatch int
258+
cancelCtxErrInLimiter bool
250259
}{
251260
{
252261
// all success
@@ -276,18 +285,50 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
276285
cancelCtx: true,
277286
cancelCtxBatch: 6,
278287
},
288+
{
289+
// some executed when rate limiter returns error
290+
batchCnt: 10,
291+
cancelCtx: true,
292+
cancelCtxBatch: 3,
293+
cancelCtxErrInLimiter: true,
294+
},
295+
}
296+
297+
errLimiter := &mockDelRateLimiter{
298+
waitFn: func(ctx context.Context) error {
299+
return errors.New("mock rate limiter error")
300+
},
279301
}
280302

303+
origGlobalDelRateLimiter := globalDelRateLimiter
304+
defer func() {
305+
globalDelRateLimiter = origGlobalDelRateLimiter
306+
}()
307+
281308
for _, c := range cases {
309+
<<<<<<< HEAD
282310
ctx, cancel := context.WithCancel(context.Background())
311+
=======
312+
globalDelRateLimiter = origGlobalDelRateLimiter
313+
require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
314+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
315+
>>>>>>> cea46f17ce0 (ttl: retry the rows when del rate limiter returns error in delWorker (#58206))
283316
if c.cancelCtx && c.cancelCtxBatch == 0 {
284-
cancel()
317+
if c.cancelCtxErrInLimiter {
318+
globalDelRateLimiter = errLimiter
319+
} else {
320+
cancel()
321+
}
285322
}
286323

287324
afterExecuteSQL = func() {
288325
if c.cancelCtx {
289326
if len(sqls) == c.cancelCtxBatch {
290-
cancel()
327+
if c.cancelCtxErrInLimiter {
328+
globalDelRateLimiter = errLimiter
329+
} else {
330+
cancel()
331+
}
291332
}
292333
}
293334
}
@@ -358,21 +399,21 @@ func TestTTLDeleteRateLimiter(t *testing.T) {
358399
}()
359400

360401
variable.TTLDeleteRateLimit.Store(100000)
361-
require.NoError(t, globalDelRateLimiter.Wait(ctx))
362-
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.limiter.Limit())
363-
require.Equal(t, int64(100000), globalDelRateLimiter.limit.Load())
402+
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
403+
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
404+
require.Equal(t, int64(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())
364405

365406
variable.TTLDeleteRateLimit.Store(0)
366-
require.NoError(t, globalDelRateLimiter.Wait(ctx))
367-
require.Equal(t, rate.Limit(0), globalDelRateLimiter.limiter.Limit())
368-
require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
407+
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
408+
require.Equal(t, rate.Limit(0), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
409+
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())
369410

370411
// 0 stands for no limit
371-
require.NoError(t, globalDelRateLimiter.Wait(ctx))
412+
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
372413
// cancel ctx returns an error
373414
cancel()
374415
cancel = nil
375-
require.EqualError(t, globalDelRateLimiter.Wait(ctx), "context canceled")
416+
require.EqualError(t, globalDelRateLimiter.WaitDelToken(ctx), "context canceled")
376417
}
377418

378419
func TestTTLDeleteTaskWorker(t *testing.T) {

0 commit comments

Comments
 (0)