Skip to content

Commit 9482860

Browse files
Shuning ChenShuning Chen
authored andcommitted
client/controller: record context error and add slowlog about token bucket request
Signed-off-by: Shuning Chen <[email protected]>
1 parent 114cb56 commit 9482860

File tree

4 files changed

+59
-28
lines changed

4 files changed

+59
-28
lines changed

client/resource_group/controller/controller.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ import (
3838
)
3939

4040
const (
41-
controllerConfigPath = "resource_group/controller"
42-
maxNotificationChanLen = 200
43-
needTokensAmplification = 1.1
44-
trickleReserveDuration = 1250 * time.Millisecond
41+
controllerConfigPath = "resource_group/controller"
42+
maxNotificationChanLen = 200
43+
needTokensAmplification = 1.1
44+
trickleReserveDuration = 1250 * time.Millisecond
45+
slowNotifyFilterDuration = 5 * time.Millisecond
4546

4647
watchRetryInterval = 30 * time.Second
4748
)
@@ -139,7 +140,7 @@ type ResourceGroupsController struct {
139140
calculators []ResourceCalculator
140141

141142
// When a signal is received, it means the number of available token is low.
142-
lowTokenNotifyChan chan struct{}
143+
lowTokenNotifyChan chan NotifyMsg
143144
// When a token bucket response received from server, it will be sent to the channel.
144145
tokenResponseChan chan []*rmpb.TokenBucketResponse
145146
// When the token bucket of a resource group is updated, it will be sent to the channel.
@@ -181,7 +182,7 @@ func NewResourceGroupController(
181182
clientUniqueID: clientUniqueID,
182183
provider: provider,
183184
ruConfig: ruConfig,
184-
lowTokenNotifyChan: make(chan struct{}, 1),
185+
lowTokenNotifyChan: make(chan NotifyMsg, 1),
185186
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
186187
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
187188
opts: opts,
@@ -287,7 +288,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
287288
c.executeOnAllGroups((*groupCostController).updateRunState)
288289
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
289290
if len(c.run.currentRequests) == 0 {
290-
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
291+
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, NotifyMsg{})
291292
}
292293
case <-watchRetryTimer.C:
293294
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
@@ -325,11 +326,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
325326
c.handleTokenBucketResponse(resp)
326327
}
327328
c.run.currentRequests = nil
328-
case <-c.lowTokenNotifyChan:
329+
case notifyMsg := <-c.lowTokenNotifyChan:
329330
c.executeOnAllGroups((*groupCostController).updateRunState)
330331
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
331332
if len(c.run.currentRequests) == 0 {
332-
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */)
333+
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
333334
}
334335
if c.run.inDegradedMode {
335336
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
@@ -508,7 +509,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
508509
}
509510
}
510511

511-
func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType) {
512+
func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg NotifyMsg) {
512513
c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0)
513514
c.groupsController.Range(func(_, value any) bool {
514515
gc := value.(*groupCostController)
@@ -520,11 +521,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
520521
return true
521522
})
522523
if len(c.run.currentRequests) > 0 {
523-
c.sendTokenBucketRequests(ctx, c.run.currentRequests, source)
524+
c.sendTokenBucketRequests(ctx, c.run.currentRequests, source, notifyMsg)
524525
}
525526
}
526527

527-
func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) {
528+
func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg NotifyMsg) {
528529
now := time.Now()
529530
req := &rmpb.TokenBucketsRequest{
530531
Requests: requests,
@@ -542,13 +543,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
542543
if err != nil {
543544
// Don't log any errors caused by the stopper canceling the context.
544545
if !errors.ErrorEqual(err, context.Canceled) {
545-
log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err)
546+
log.Info("[resource group controller] token bucket rpc error", zap.Error(err))
546547
}
547548
resp = nil
548549
failedTokenRequestDuration.Observe(latency.Seconds())
549550
} else {
550551
successfulTokenRequestDuration.Observe(latency.Seconds())
551552
}
553+
if !notifyMsg.StartTime.IsZero() && time.Since(notifyMsg.StartTime) > slowNotifyFilterDuration {
554+
log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.StartTime)))
555+
}
552556
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
553557
c.tokenResponseChan <- resp
554558
}()
@@ -644,7 +648,7 @@ type groupCostController struct {
644648
// fast path to make once token limit with un-limit burst.
645649
burstable *atomic.Bool
646650

647-
lowRUNotifyChan chan<- struct{}
651+
lowRUNotifyChan chan<- NotifyMsg
648652
tokenBucketUpdateChan chan<- *groupCostController
649653

650654
// run contains the state that is updated by the main loop.
@@ -734,7 +738,7 @@ type tokenCounter struct {
734738
func newGroupCostController(
735739
group *rmpb.ResourceGroup,
736740
mainCfg *RUConfig,
737-
lowRUNotifyChan chan struct{},
741+
lowRUNotifyChan chan NotifyMsg,
738742
tokenBucketUpdateChan chan *groupCostController,
739743
) (*groupCostController, error) {
740744
switch group.Mode {
@@ -853,7 +857,7 @@ func (gc *groupCostController) updateRunState() {
853857
}
854858
*gc.run.consumption = *gc.mu.consumption
855859
gc.mu.Unlock()
856-
logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
860+
logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption))
857861
gc.run.now = newTime
858862
}
859863

@@ -1107,6 +1111,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
11071111
timerDuration = (trickleDuration + trickleReserveDuration) / 2
11081112
}
11091113
counter.notify.mu.Lock()
1114+
if counter.notify.setupNotificationTimer != nil {
1115+
counter.notify.setupNotificationTimer.Stop()
1116+
}
11101117
counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
11111118
counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C
11121119
counter.notify.setupNotificationThreshold = 1
@@ -1279,7 +1286,7 @@ func (gc *groupCostController) onRequestWait(
12791286
sub(gc.mu.consumption, delta)
12801287
gc.mu.Unlock()
12811288
failpoint.Inject("triggerUpdate", func() {
1282-
gc.lowRUNotifyChan <- struct{}{}
1289+
gc.lowRUNotifyChan <- NotifyMsg{}
12831290
})
12841291
return nil, nil, waitDuration, 0, err
12851292
}

client/resource_group/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
4545
JobTypes: []string{"lightning", "br"},
4646
},
4747
}
48-
ch1 := make(chan struct{})
48+
ch1 := make(chan NotifyMsg)
4949
ch2 := make(chan *groupCostController)
5050
gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2)
5151
re.NoError(err)

client/resource_group/controller/limiter.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type Limiter struct {
7575
// last is the last time the limiter's tokens field was updated
7676
last time.Time
7777
notifyThreshold float64
78-
lowTokensNotifyChan chan<- struct{}
78+
lowTokensNotifyChan chan<- NotifyMsg
7979
// To prevent too many chan sent, the notifyThreshold is set to 0 after notify.
8080
// So the notifyThreshold cannot show whether the limiter is in the low token state,
8181
// isLowProcess is used to check it.
@@ -88,6 +88,11 @@ type Limiter struct {
8888
metrics *limiterMetricsCollection
8989
}
9090

91+
// NotifyMsg is a message to notify the low token state.
92+
type NotifyMsg struct {
93+
StartTime time.Time
94+
}
95+
9196
// limiterMetricsCollection is a collection of metrics for a limiter.
9297
type limiterMetricsCollection struct {
9398
lowTokenNotifyCounter prometheus.Counter
@@ -102,7 +107,7 @@ func (lim *Limiter) Limit() Limit {
102107

103108
// NewLimiter returns a new Limiter that allows events up to rate r and permits
104109
// bursts of at most b tokens.
105-
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter {
110+
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- NotifyMsg) *Limiter {
106111
lim := &Limiter{
107112
limit: r,
108113
last: now,
@@ -116,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify
116121

117122
// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
118123
// bursts of at most b tokens.
119-
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
124+
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- NotifyMsg) *Limiter {
120125
lim := &Limiter{
121126
name: name,
122127
limit: Limit(cfg.NewRate),
@@ -144,6 +149,7 @@ type Reservation struct {
144149
// This is the Limit at reservation time, it can change later.
145150
limit Limit
146151
remainingTokens float64
152+
err error
147153
}
148154

149155
// OK returns whether the limiter can provide the requested number of tokens
@@ -218,7 +224,8 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now
218224
select {
219225
case <-ctx.Done():
220226
return &Reservation{
221-
ok: false,
227+
ok: false,
228+
err: ctx.Err(),
222229
}
223230
default:
224231
}
@@ -255,7 +262,7 @@ func (lim *Limiter) notify() {
255262
lim.notifyThreshold = 0
256263
lim.isLowProcess = true
257264
select {
258-
case lim.lowTokensNotifyChan <- struct{}{}:
265+
case lim.lowTokensNotifyChan <- NotifyMsg{StartTime: time.Now()}:
259266
if lim.metrics != nil {
260267
lim.metrics.lowTokenNotifyCounter.Inc()
261268
}
@@ -414,7 +421,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
414421
zap.Float64("notify-threshold", lim.notifyThreshold),
415422
zap.Bool("is-low-process", lim.isLowProcess),
416423
zap.Int64("burst", lim.burst),
417-
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
424+
zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
425+
zap.String("name", lim.name))
418426
}
419427
lim.last = last
420428
if lim.limit == 0 {
@@ -495,6 +503,9 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
495503
for _, res := range reservations {
496504
if !res.ok {
497505
cancel()
506+
if res.err != nil {
507+
return res.needWaitDuration, res.err
508+
}
498509
return res.needWaitDuration, errs.ErrClientResourceGroupThrottled.FastGenByArgs(res.needWaitDuration, res.limit, res.remainingTokens)
499510
}
500511
delay := res.DelayFrom(now)

client/resource_group/controller/limiter_test.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
8383
}
8484

8585
func TestSimpleReserve(t *testing.T) {
86-
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
86+
lim := NewLimiter(t0, 1, 0, 2, make(chan NotifyMsg, 1))
8787

8888
runReserveMax(t, lim, request{t0, 3, t1, true})
8989
runReserveMax(t, lim, request{t0, 3, t4, true})
@@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) {
103103

104104
func TestReconfig(t *testing.T) {
105105
re := require.New(t)
106-
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
106+
lim := NewLimiter(t0, 1, 0, 2, make(chan NotifyMsg, 1))
107107

108108
runReserveMax(t, lim, request{t0, 4, t2, true})
109109
args := tokenBucketReconfigureArgs{
@@ -126,7 +126,7 @@ func TestReconfig(t *testing.T) {
126126
}
127127

128128
func TestNotify(t *testing.T) {
129-
nc := make(chan struct{}, 1)
129+
nc := make(chan NotifyMsg, 1)
130130
lim := NewLimiter(t0, 1, 0, 0, nc)
131131

132132
args := tokenBucketReconfigureArgs{
@@ -147,7 +147,7 @@ func TestCancel(t *testing.T) {
147147
ctx := context.Background()
148148
ctx1, cancel1 := context.WithDeadline(ctx, t2)
149149
re := require.New(t)
150-
nc := make(chan struct{}, 1)
150+
nc := make(chan NotifyMsg, 1)
151151
lim1 := NewLimiter(t0, 1, 0, 10, nc)
152152
lim2 := NewLimiter(t0, 1, 0, 0, nc)
153153

@@ -186,3 +186,16 @@ func TestCancel(t *testing.T) {
186186
checkTokens(re, lim1, t5, 15)
187187
checkTokens(re, lim2, t5, 5)
188188
}
189+
190+
func TestCancelErrorOfReservation(t *testing.T) {
191+
re := require.New(t)
192+
nc := make(chan NotifyMsg, 1)
193+
lim := NewLimiter(t0, 10, 0, 10, nc)
194+
ctx, cancel := context.WithCancel(context.Background())
195+
cancel()
196+
r := lim.Reserve(ctx, InfDuration, t0, 5)
197+
d, err := WaitReservations(context.Background(), t0, []*Reservation{r})
198+
re.Equal(0*time.Second, d)
199+
re.Error(err)
200+
re.Contains(err.Error(), "context canceled")
201+
}

0 commit comments

Comments
 (0)