@@ -38,14 +38,14 @@ import (
38
38
)
39
39
40
40
const (
41
- controllerConfigPath = "resource_group/controller"
42
- maxRetry = 10
43
- retryInterval = 50 * time .Millisecond
44
- maxNotificationChanLen = 200
45
- needTokensAmplification = 1.1
46
- trickleReserveDuration = 1250 * time .Millisecond
47
-
48
- watchRetryInterval = 30 * time .Second
41
+ controllerConfigPath = "resource_group/controller"
42
+ maxRetry = 10
43
+ retryInterval = 50 * time .Millisecond
44
+ maxNotificationChanLen = 200
45
+ needTokensAmplification = 1.1
46
+ trickleReserveDuration = 1250 * time .Millisecond
47
+ slowNotifyFilterDuration = 10 * time . Millisecond
48
+ watchRetryInterval = 30 * time .Second
49
49
)
50
50
51
51
type selectType int
@@ -119,7 +119,7 @@ type ResourceGroupsController struct {
119
119
calculators []ResourceCalculator
120
120
121
121
// When a signal is received, it means the number of available token is low.
122
- lowTokenNotifyChan chan struct {}
122
+ lowTokenNotifyChan chan notifyMsg
123
123
// When a token bucket response received from server, it will be sent to the channel.
124
124
tokenResponseChan chan []* rmpb.TokenBucketResponse
125
125
// When the token bucket of a resource group is updated, it will be sent to the channel.
@@ -161,7 +161,7 @@ func NewResourceGroupController(
161
161
clientUniqueID : clientUniqueID ,
162
162
provider : provider ,
163
163
ruConfig : ruConfig ,
164
- lowTokenNotifyChan : make (chan struct {} , 1 ),
164
+ lowTokenNotifyChan : make (chan notifyMsg , 1 ),
165
165
tokenResponseChan : make (chan []* rmpb.TokenBucketResponse , 1 ),
166
166
tokenBucketUpdateChan : make (chan * groupCostController , maxNotificationChanLen ),
167
167
opts : opts ,
@@ -267,7 +267,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
267
267
c .executeOnAllGroups ((* groupCostController ).updateRunState )
268
268
c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
269
269
if len (c .run .currentRequests ) == 0 {
270
- c .collectTokenBucketRequests (c .loopCtx , FromPeriodReport , periodicReport /* select resource groups which should be reported periodically */ )
270
+ c .collectTokenBucketRequests (c .loopCtx , FromPeriodReport , periodicReport /* select resource groups which should be reported periodically */ , notifyMsg {} )
271
271
}
272
272
case <- watchRetryTimer .C :
273
273
if ! c .ruConfig .isSingleGroupByKeyspace && watchMetaChannel == nil {
@@ -305,11 +305,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
305
305
c .handleTokenBucketResponse (resp )
306
306
}
307
307
c .run .currentRequests = nil
308
- case <- c .lowTokenNotifyChan :
308
+ case notifyMsg := <- c .lowTokenNotifyChan :
309
309
c .executeOnAllGroups ((* groupCostController ).updateRunState )
310
310
c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
311
311
if len (c .run .currentRequests ) == 0 {
312
- c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ )
312
+ c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ , notifyMsg )
313
313
}
314
314
if c .run .inDegradedMode {
315
315
c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
@@ -489,7 +489,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
489
489
}
490
490
}
491
491
492
- func (c * ResourceGroupsController ) collectTokenBucketRequests (ctx context.Context , source string , typ selectType ) {
492
+ func (c * ResourceGroupsController ) collectTokenBucketRequests (ctx context.Context , source string , typ selectType , notifyMsg notifyMsg ) {
493
493
c .run .currentRequests = make ([]* rmpb.TokenBucketRequest , 0 )
494
494
c .groupsController .Range (func (name , value any ) bool {
495
495
gc := value .(* groupCostController )
@@ -501,11 +501,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
501
501
return true
502
502
})
503
503
if len (c .run .currentRequests ) > 0 {
504
- c .sendTokenBucketRequests (ctx , c .run .currentRequests , source )
504
+ c .sendTokenBucketRequests (ctx , c .run .currentRequests , source , notifyMsg )
505
505
}
506
506
}
507
507
508
- func (c * ResourceGroupsController ) sendTokenBucketRequests (ctx context.Context , requests []* rmpb.TokenBucketRequest , source string ) {
508
+ func (c * ResourceGroupsController ) sendTokenBucketRequests (ctx context.Context , requests []* rmpb.TokenBucketRequest , source string , notifyMsg notifyMsg ) {
509
509
now := time .Now ()
510
510
req := & rmpb.TokenBucketsRequest {
511
511
Requests : requests ,
@@ -523,13 +523,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
523
523
if err != nil {
524
524
// Don't log any errors caused by the stopper canceling the context.
525
525
if ! errors .ErrorEqual (err , context .Canceled ) {
526
- log .L (). Sugar (). Infof ( "[resource group controller] token bucket rpc error: %v " , err )
526
+ log .Error ( "[resource group controller] token bucket rpc error" , zap . Error ( err ) )
527
527
}
528
528
resp = nil
529
529
failedTokenRequestDuration .Observe (latency .Seconds ())
530
530
} else {
531
531
successfulTokenRequestDuration .Observe (latency .Seconds ())
532
532
}
533
+ if ! notifyMsg .startTime .IsZero () && time .Since (notifyMsg .startTime ) > slowNotifyFilterDuration {
534
+ log .Warn ("[resource group controller] slow token bucket request" , zap .String ("source" , source ), zap .Duration ("cost" , time .Since (notifyMsg .startTime )))
535
+ }
533
536
logControllerTrace ("[resource group controller] token bucket response" , zap .Time ("now" , time .Now ()), zap .Any ("resp" , resp ), zap .String ("source" , source ), zap .Duration ("latency" , latency ))
534
537
c .tokenResponseChan <- resp
535
538
}()
@@ -625,7 +628,7 @@ type groupCostController struct {
625
628
// fast path to make once token limit with un-limit burst.
626
629
burstable * atomic.Bool
627
630
628
- lowRUNotifyChan chan <- struct {}
631
+ lowRUNotifyChan chan <- notifyMsg
629
632
tokenBucketUpdateChan chan <- * groupCostController
630
633
631
634
// run contains the state that is updated by the main loop.
@@ -715,7 +718,7 @@ type tokenCounter struct {
715
718
func newGroupCostController (
716
719
group * rmpb.ResourceGroup ,
717
720
mainCfg * RUConfig ,
718
- lowRUNotifyChan chan struct {} ,
721
+ lowRUNotifyChan chan notifyMsg ,
719
722
tokenBucketUpdateChan chan * groupCostController ,
720
723
) (* groupCostController , error ) {
721
724
switch group .Mode {
@@ -834,7 +837,7 @@ func (gc *groupCostController) updateRunState() {
834
837
}
835
838
* gc .run .consumption = * gc .mu .consumption
836
839
gc .mu .Unlock ()
837
- logControllerTrace ("[resource group controller] update run state" , zap .Any ("request-unit-consumption" , gc .run .consumption ))
840
+ logControllerTrace ("[resource group controller] update run state" , zap .String ( "name" , gc . name ), zap . Any ("request-unit-consumption" , gc .run .consumption ))
838
841
gc .run .now = newTime
839
842
}
840
843
@@ -1034,7 +1037,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
1034
1037
cfg .NewRate = 99999999
1035
1038
})
1036
1039
counter .limiter .Reconfigure (gc .run .now , cfg , resetLowProcess ())
1037
- log .Info ("[resource group controller] resource token bucket enter degraded mode" , zap .String ("resource-group " , gc .name ), zap .String ("type" , rmpb .RequestUnitType_name [int32 (typ )]))
1040
+ log .Info ("[resource group controller] resource token bucket enter degraded mode" , zap .String ("name " , gc .name ), zap .String ("type" , rmpb .RequestUnitType_name [int32 (typ )]))
1038
1041
}
1039
1042
}
1040
1043
@@ -1088,6 +1091,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
1088
1091
timerDuration = (trickleDuration + trickleReserveDuration ) / 2
1089
1092
}
1090
1093
counter .notify .mu .Lock ()
1094
+ if counter .notify .setupNotificationTimer != nil {
1095
+ counter .notify .setupNotificationTimer .Stop ()
1096
+ }
1091
1097
counter .notify .setupNotificationTimer = time .NewTimer (timerDuration )
1092
1098
counter .notify .setupNotificationCh = counter .notify .setupNotificationTimer .C
1093
1099
counter .notify .setupNotificationThreshold = 1
@@ -1260,7 +1266,7 @@ func (gc *groupCostController) onRequestWait(
1260
1266
sub (gc .mu .consumption , delta )
1261
1267
gc .mu .Unlock ()
1262
1268
failpoint .Inject ("triggerUpdate" , func () {
1263
- gc .lowRUNotifyChan <- struct {} {}
1269
+ gc .lowRUNotifyChan <- notifyMsg {}
1264
1270
})
1265
1271
return nil , nil , waitDuration , 0 , err
1266
1272
}
0 commit comments