Skip to content

Commit d8053b1

Browse files
authored
feat(resourcemanager): add trickle-time enforcement to service limiter (#9684)
ref #9296, close #9668 Enhance `serviceLimiter.applyServiceLimit` to return both limited tokens and a minimum trickle time, ensuring client-side rate does not exceed service limit. Signed-off-by: JmPotato <[email protected]>
1 parent 98b85a0 commit d8053b1

File tree

5 files changed

+51
-29
lines changed

5 files changed

+51
-29
lines changed

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
530530
if err != nil {
531531
continue
532532
}
533-
setOrRemoveServiceLimitMetrics(keyspaceName, krgm.getServiceLimiter().GetServiceLimit())
533+
setOrRemoveServiceLimitMetrics(keyspaceName, krgm.getServiceLimiter().getServiceLimit())
534534

535535
for _, group := range krgm.getResourceGroupList(true, true) {
536536
groupName := group.Name

pkg/mcs/resourcemanager/server/resource_group.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,14 +273,17 @@ func (rg *ResourceGroup) RequestRU(
273273
}
274274
// Then, try to apply the service limit.
275275
grantedTokens := tb.GetTokens()
276-
limitedTokens := sl.applyServiceLimit(now, grantedTokens)
276+
limitedTokens, minTrickleTimeMs := sl.applyServiceLimit(now, grantedTokens)
277277
if limitedTokens < grantedTokens {
278278
tb.Tokens = limitedTokens
279279
// Retain the unused tokens for the later requests if it has a burst limit.
280280
if rg.getBurstLimitLocked() > 0 {
281281
rg.RUSettings.RU.reservedServiceTokens += grantedTokens - limitedTokens
282282
}
283283
}
284+
if trickleTimeMs < minTrickleTimeMs {
285+
trickleTimeMs = minTrickleTimeMs
286+
}
284287
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
285288
}
286289

pkg/mcs/resourcemanager/server/service_limit.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,16 @@ func (krl *serviceLimiter) setServiceLimit(newServiceLimit float64) {
9191
}
9292
}
9393

94-
// GetServiceLimit return the service limit value of this keyspace.
95-
func (krl *serviceLimiter) GetServiceLimit() float64 {
94+
func (krl *serviceLimiter) getServiceLimit() float64 {
9695
if krl == nil {
9796
return 0.0
9897
}
9998
krl.RLock()
10099
defer krl.RUnlock()
100+
return krl.getServiceLimitLocked()
101+
}
102+
103+
func (krl *serviceLimiter) getServiceLimitLocked() float64 {
101104
return krl.ServiceLimit
102105
}
103106

@@ -130,35 +133,35 @@ func (krl *serviceLimiter) refillTokensLocked(now time.Time) {
130133
func (krl *serviceLimiter) applyServiceLimit(
131134
now time.Time,
132135
requestedTokens float64,
133-
) (limitedTokens float64) {
136+
) (limitedTokens float64, minTrickleTimeMs int64) {
134137
if krl == nil {
135-
return requestedTokens
138+
return requestedTokens, 0
136139
}
137140
krl.Lock()
138141
defer krl.Unlock()
139142

140143
// No limit configured, allow all tokens.
141144
if krl.ServiceLimit <= 0 {
142-
return requestedTokens
145+
return requestedTokens, 0
143146
}
144147

145148
// Refill first to ensure the available tokens is up to date.
146149
krl.refillTokensLocked(now)
147150

148151
// If the requested tokens is less than the available tokens, grant all tokens.
149152
if requestedTokens <= krl.AvailableTokens {
153+
limitedTokens = requestedTokens
150154
krl.AvailableTokens -= requestedTokens
151-
return requestedTokens
152-
}
153-
154-
// If the requested tokens is greater than the available tokens, grant all available tokens.
155-
if requestedTokens > krl.AvailableTokens {
155+
} else {
156+
// If the requested tokens is greater than the available tokens, grant all available tokens.
156157
limitedTokens = math.Max(0, krl.AvailableTokens)
157-
// TODO: allow the loan to decrease the allocation at a smooth rate.
158158
krl.AvailableTokens = 0
159159
}
160160

161-
return limitedTokens
161+
// Calculate a minimum trickle time to ensure the granted tokens' rate in client won't exceed the service limit.
162+
minTrickleTimeMs = int64(math.Round(limitedTokens * 1000.0 / krl.getServiceLimitLocked()))
163+
164+
return limitedTokens, minTrickleTimeMs
162165
}
163166

164167
// Clone returns a copy of the service limiter.

pkg/mcs/resourcemanager/server/service_limit_test.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -135,38 +135,47 @@ func TestApplyServiceLimit(t *testing.T) {
135135

136136
// Test with nil limiter
137137
var limiter *serviceLimiter
138-
tokens := limiter.applyServiceLimit(time.Now(), 50.0)
138+
now := time.Now()
139+
tokens, minTrickleTimeMs := limiter.applyServiceLimit(now, 50.0)
139140
re.Equal(50.0, tokens)
141+
re.Zero(minTrickleTimeMs)
140142

141143
// Test with zero service limit (no limit)
142144
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
143-
now := time.Now()
144-
tokens = limiter.applyServiceLimit(now, 50.0)
145+
now = time.Now()
146+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 50.0)
145147
re.Equal(50.0, tokens)
148+
re.Zero(minTrickleTimeMs)
146149

147150
// Test request within available tokens (need to set available tokens first)
148151
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
152+
now = time.Now()
149153
limiter.AvailableTokens = 100.0 // Manually set available tokens
150154
limiter.LastUpdate = now
151-
tokens = limiter.applyServiceLimit(now, 50.0)
155+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 50.0)
152156
re.Equal(50.0, tokens)
153157
re.Equal(50.0, limiter.AvailableTokens) // 100 - 50 = 50
158+
re.Equal(int64(500), minTrickleTimeMs) // 50/100 * 1000 = 500ms
154159

155160
// Test request exactly equal to available tokens
156161
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
162+
now = time.Now()
157163
limiter.AvailableTokens = 100.0 // Manually set available tokens
158164
limiter.LastUpdate = now
159-
tokens = limiter.applyServiceLimit(now, 100.0)
165+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 100.0)
160166
re.Equal(100.0, tokens)
161167
re.Equal(0.0, limiter.AvailableTokens)
168+
re.Equal(int64(1000), minTrickleTimeMs) // 100/100 * 1000 = 1000ms
162169

163170
// Test request exceeding available tokens
164171
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
172+
now = time.Now()
165173
limiter.LastUpdate = now
166174
limiter.AvailableTokens = 30.0
167-
tokens = limiter.applyServiceLimit(now, 80.0)
175+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 80.0)
168176
re.Equal(30.0, tokens) // Only available tokens granted
169177
re.Equal(0.0, limiter.AvailableTokens)
178+
re.Equal(int64(300), minTrickleTimeMs) // 30/100 * 1000 = 300ms
170179
}
171180

172181
func TestApplyServiceLimitWithRefill(t *testing.T) {
@@ -180,21 +189,23 @@ func TestApplyServiceLimitWithRefill(t *testing.T) {
180189

181190
// Request after 1 second should trigger refill first
182191
futureTime := baseTime.Add(time.Second)
183-
tokens := limiter.applyServiceLimit(futureTime, 50.0)
192+
tokens, minTrickleTimeMs := limiter.applyServiceLimit(futureTime, 50.0)
184193
re.Equal(50.0, tokens)
185194
re.Equal(70.0, limiter.AvailableTokens) // 20 + 100 - 50 = 70
186195
re.Equal(futureTime, limiter.LastUpdate)
196+
re.Equal(int64(500), minTrickleTimeMs) // 50/100 * 1000 = 500ms
187197

188198
// Test partial refill scenario
189199
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
190200
limiter.LastUpdate = baseTime
191201
limiter.AvailableTokens = 10.0
192202

193203
halfSecondLater := baseTime.Add(500 * time.Millisecond)
194-
tokens = limiter.applyServiceLimit(halfSecondLater, 80.0)
204+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(halfSecondLater, 80.0)
195205
// After refill: 10 + 100*0.5 = 60 tokens available
196206
re.Equal(60.0, tokens)
197207
re.Equal(0.0, limiter.AvailableTokens)
208+
re.Equal(int64(600), minTrickleTimeMs) // 60/100 * 1000 = 600ms
198209
}
199210

200211
func TestServiceLimiterEdgeCases(t *testing.T) {
@@ -205,32 +216,37 @@ func TestServiceLimiterEdgeCases(t *testing.T) {
205216
limiter.AvailableTokens = 0.1 // Manually set available tokens
206217
limiter.LastUpdate = time.Now() // Set LastUpdate to current time to avoid refill
207218
now := time.Now()
208-
tokens := limiter.applyServiceLimit(now, 1.0)
209-
re.InDelta(0.1, tokens, 0.001) // Use InDelta to handle floating point precision
219+
tokens, minTrickleTimeMs := limiter.applyServiceLimit(now, 1.0)
220+
re.InDelta(0.1, tokens, 0.001) // Use InDelta to handle floating point precision
221+
re.Equal(int64(1000), minTrickleTimeMs) // 0.1/0.1 * 1000 = 1000ms
210222

211223
// Test with very large service limit
212224
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000000.0, nil)
213225
limiter.AvailableTokens = 1000000.0 // Manually set available tokens
214-
tokens = limiter.applyServiceLimit(now, 500000.0)
226+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 500000.0)
215227
re.Equal(500000.0, tokens)
228+
re.Equal(int64(500), minTrickleTimeMs) // 500000/1000000 * 1000 = 500ms
216229

217230
// Test with zero requested tokens
218231
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
219232
limiter.AvailableTokens = 100.0 // Manually set available tokens
220-
tokens = limiter.applyServiceLimit(now, 0.0)
233+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 0.0)
221234
re.Equal(0.0, tokens)
222235
re.Equal(100.0, limiter.AvailableTokens) // Should remain unchanged
236+
re.Zero(minTrickleTimeMs)
223237

224238
// Test with fractional tokens
225239
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.5, nil)
226240
limiter.LastUpdate = now
227241
limiter.AvailableTokens = 5.25
228-
tokens = limiter.applyServiceLimit(now, 7.75)
242+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 7.75)
229243
re.Equal(5.25, tokens)
244+
re.Equal(int64(500), minTrickleTimeMs) // 5.25/10.5 * 1000 = 500ms
230245
// Test apply with 0 available ru
231-
tokens = limiter.applyServiceLimit(now, 5)
246+
tokens, minTrickleTimeMs = limiter.applyServiceLimit(now, 5)
232247
re.Equal(0.0, tokens)
233248
re.Equal(0.0, limiter.AvailableTokens)
249+
re.Zero(minTrickleTimeMs)
234250
}
235251

236252
func TestSetServiceLimit(t *testing.T) {

pkg/mcs/resourcemanager/server/token_buckets.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ func (gtb *GroupTokenBucket) inspectAnomalies(
438438
) bool {
439439
var errMsg string
440440
// Verify whether the allocated token is invalid, such as negative values, math.Inf, or math.NaN.
441-
if tb.Tokens <= 0 || math.IsInf(tb.Tokens, 0) || math.IsNaN(tb.Tokens) {
441+
if tb.Tokens < 0 || math.IsInf(tb.Tokens, 0) || math.IsNaN(tb.Tokens) {
442442
errMsg = "assigned token is invalid"
443443
}
444444
// Verify whether the state of the slot is abnormal.

0 commit comments

Comments
 (0)