Commit 4d157f6

ttl: fix the infinite waiting for delRateLimiter when tidb_ttl_delete_rate_limit changes (pingcap#58485) (pingcap#59388)
close pingcap#58484
1 parent ea38868 · commit 4d157f6

File tree

2 files changed (+108 lines, -13 lines):

pkg/ttl/ttlworker/del.go
pkg/ttl/ttlworker/del_test.go
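Why the old code could wait forever: `golang.org/x/time/rate` treats a limit of 0 as a limiter that never replenishes tokens, so `Wait` can only return when its context ends, while `rate.Inf` disables limiting entirely. The old code mapped `tidb_ttl_delete_rate_limit = 0` (meaning "no limit") onto a zero-rate limiter, which is the opposite of the intent. The standalone sketch below is not part of the commit and only assumes the public `rate` API; it shows the difference:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// A limit of 0 never replenishes tokens.
	zero := rate.NewLimiter(0, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	// Depending on the x/time version, the calls below either consume the one
	// burst token and then fail fast against the deadline, or block until the
	// context ends. Either way, no steady stream of tokens is ever produced,
	// which is the "infinite waiting" from the issue when no deadline is set.
	fmt.Println(zero.Wait(ctx))
	fmt.Println(zero.Wait(ctx))

	// rate.Inf means "unlimited"; Wait returns immediately.
	inf := rate.NewLimiter(rate.Inf, 1)
	fmt.Println(inf.Wait(context.Background())) // <nil>
}
```

The fix therefore maps the sysvar value 0 to `rate.Inf`, both in `newDelRateLimiter` and in `reset`.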

pkg/ttl/ttlworker/del.go

Lines changed: 30 additions & 6 deletions
```diff
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/pkg/ttl/session"
 	"github.com/pingcap/tidb/pkg/ttl/sqlbuilder"
 	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
@@ -43,27 +44,44 @@ var globalDelRateLimiter = newDelRateLimiter()
 
 type delRateLimiter struct {
 	sync.Mutex
+	// limiter limits the rate of delete operation.
+	// limit.Limit() has a range [1.0, +rate.Inf].
+	// When the value of system variable `tidb_ttl_delete_rate_limit` is `0`, `limit.Limit()` returns `rate.Inf`.
 	limiter *rate.Limiter
-	limit   atomic.Int64
+	// limit is the rate limit of the limiter that is the same value of system variable `tidb_ttl_delete_rate_limit`.
+	// When it is 0, it means unlimited and `limiter.Limit()` will return `rate.Inf`.
+	limit atomic.Int64
 }
 
 func newDelRateLimiter() *delRateLimiter {
 	limiter := &delRateLimiter{}
-	limiter.limiter = rate.NewLimiter(0, 1)
+	limiter.limiter = rate.NewLimiter(rate.Inf, 1)
 	limiter.limit.Store(0)
 	return limiter
 }
 
-func (l *delRateLimiter) Wait(ctx context.Context) error {
+type beforeWaitLimiterForTestType struct{}
+
+var beforeWaitLimiterForTest = &beforeWaitLimiterForTestType{}
+
+func (l *delRateLimiter) WaitDelToken(ctx context.Context) error {
 	limit := l.limit.Load()
 	if variable.TTLDeleteRateLimit.Load() != limit {
 		limit = l.reset()
 	}
 
-	if limit == 0 {
+	intest.Assert(limit >= 0)
+	if limit <= 0 {
 		return ctx.Err()
 	}
 
+	if intest.InTest {
+		intest.Assert(l.limiter.Limit() > 0)
+		if fn, ok := ctx.Value(beforeWaitLimiterForTest).(func()); ok {
+			fn()
+		}
+	}
+
 	return l.limiter.Wait(ctx)
 }
 
@@ -73,7 +91,13 @@ func (l *delRateLimiter) reset() (newLimit int64) {
 	newLimit = variable.TTLDeleteRateLimit.Load()
 	if newLimit != l.limit.Load() {
 		l.limit.Store(newLimit)
-		l.limiter.SetLimit(rate.Limit(newLimit))
+		rateLimit := rate.Inf
+		if newLimit > 0 {
+			// When `TTLDeleteRateLimit > 0`, use the setting as the rate limit.
+			// Otherwise, use `rate.Inf` to make it unlimited.
+			rateLimit = rate.Limit(newLimit)
+		}
+		l.limiter.SetLimit(rateLimit)
 	}
 	return
 }
@@ -121,7 +145,7 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
 	}
 
 	tracer.EnterPhase(metrics.PhaseWaitToken)
-	if err = globalDelRateLimiter.Wait(ctx); err != nil {
+	if err = globalDelRateLimiter.WaitDelToken(ctx); err != nil {
 		t.statistics.IncErrorRows(len(delBatch))
 		return
 	}
```
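The `WaitDelToken` change above also adds a test-only hook: when built for tests (`intest.InTest`), it looks up a callback stored in the context under the `beforeWaitLimiterForTest` key and runs it right before blocking on the limiter, which lets a test pause a goroutine at exactly that point. A minimal, generic sketch of the pattern (the key and function names here are illustrative, not taken from the commit) might look like:

```go
package main

import (
	"context"
	"fmt"
)

// hookKeyType is an unexported key type so the context value cannot collide
// with keys set by other packages.
type hookKeyType struct{}

var hookKey = &hookKeyType{}

// waitForToken runs the hook (if a test installed one) right before the
// blocking call, so a test can park the goroutine at this exact point.
func waitForToken(ctx context.Context) error {
	if fn, ok := ctx.Value(hookKey).(func()); ok {
		fn()
	}
	// ... the real code would call limiter.Wait(ctx) here ...
	return ctx.Err()
}

func main() {
	ctx := context.WithValue(context.Background(), hookKey, func() {
		fmt.Println("paused right before waiting on the limiter")
	})
	fmt.Println(waitForToken(ctx))
}
```

Using a pointer to an unexported struct type as the context key avoids collisions with values from other packages; in the commit the lookup is additionally guarded by `intest.InTest`, so it should not affect production code paths.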

pkg/ttl/ttlworker/del_test.go

Lines changed: 78 additions & 7 deletions
```diff
@@ -22,6 +22,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -345,11 +346,22 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {
 }
 
 func TestTTLDeleteRateLimiter(t *testing.T) {
-	origDeleteLimit := variable.TTLDeleteRateLimit.Load()
+	origGlobalDelRateLimiter := globalDelRateLimiter
 	defer func() {
-		variable.TTLDeleteRateLimit.Store(origDeleteLimit)
+		globalDelRateLimiter = origGlobalDelRateLimiter
+		variable.TTLDeleteRateLimit.Store(variable.DefTiDBTTLDeleteRateLimit)
 	}()
 
+	// The global inner limiter should have a default config
+	require.Equal(t, 0, variable.DefTiDBTTLDeleteRateLimit)
+	require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
+	require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
+	require.Equal(t, rate.Inf, globalDelRateLimiter.limiter.Limit())
+	// The newDelRateLimiter() should return a default config
+	globalDelRateLimiter = newDelRateLimiter()
+	require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
+	require.Equal(t, rate.Inf, globalDelRateLimiter.limiter.Limit())
+
 	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
 	defer func() {
 		if cancel != nil {
@@ -358,21 +370,21 @@ func TestTTLDeleteRateLimiter(t *testing.T) {
 	}()
 
 	variable.TTLDeleteRateLimit.Store(100000)
-	require.NoError(t, globalDelRateLimiter.Wait(ctx))
+	require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
 	require.Equal(t, rate.Limit(100000), globalDelRateLimiter.limiter.Limit())
 	require.Equal(t, int64(100000), globalDelRateLimiter.limit.Load())
 
 	variable.TTLDeleteRateLimit.Store(0)
-	require.NoError(t, globalDelRateLimiter.Wait(ctx))
-	require.Equal(t, rate.Limit(0), globalDelRateLimiter.limiter.Limit())
+	require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
+	require.Equal(t, rate.Inf, globalDelRateLimiter.limiter.Limit())
 	require.Equal(t, int64(0), globalDelRateLimiter.limit.Load())
 
 	// 0 stands for no limit
-	require.NoError(t, globalDelRateLimiter.Wait(ctx))
+	require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
 	// cancel ctx returns an error
 	cancel()
 	cancel = nil
-	require.EqualError(t, globalDelRateLimiter.Wait(ctx), "context canceled")
+	require.EqualError(t, globalDelRateLimiter.WaitDelToken(ctx), "context canceled")
 }
 
 func TestTTLDeleteTaskWorker(t *testing.T) {
@@ -492,3 +504,62 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
 	require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load())
 	require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load())
 }
+
+// TestDelRateLimiterConcurrency is used to test some concurrency cases of delRateLimiter.
+// See issue: https://github.com/pingcap/tidb/issues/58484
+// It tests the below case:
+//  1. The `tidb_ttl_delete_rate_limit` set to some non-zero value such as 128.
+//  2. Some delWorker delete rows concurrency and try to wait for the inner `rate.Limiter`.
+//  3. Before internal `l.limiter.Wait` is called, the `tidb_ttl_delete_rate_limit` is set to 0.
+//     It resets the internal `rate.Limiter` (in the bug codes, its rate is set to 0).
+//  4. The delWorkers in step 2 continue to call l.limiter.Wait.
+//     In the bug codes, some of them are blocked forever because the rate is set to 0.
+func TestDelRateLimiterConcurrency(t *testing.T) {
+	origGlobalDelRateLimiter := globalDelRateLimiter
+	defer func() {
+		globalDelRateLimiter = origGlobalDelRateLimiter
+		variable.TTLDeleteRateLimit.Store(variable.DefTiDBTTLDeleteRateLimit)
+	}()
+
+	globalDelRateLimiter = newDelRateLimiter()
+	require.NoError(t, globalDelRateLimiter.WaitDelToken(context.Background()))
+
+	variable.TTLDeleteRateLimit.Store(128)
+	var waiting atomic.Int64
+	continue1 := make(chan struct{})
+	continue2 := make(chan struct{})
+	continue3 := make(chan struct{})
+	cnt := 4
+	for i := 0; i < cnt; i++ {
+		go func() {
+			ctx := context.WithValue(context.Background(), beforeWaitLimiterForTest, func() {
+				if waiting.Add(1) == int64(cnt) {
+					close(continue1)
+				}
+				<-continue2
+			})
+			require.NoError(t, globalDelRateLimiter.WaitDelToken(ctx))
+			if waiting.Add(-1) == 0 {
+				close(continue3)
+			}
+		}()
+	}
+
+	timeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	select {
+	case <-continue1:
+		variable.TTLDeleteRateLimit.Store(0)
+		require.NoError(t, globalDelRateLimiter.WaitDelToken(timeCtx))
+		close(continue2)
+	case <-timeCtx.Done():
+		require.FailNow(t, "timeout")
+	}
+
+	select {
+	case <-continue3:
+	case <-timeCtx.Done():
+		require.FailNow(t, "timeout")
+	}
+}
```
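The new `TestDelRateLimiterConcurrency` relies on a small rendezvous idiom: an atomic counter plus three channels park every worker between loading the limit and calling the inner `limiter.Wait`, flip `tidb_ttl_delete_rate_limit` to 0 while they are parked, and then check that all of them finish. Stripped of the TTL specifics, a sketch of that idiom (not code from the commit) is:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	const workers = 4
	var waiting atomic.Int64
	allParked := make(chan struct{}) // closed once every worker reaches the hook
	release := make(chan struct{})   // closed by the coordinator to let them continue
	allDone := make(chan struct{})   // closed once every worker has finished

	for i := 0; i < workers; i++ {
		go func() {
			// Rendezvous point: the last goroutine to arrive signals the coordinator.
			if waiting.Add(1) == workers {
				close(allParked)
			}
			<-release // park here until the coordinator has flipped the setting

			// ... the real test calls WaitDelToken here ...

			if waiting.Add(-1) == 0 {
				close(allDone)
			}
		}()
	}

	<-allParked
	// The real test sets tidb_ttl_delete_rate_limit to 0 at this point.
	close(release)
	<-allDone
	fmt.Println("all workers finished without hanging")
}
```

Before the fix, step 4 of the scenario in the test comment could leave some of these goroutines blocked on a zero-rate limiter indefinitely; with the fix they all return promptly.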
