@@ -38,10 +38,11 @@ import (
38
38
)
39
39
40
40
const (
41
- controllerConfigPath = "resource_group/controller"
42
- maxNotificationChanLen = 200
43
- needTokensAmplification = 1.1
44
- trickleReserveDuration = 1250 * time .Millisecond
41
+ controllerConfigPath = "resource_group/controller"
42
+ maxNotificationChanLen = 200
43
+ needTokensAmplification = 1.1
44
+ trickleReserveDuration = 1250 * time .Millisecond
45
+ slowNotifyFilterDuration = 100 * time .Millisecond
45
46
46
47
watchRetryInterval = 30 * time .Second
47
48
)
@@ -139,7 +140,7 @@ type ResourceGroupsController struct {
139
140
calculators []ResourceCalculator
140
141
141
142
// When a signal is received, it means the number of available token is low.
142
- lowTokenNotifyChan chan struct {}
143
+ lowTokenNotifyChan chan NotifyMsg
143
144
// When a token bucket response received from server, it will be sent to the channel.
144
145
tokenResponseChan chan []* rmpb.TokenBucketResponse
145
146
// When the token bucket of a resource group is updated, it will be sent to the channel.
@@ -181,7 +182,7 @@ func NewResourceGroupController(
181
182
clientUniqueID : clientUniqueID ,
182
183
provider : provider ,
183
184
ruConfig : ruConfig ,
184
- lowTokenNotifyChan : make (chan struct {} , 1 ),
185
+ lowTokenNotifyChan : make (chan NotifyMsg , 1 ),
185
186
tokenResponseChan : make (chan []* rmpb.TokenBucketResponse , 1 ),
186
187
tokenBucketUpdateChan : make (chan * groupCostController , maxNotificationChanLen ),
187
188
opts : opts ,
@@ -287,7 +288,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
287
288
c .executeOnAllGroups ((* groupCostController ).updateRunState )
288
289
c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
289
290
if len (c .run .currentRequests ) == 0 {
290
- c .collectTokenBucketRequests (c .loopCtx , FromPeriodReport , periodicReport /* select resource groups which should be reported periodically */ )
291
+ c .collectTokenBucketRequests (c .loopCtx , FromPeriodReport , periodicReport /* select resource groups which should be reported periodically */ , NotifyMsg {} )
291
292
}
292
293
case <- watchRetryTimer .C :
293
294
if ! c .ruConfig .isSingleGroupByKeyspace && watchMetaChannel == nil {
@@ -325,11 +326,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
325
326
c .handleTokenBucketResponse (resp )
326
327
}
327
328
c .run .currentRequests = nil
328
- case <- c .lowTokenNotifyChan :
329
+ case notifyMsg := <- c .lowTokenNotifyChan :
329
330
c .executeOnAllGroups ((* groupCostController ).updateRunState )
330
331
c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
331
332
if len (c .run .currentRequests ) == 0 {
332
- c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ )
333
+ c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ , notifyMsg )
333
334
}
334
335
if c .run .inDegradedMode {
335
336
c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
@@ -508,7 +509,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
508
509
}
509
510
}
510
511
511
- func (c * ResourceGroupsController ) collectTokenBucketRequests (ctx context.Context , source string , typ selectType ) {
512
+ func (c * ResourceGroupsController ) collectTokenBucketRequests (ctx context.Context , source string , typ selectType , notifyMsg NotifyMsg ) {
512
513
c .run .currentRequests = make ([]* rmpb.TokenBucketRequest , 0 )
513
514
c .groupsController .Range (func (_ , value any ) bool {
514
515
gc := value .(* groupCostController )
@@ -520,11 +521,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
520
521
return true
521
522
})
522
523
if len (c .run .currentRequests ) > 0 {
523
- c .sendTokenBucketRequests (ctx , c .run .currentRequests , source )
524
+ c .sendTokenBucketRequests (ctx , c .run .currentRequests , source , notifyMsg )
524
525
}
525
526
}
526
527
527
- func (c * ResourceGroupsController ) sendTokenBucketRequests (ctx context.Context , requests []* rmpb.TokenBucketRequest , source string ) {
528
+ func (c * ResourceGroupsController ) sendTokenBucketRequests (ctx context.Context , requests []* rmpb.TokenBucketRequest , source string , notifyMsg NotifyMsg ) {
528
529
now := time .Now ()
529
530
req := & rmpb.TokenBucketsRequest {
530
531
Requests : requests ,
@@ -542,15 +543,22 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
542
543
if err != nil {
543
544
// Don't log any errors caused by the stopper canceling the context.
544
545
if ! errors .ErrorEqual (err , context .Canceled ) {
545
- log .L (). Sugar (). Infof ( "[resource group controller] token bucket rpc error: %v " , err )
546
+ log .Info ( "[resource group controller] token bucket rpc error" , zap . Error ( err ) )
546
547
}
547
548
resp = nil
548
549
failedTokenRequestDuration .Observe (latency .Seconds ())
549
550
} else {
550
551
successfulTokenRequestDuration .Observe (latency .Seconds ())
551
552
}
553
+ if ! notifyMsg .StartTime .IsZero () && time .Since (notifyMsg .StartTime ) > slowNotifyFilterDuration {
554
+ log .Warn ("[resource group controller] slow token bucket request" , zap .String ("source" , source ), zap .Duration ("cost" , time .Since (notifyMsg .StartTime )))
555
+ }
552
556
logControllerTrace ("[resource group controller] token bucket response" , zap .Time ("now" , time .Now ()), zap .Any ("resp" , resp ), zap .String ("source" , source ), zap .Duration ("latency" , latency ))
553
- c .tokenResponseChan <- resp
557
+ select {
558
+ case c .tokenResponseChan <- resp :
559
+ case <- ctx .Done ():
560
+ log .Error ("[resource group controller] send token bucket response failed" , zap .Error (ctx .Err ()))
561
+ }
554
562
}()
555
563
}
556
564
@@ -644,7 +652,7 @@ type groupCostController struct {
644
652
// fast path to make once token limit with un-limit burst.
645
653
burstable * atomic.Bool
646
654
647
- lowRUNotifyChan chan <- struct {}
655
+ lowRUNotifyChan chan <- NotifyMsg
648
656
tokenBucketUpdateChan chan <- * groupCostController
649
657
650
658
// run contains the state that is updated by the main loop.
@@ -734,7 +742,7 @@ type tokenCounter struct {
734
742
func newGroupCostController (
735
743
group * rmpb.ResourceGroup ,
736
744
mainCfg * RUConfig ,
737
- lowRUNotifyChan chan struct {} ,
745
+ lowRUNotifyChan chan NotifyMsg ,
738
746
tokenBucketUpdateChan chan * groupCostController ,
739
747
) (* groupCostController , error ) {
740
748
switch group .Mode {
@@ -853,7 +861,7 @@ func (gc *groupCostController) updateRunState() {
853
861
}
854
862
* gc .run .consumption = * gc .mu .consumption
855
863
gc .mu .Unlock ()
856
- logControllerTrace ("[resource group controller] update run state" , zap .Any ("request-unit-consumption" , gc .run .consumption ))
864
+ logControllerTrace ("[resource group controller] update run state" , zap .String ( "name" , gc . name ), zap . Any ("request-unit-consumption" , gc .run .consumption ))
857
865
gc .run .now = newTime
858
866
}
859
867
0 commit comments