Skip to content

Commit 9f99334

Browse files
committed
rate limiter
1 parent e54cc5b commit 9f99334

File tree

2 files changed

+71
-25
lines changed

2 files changed

+71
-25
lines changed

pkg/ttl/ttlworker/del.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,26 @@ const (
4141
delRetryInterval = time.Second * 5
4242
)
4343

44-
var globalDelRateLimiter = newDelRateLimiter()
44+
type delRateLimiter interface {
45+
WaitDelToken(ctx context.Context) error
46+
}
47+
48+
var globalDelRateLimiter delRateLimiter = newDelRateLimiter()
4549

46-
type delRateLimiter struct {
50+
type defaultDelRateLimiter struct {
4751
sync.Mutex
4852
limiter *rate.Limiter
4953
limit atomic.Int64
5054
}
5155

52-
func newDelRateLimiter() *delRateLimiter {
53-
limiter := &delRateLimiter{}
56+
func newDelRateLimiter() *defaultDelRateLimiter {
57+
limiter := &defaultDelRateLimiter{}
5458
limiter.limiter = rate.NewLimiter(0, 1)
5559
limiter.limit.Store(0)
5660
return limiter
5761
}
5862

59-
func (l *delRateLimiter) Wait(ctx context.Context) error {
63+
func (l *defaultDelRateLimiter) WaitDelToken(ctx context.Context) error {
6064
limit := l.limit.Load()
6165
if variable.TTLDeleteRateLimit.Load() != limit {
6266
limit = l.reset()
@@ -69,7 +73,7 @@ func (l *delRateLimiter) Wait(ctx context.Context) error {
6973
return l.limiter.Wait(ctx)
7074
}
7175

72-
func (l *delRateLimiter) reset() (newLimit int64) {
76+
func (l *defaultDelRateLimiter) reset() (newLimit int64) {
7377
l.Lock()
7478
defer l.Unlock()
7579
newLimit = variable.TTLDeleteRateLimit.Load()
@@ -123,9 +127,15 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
123127
}
124128

125129
tracer.EnterPhase(metrics.PhaseWaitToken)
126-
if err = globalDelRateLimiter.Wait(ctx); err != nil {
127-
t.statistics.IncErrorRows(len(delBatch))
128-
return
130+
if err = globalDelRateLimiter.WaitDelToken(ctx); err != nil {
131+
tracer.EnterPhase(metrics.PhaseOther)
132+
logutil.BgLogger().Info(
133+
"wait TTL delete rate limiter interrupted",
134+
zap.Error(err),
135+
zap.Int("waitDelRowCnt", len(delBatch)),
136+
)
137+
retryRows = append(retryRows, delBatch...)
138+
continue
129139
}
130140
tracer.EnterPhase(metrics.PhaseOther)
131141

pkg/ttl/ttlworker/del_test.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ func TestTTLDelRetryBuffer(t *testing.T) {
197197
require.Equal(t, uint64(0), statics7.ErrorRows.Load())
198198
}
199199

200+
type mockDelRateLimiter struct {
201+
waitFn func(context.Context) error
202+
}
203+
204+
func (m *mockDelRateLimiter) WaitDelToken(ctx context.Context) error {
205+
return m.waitFn(ctx)
206+
}
207+
200208
func TestTTLDeleteTaskDoDelete(t *testing.T) {
201209
origBatchSize := variable.TTLDeleteBatchSize.Load()
202210
delBatch := 3
@@ -258,11 +266,12 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
258266
}
259267

260268
cases := []struct {
261-
batchCnt int
262-
retryErrBatches []int
263-
noRetryErrBatches []int
264-
cancelCtx bool
265-
cancelCtxBatch int
269+
batchCnt int
270+
retryErrBatches []int
271+
noRetryErrBatches []int
272+
cancelCtx bool
273+
cancelCtxBatch int
274+
cancelCtxErrInLimiter bool
266275
}{
267276
{
268277
// all success
@@ -292,19 +301,46 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
292301
cancelCtx: true,
293302
cancelCtxBatch: 6,
294303
},
304+
{
305+
// some executed when rate limiter returns error
306+
batchCnt: 10,
307+
cancelCtx: true,
308+
cancelCtxBatch: 3,
309+
cancelCtxErrInLimiter: true,
310+
},
311+
}
312+
313+
errLimiter := &mockDelRateLimiter{
314+
waitFn: func(ctx context.Context) error {
315+
return errors.New("mock rate limiter error")
316+
},
295317
}
296318

319+
origGlobalDelRateLimiter := globalDelRateLimiter
320+
defer func() {
321+
globalDelRateLimiter = origGlobalDelRateLimiter
322+
}()
323+
297324
for _, c := range cases {
325+
globalDelRateLimiter = origGlobalDelRateLimiter
298326
require.True(t, c.cancelCtxBatch >= 0 && c.cancelCtxBatch < c.batchCnt)
299-
ctx, cancel := context.WithCancel(context.Background())
327+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
300328
if c.cancelCtx && c.cancelCtxBatch == 0 {
301-
cancel()
329+
if c.cancelCtxErrInLimiter {
330+
globalDelRateLimiter = errLimiter
331+
} else {
332+
cancel()
333+
}
302334
}
303335

304336
afterExecuteSQL = func() {
305337
if c.cancelCtx {
306338
if len(sqls) == c.cancelCtxBatch {
307-
cancel()
339+
if c.cancelCtxErrInLimiter {
340+
globalDelRateLimiter = errLimiter
341+
} else {
342+
cancel()
343+
}
308344
}
309345
}
310346
}
@@ -373,21 +409,21 @@ func TestTTLDeleteRateLimiter(t *testing.T) {
373409
}()
374410

375411
variable.TTLDeleteRateLimit.Store(100000)
376-
require.NoError(t, globalDelRateLimiter.Wait(ctx))
377-
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.limiter.Limit())
378-
require.Equal(t, int64(100000), globalDelRateLimiter.limit.Load())
412+
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
413+
require.Equal(t, rate.Limit(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
414+
require.Equal(t, int64(100000), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())
379415

380416
variable.TTLDeleteRateLimit.Store(0)
381-
require.NoError(t, globalDelRateLimiter.Wait(ctx))
382-
require.Equal(t, rate.Limit(0), globalDelRateLimiter.limiter.Limit())
383-
require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
417+
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
418+
require.Equal(t, rate.Limit(0), globalDelRateLimiter.(*defaultDelRateLimiter).limiter.Limit())
419+
require.Equal(t, int64(0), globalDelRateLimiter.(*defaultDelRateLimiter).limit.Load())
384420

385421
// 0 stands for no limit
386-
require.NoError(t, globalDelRateLimiter.Wait(ctx))
422+
require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
387423
// cancel ctx returns an error
388424
cancel()
389425
cancel = nil
390-
require.EqualError(t, globalDelRateLimiter.Wait(ctx), "context canceled")
426+
require.EqualError(t, globalDelRateLimiter.WaitDelToken(ctx), "context canceled")
391427
}
392428

393429
func TestTTLDeleteTaskWorker(t *testing.T) {

0 commit comments

Comments
 (0)