Skip to content

Commit c32e479

Browse files
authored
fix(ingest-limits): Use stripe locking for metadata (#17150)
1 parent 35e32f3 commit c32e479

File tree

8 files changed

+1558
-1185
lines changed

8 files changed

+1558
-1185
lines changed

pkg/limits/http.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ type httpTenantLimitsResponse struct {
1919
// ServeHTTP implements the http.Handler interface.
2020
// It returns the current stream counts and status per tenant as a JSON response.
2121
func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request) {
22-
// TODO(grobinson): Avoid acquiring the mutex for the entire duration
23-
// of the request.
24-
s.mtx.RLock()
25-
defer s.mtx.RUnlock()
26-
2722
tenant := mux.Vars(r)["tenant"]
2823
if tenant == "" {
2924
http.Error(w, "invalid tenant", http.StatusBadRequest)
@@ -43,20 +38,22 @@ func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request) {
4338
response httpTenantLimitsResponse
4439
)
4540

46-
for _, partitions := range s.metadata[tenant] {
47-
for _, stream := range partitions {
48-
if stream.lastSeenAt >= cutoff {
49-
activeStreams++
41+
s.metadata.All(func(tenantID string, _ int32, stream Stream) {
42+
if tenantID != tenant {
43+
return
44+
}
45+
46+
if stream.LastSeenAt >= cutoff {
47+
activeStreams++
5048

51-
// Calculate size only within the rate window
52-
for _, bucket := range stream.rateBuckets {
53-
if bucket.timestamp >= rateWindowCutoff {
54-
totalSize += bucket.size
55-
}
49+
// Calculate size only within the rate window
50+
for _, bucket := range stream.RateBuckets {
51+
if bucket.Timestamp >= rateWindowCutoff {
52+
totalSize += bucket.Size
5653
}
5754
}
5855
}
59-
}
56+
})
6057

6158
// Calculate rate using only data from within the rate window
6259
calculatedRate := float64(totalSize) / s.cfg.WindowSize.Seconds()

pkg/limits/http_test.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,30 @@ func TestIngestLimits_ServeHTTP(t *testing.T) {
2020
RateWindow: time.Minute,
2121
BucketDuration: 30 * time.Second,
2222
},
23-
metadata: map[string]map[int32][]streamMetadata{
24-
"tenant": {
25-
0: {{
26-
hash: 0x1,
27-
totalSize: 100,
28-
rateBuckets: []rateBucket{{
29-
timestamp: time.Now().UnixNano(),
30-
size: 1,
31-
}},
32-
lastSeenAt: time.Now().UnixNano(),
33-
}},
23+
metadata: &streamMetadata{
24+
stripes: []map[string]map[int32][]Stream{
25+
{
26+
"tenant": {
27+
0: {{
28+
Hash: 0x1,
29+
TotalSize: 100,
30+
RateBuckets: []RateBucket{{
31+
Timestamp: time.Now().UnixNano(),
32+
Size: 1,
33+
}},
34+
LastSeenAt: time.Now().UnixNano(),
35+
}},
36+
},
37+
},
3438
},
39+
locks: make([]stripeLock, 1),
3540
},
3641
logger: log.NewNopLogger(),
42+
partitionManager: &PartitionManager{
43+
partitions: map[int32]int64{
44+
0: time.Now().UnixNano(),
45+
},
46+
},
3747
}
3848

3949
// Set up a mux router for the test server otherwise mux.Vars() won't work.

0 commit comments

Comments
 (0)