@@ -38,14 +38,12 @@ import (
38
38
)
39
39
40
40
// Package-level tuning constants for the resource group controller.
const (
	// controllerConfigPath is the key path of the controller configuration.
	// NOTE(review): presumably the etcd/meta-storage key read by loadServerConfig — confirm.
	controllerConfigPath = "resource_group/controller"
	// maxNotificationChanLen is the buffer size of tokenBucketUpdateChan.
	maxNotificationChanLen = 200
	// needTokensAmplification is a multiplier (> 1) applied elsewhere in the package.
	// NOTE(review): its call site is not visible in this excerpt — confirm intent.
	needTokensAmplification = 1.1
	// trickleReserveDuration is combined with the trickle duration when
	// modifyTokenCounter computes the setup-notification timer duration.
	trickleReserveDuration = 1250 * time.Millisecond
	// slowNotifyFilterDuration is the threshold above which
	// sendTokenBucketRequests logs a "slow token bucket request" warning,
	// measured from the notify message's start time.
	slowNotifyFilterDuration = 10 * time.Millisecond
	// watchRetryInterval is the period used to reset watchRetryTimer in Start.
	watchRetryInterval = 30 * time.Second
)
50
48
51
49
type selectType int
@@ -104,6 +102,20 @@ func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
104
102
}
105
103
}
106
104
105
+ // WithWaitRetryInterval is the option to set the retry interval when waiting for the token.
106
+ func WithWaitRetryInterval (d time.Duration ) ResourceControlCreateOption {
107
+ return func (controller * ResourceGroupsController ) {
108
+ controller .ruConfig .WaitRetryInterval = d
109
+ }
110
+ }
111
+
112
+ // WithWaitRetryTimes is the option to set the times to retry when waiting for the token.
113
+ func WithWaitRetryTimes (times int ) ResourceControlCreateOption {
114
+ return func (controller * ResourceGroupsController ) {
115
+ controller .ruConfig .WaitRetryTimes = times
116
+ }
117
+ }
118
+
107
119
var _ ResourceGroupKVInterceptor = (* ResourceGroupsController )(nil )
108
120
109
121
// ResourceGroupsController implements ResourceGroupKVInterceptor.
@@ -119,7 +131,7 @@ type ResourceGroupsController struct {
119
131
calculators []ResourceCalculator
120
132
121
133
// When a signal is received, it means the number of available token is low.
122
- lowTokenNotifyChan chan struct {}
134
+ lowTokenNotifyChan chan notifyMsg
123
135
// When a token bucket response received from server, it will be sent to the channel.
124
136
tokenResponseChan chan []* rmpb.TokenBucketResponse
125
137
// When the token bucket of a resource group is updated, it will be sent to the channel.
@@ -161,7 +173,7 @@ func NewResourceGroupController(
161
173
clientUniqueID : clientUniqueID ,
162
174
provider : provider ,
163
175
ruConfig : ruConfig ,
164
- lowTokenNotifyChan : make (chan struct {} , 1 ),
176
+ lowTokenNotifyChan : make (chan notifyMsg , 1 ),
165
177
tokenResponseChan : make (chan []* rmpb.TokenBucketResponse , 1 ),
166
178
tokenBucketUpdateChan : make (chan * groupCostController , maxNotificationChanLen ),
167
179
opts : opts ,
@@ -172,6 +184,7 @@ func NewResourceGroupController(
172
184
log .Info ("load resource controller config" , zap .Reflect ("config" , config ), zap .Reflect ("ru-config" , controller .ruConfig ))
173
185
controller .calculators = []ResourceCalculator {newKVCalculator (controller .ruConfig ), newSQLCalculator (controller .ruConfig )}
174
186
controller .safeRuConfig .Store (controller .ruConfig )
187
+ enableControllerTraceLog .Store (config .EnableControllerTraceLog )
175
188
return controller , nil
176
189
}
177
190
@@ -180,12 +193,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
180
193
if err != nil {
181
194
return nil , err
182
195
}
196
+ config := DefaultConfig ()
197
+ defer config .Adjust ()
183
198
kvs := resp .GetKvs ()
184
199
if len (kvs ) == 0 {
185
200
log .Warn ("[resource group controller] server does not save config, load config failed" )
186
- return DefaultConfig () , nil
201
+ return config , nil
187
202
}
188
- config := & Config {}
189
203
err = json .Unmarshal (kvs [0 ].GetValue (), config )
190
204
if err != nil {
191
205
return nil , err
@@ -267,7 +281,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
267
281
c .executeOnAllGroups ((* groupCostController ).updateRunState )
268
282
c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
269
283
if len (c .run .currentRequests ) == 0 {
270
- c .collectTokenBucketRequests (c .loopCtx , FromPeriodReport , periodicReport /* select resource groups which should be reported periodically */ )
284
+ c .collectTokenBucketRequests (c .loopCtx , FromPeriodReport , periodicReport /* select resource groups which should be reported periodically */ , notifyMsg {} )
271
285
}
272
286
case <- watchRetryTimer .C :
273
287
if ! c .ruConfig .isSingleGroupByKeyspace && watchMetaChannel == nil {
@@ -288,7 +302,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
288
302
watchRetryTimer .Reset (watchRetryInterval )
289
303
}
290
304
}
291
-
292
305
case <- emergencyTokenAcquisitionTicker .C :
293
306
c .executeOnAllGroups ((* groupCostController ).resetEmergencyTokenAcquisition )
294
307
/* channels */
@@ -305,11 +318,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
305
318
c .handleTokenBucketResponse (resp )
306
319
}
307
320
c .run .currentRequests = nil
308
- case <- c .lowTokenNotifyChan :
321
+ case notifyMsg := <- c .lowTokenNotifyChan :
309
322
c .executeOnAllGroups ((* groupCostController ).updateRunState )
310
323
c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
311
324
if len (c .run .currentRequests ) == 0 {
312
- c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ )
325
+ c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ , notifyMsg )
313
326
}
314
327
if c .run .inDegradedMode {
315
328
c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
@@ -366,10 +379,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
366
379
}
367
380
for _ , item := range resp {
368
381
cfgRevision = item .Kv .ModRevision
369
- config := & Config {}
382
+ config := DefaultConfig ()
370
383
if err := json .Unmarshal (item .Kv .Value , config ); err != nil {
371
384
continue
372
385
}
386
+ config .Adjust ()
373
387
c .ruConfig = GenerateRUConfig (config )
374
388
375
389
// Stay compatible with serverless
@@ -383,7 +397,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
383
397
}
384
398
log .Info ("load resource controller config after config changed" , zap .Reflect ("config" , config ), zap .Reflect ("ruConfig" , c .ruConfig ))
385
399
}
386
-
387
400
case gc := <- c .tokenBucketUpdateChan :
388
401
now := gc .run .now
389
402
go gc .handleTokenBucketUpdateEvent (c .loopCtx , now )
@@ -489,7 +502,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB
489
502
}
490
503
}
491
504
492
- func (c * ResourceGroupsController ) collectTokenBucketRequests (ctx context.Context , source string , typ selectType ) {
505
+ func (c * ResourceGroupsController ) collectTokenBucketRequests (ctx context.Context , source string , typ selectType , notifyMsg notifyMsg ) {
493
506
c .run .currentRequests = make ([]* rmpb.TokenBucketRequest , 0 )
494
507
c .groupsController .Range (func (name , value any ) bool {
495
508
gc := value .(* groupCostController )
@@ -501,11 +514,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
501
514
return true
502
515
})
503
516
if len (c .run .currentRequests ) > 0 {
504
- c .sendTokenBucketRequests (ctx , c .run .currentRequests , source )
517
+ c .sendTokenBucketRequests (ctx , c .run .currentRequests , source , notifyMsg )
505
518
}
506
519
}
507
520
508
- func (c * ResourceGroupsController ) sendTokenBucketRequests (ctx context.Context , requests []* rmpb.TokenBucketRequest , source string ) {
521
+ func (c * ResourceGroupsController ) sendTokenBucketRequests (ctx context.Context , requests []* rmpb.TokenBucketRequest , source string , notifyMsg notifyMsg ) {
509
522
now := time .Now ()
510
523
req := & rmpb.TokenBucketsRequest {
511
524
Requests : requests ,
@@ -523,13 +536,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
523
536
if err != nil {
524
537
// Don't log any errors caused by the stopper canceling the context.
525
538
if ! errors .ErrorEqual (err , context .Canceled ) {
526
- log .L (). Sugar (). Infof ( "[resource group controller] token bucket rpc error: %v " , err )
539
+ log .Error ( "[resource group controller] token bucket rpc error" , zap . Error ( err ) )
527
540
}
528
541
resp = nil
529
542
failedTokenRequestDuration .Observe (latency .Seconds ())
530
543
} else {
531
544
successfulTokenRequestDuration .Observe (latency .Seconds ())
532
545
}
546
+ if ! notifyMsg .startTime .IsZero () && time .Since (notifyMsg .startTime ) > slowNotifyFilterDuration {
547
+ log .Warn ("[resource group controller] slow token bucket request" , zap .String ("source" , source ), zap .Duration ("cost" , time .Since (notifyMsg .startTime )))
548
+ }
533
549
logControllerTrace ("[resource group controller] token bucket response" , zap .Time ("now" , time .Now ()), zap .Any ("resp" , resp ), zap .String ("source" , source ), zap .Duration ("latency" , latency ))
534
550
c .tokenResponseChan <- resp
535
551
}()
@@ -625,7 +641,7 @@ type groupCostController struct {
625
641
// fast path to make once token limit with un-limit burst.
626
642
burstable * atomic.Bool
627
643
628
- lowRUNotifyChan chan <- struct {}
644
+ lowRUNotifyChan chan <- notifyMsg
629
645
tokenBucketUpdateChan chan <- * groupCostController
630
646
631
647
// run contains the state that is updated by the main loop.
@@ -715,7 +731,7 @@ type tokenCounter struct {
715
731
func newGroupCostController (
716
732
group * rmpb.ResourceGroup ,
717
733
mainCfg * RUConfig ,
718
- lowRUNotifyChan chan struct {} ,
734
+ lowRUNotifyChan chan notifyMsg ,
719
735
tokenBucketUpdateChan chan * groupCostController ,
720
736
) (* groupCostController , error ) {
721
737
switch group .Mode {
@@ -834,7 +850,7 @@ func (gc *groupCostController) updateRunState() {
834
850
}
835
851
* gc .run .consumption = * gc .mu .consumption
836
852
gc .mu .Unlock ()
837
- logControllerTrace ("[resource group controller] update run state" , zap .Any ("request-unit-consumption" , gc .run .consumption ))
853
+ logControllerTrace ("[resource group controller] update run state" , zap .String ( "name" , gc . name ), zap . Any ("request-unit-consumption" , gc .run .consumption ))
838
854
gc .run .now = newTime
839
855
}
840
856
@@ -1034,7 +1050,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() {
1034
1050
cfg .NewRate = 99999999
1035
1051
})
1036
1052
counter .limiter .Reconfigure (gc .run .now , cfg , resetLowProcess ())
1037
- log .Info ("[resource group controller] resource token bucket enter degraded mode" , zap .String ("resource-group " , gc .name ), zap .String ("type" , rmpb .RequestUnitType_name [int32 (typ )]))
1053
+ log .Info ("[resource group controller] resource token bucket enter degraded mode" , zap .String ("name " , gc .name ), zap .String ("type" , rmpb .RequestUnitType_name [int32 (typ )]))
1038
1054
}
1039
1055
}
1040
1056
@@ -1088,6 +1104,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
1088
1104
timerDuration = (trickleDuration + trickleReserveDuration ) / 2
1089
1105
}
1090
1106
counter .notify .mu .Lock ()
1107
+ if counter .notify .setupNotificationTimer != nil {
1108
+ counter .notify .setupNotificationTimer .Stop ()
1109
+ }
1091
1110
counter .notify .setupNotificationTimer = time .NewTimer (timerDuration )
1092
1111
counter .notify .setupNotificationCh = counter .notify .setupNotificationTimer .C
1093
1112
counter .notify .setupNotificationThreshold = 1
@@ -1222,7 +1241,7 @@ func (gc *groupCostController) onRequestWait(
1222
1241
var i int
1223
1242
var d time.Duration
1224
1243
retryLoop:
1225
- for i = 0 ; i < maxRetry ; i ++ {
1244
+ for i = 0 ; i < gc . mainCfg . WaitRetryTimes ; i ++ {
1226
1245
switch gc .mode {
1227
1246
case rmpb .GroupMode_RawMode :
1228
1247
res := make ([]* Reservation , 0 , len (requestResourceLimitTypeList ))
@@ -1246,8 +1265,8 @@ func (gc *groupCostController) onRequestWait(
1246
1265
}
1247
1266
}
1248
1267
gc .metrics .requestRetryCounter .Inc ()
1249
- time .Sleep (retryInterval )
1250
- waitDuration += retryInterval
1268
+ time .Sleep (gc . mainCfg . WaitRetryInterval )
1269
+ waitDuration += gc . mainCfg . WaitRetryInterval
1251
1270
}
1252
1271
if err != nil {
1253
1272
if errs .ErrClientResourceGroupThrottled .Equal (err ) {
@@ -1260,7 +1279,7 @@ func (gc *groupCostController) onRequestWait(
1260
1279
sub (gc .mu .consumption , delta )
1261
1280
gc .mu .Unlock ()
1262
1281
failpoint .Inject ("triggerUpdate" , func () {
1263
- gc .lowRUNotifyChan <- struct {} {}
1282
+ gc .lowRUNotifyChan <- notifyMsg {}
1264
1283
})
1265
1284
return nil , nil , waitDuration , 0 , err
1266
1285
}
0 commit comments