Skip to content

Commit c55f038

Browse files
authored
feat(ingest-limits): Enforce synchronously max stream limit per partition (#17527)
1 parent 94b1d2d commit c55f038

File tree

18 files changed

+1107
-772
lines changed

18 files changed

+1107
-772
lines changed

pkg/distributor/distributor_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040

4141
"github.com/grafana/loki/v3/pkg/ingester"
4242
"github.com/grafana/loki/v3/pkg/ingester/client"
43+
"github.com/grafana/loki/v3/pkg/limits"
4344
limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
4445
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
4546
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
@@ -2461,7 +2462,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
24612462
Tenant: "test",
24622463
Results: []*logproto.ExceedsLimitsResult{{
24632464
StreamHash: 0x90eb45def17f924,
2464-
Reason: limits_frontend.ReasonExceedsMaxStreams,
2465+
Reason: uint32(limits.ReasonExceedsMaxStreams),
24652466
}},
24662467
},
24672468
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams exceeded",
@@ -2491,7 +2492,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
24912492
Tenant: "test",
24922493
Results: []*logproto.ExceedsLimitsResult{{
24932494
StreamHash: 0x90eb45def17f924,
2494-
Reason: limits_frontend.ReasonExceedsRateLimit,
2495+
Reason: uint32(limits.ReasonExceedsRateLimit),
24952496
}},
24962497
},
24972498
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: rate limit exceeded",
@@ -2531,7 +2532,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
25312532
Tenant: "test",
25322533
Results: []*logproto.ExceedsLimitsResult{{
25332534
StreamHash: 1,
2534-
Reason: limits_frontend.ReasonExceedsMaxStreams,
2535+
Reason: uint32(limits.ReasonExceedsMaxStreams),
25352536
}},
25362537
},
25372538
}, {
@@ -2561,7 +2562,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
25612562
Tenant: "test",
25622563
Results: []*logproto.ExceedsLimitsResult{{
25632564
StreamHash: 1,
2564-
Reason: limits_frontend.ReasonExceedsMaxStreams,
2565+
Reason: uint32(limits.ReasonExceedsMaxStreams),
25652566
}},
25662567
},
25672568
}, {

pkg/distributor/ingest_limits.go

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/prometheus/client_golang/prometheus"
1212
"github.com/prometheus/client_golang/prometheus/promauto"
1313

14-
limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
14+
"github.com/grafana/loki/v3/pkg/limits"
1515
limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
1616
"github.com/grafana/loki/v3/pkg/logproto"
1717
)
@@ -131,7 +131,8 @@ func (l *ingestLimits) exceedsLimits(ctx context.Context, tenant string, streams
131131
reasonsForHashes := make(map[uint64][]string)
132132
for _, result := range resp.Results {
133133
reasons := reasonsForHashes[result.StreamHash]
134-
reasons = append(reasons, result.Reason)
134+
humanized := limits.Reason(result.Reason).String()
135+
reasons = append(reasons, humanized)
135136
reasonsForHashes[result.StreamHash] = reasons
136137
}
137138
return true, reasonsForHashes, nil
@@ -159,20 +160,7 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*logproto.Ex
159160

160161
func firstReasonForHashes(reasonsForHashes map[uint64][]string) string {
161162
for _, reasons := range reasonsForHashes {
162-
return humanizeReasonForHash(reasons[0])
163+
return reasons[0]
163164
}
164165
return "unknown reason"
165166
}
166-
167-
// TODO(grobinson): Move this to the same place where the consts
168-
// are defined.
169-
func humanizeReasonForHash(s string) string {
170-
switch s {
171-
case limits_frontend.ReasonExceedsMaxStreams:
172-
return "max streams exceeded"
173-
case limits_frontend.ReasonExceedsRateLimit:
174-
return "rate limit exceeded"
175-
default:
176-
return s
177-
}
178-
}

pkg/distributor/ingest_limits_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/require"
1212
"go.uber.org/atomic"
1313

14+
"github.com/grafana/loki/v3/pkg/limits"
1415
"github.com/grafana/loki/v3/pkg/logproto"
1516
)
1617

@@ -113,11 +114,11 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
113114
Tenant: "test",
114115
Results: []*logproto.ExceedsLimitsResult{{
115116
StreamHash: 1,
116-
Reason: "test",
117+
Reason: uint32(limits.ReasonExceedsRateLimit),
117118
}},
118119
},
119120
expectedStreams: []KeyedStream{},
120-
expectedReasons: map[uint64][]string{1: {"test"}},
121+
expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}},
121122
}, {
122123
name: "one of two streams exceeds limits",
123124
tenant: "test",
@@ -140,14 +141,14 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
140141
Tenant: "test",
141142
Results: []*logproto.ExceedsLimitsResult{{
142143
StreamHash: 1,
143-
Reason: "test",
144+
Reason: uint32(limits.ReasonExceedsRateLimit),
144145
}},
145146
},
146147
expectedStreams: []KeyedStream{{
147148
HashKey: 2000, // Should not be used.
148149
HashKeyNoShard: 2,
149150
}},
150-
expectedReasons: map[uint64][]string{1: {"test"}},
151+
expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}},
151152
}, {
152153
name: "does not exceed limits",
153154
tenant: "test",
@@ -250,11 +251,11 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
250251
Tenant: "test",
251252
Results: []*logproto.ExceedsLimitsResult{{
252253
StreamHash: 1,
253-
Reason: "test",
254+
Reason: uint32(limits.ReasonExceedsRateLimit),
254255
}},
255256
},
256257
expectedExceedsLimits: true,
257-
expectedReasons: map[uint64][]string{1: {"test"}},
258+
expectedReasons: map[uint64][]string{1: {"rate limit exceeded"}},
258259
}, {
259260
name: "does not exceed limits",
260261
tenant: "test",

pkg/limits/frontend/frontend_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/stretchr/testify/require"
99

10+
"github.com/grafana/loki/v3/pkg/limits"
1011
"github.com/grafana/loki/v3/pkg/logproto"
1112
)
1213

@@ -40,14 +41,14 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
4041
Tenant: "test",
4142
Results: []*logproto.ExceedsLimitsResult{{
4243
StreamHash: 0x1,
43-
Reason: ReasonExceedsMaxStreams,
44+
Reason: uint32(limits.ReasonExceedsMaxStreams),
4445
}},
4546
}},
4647
expected: &logproto.ExceedsLimitsResponse{
4748
Tenant: "test",
4849
Results: []*logproto.ExceedsLimitsResult{{
4950
StreamHash: 0x1,
50-
Reason: ReasonExceedsMaxStreams,
51+
Reason: uint32(limits.ReasonExceedsMaxStreams),
5152
}},
5253
},
5354
}, {
@@ -86,20 +87,20 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
8687
Tenant: "test",
8788
Results: []*logproto.ExceedsLimitsResult{{
8889
StreamHash: 0x1,
89-
Reason: ReasonExceedsMaxStreams,
90+
Reason: uint32(limits.ReasonExceedsMaxStreams),
9091
}, {
9192
StreamHash: 0x4,
92-
Reason: ReasonExceedsRateLimit,
93+
Reason: uint32(limits.ReasonExceedsRateLimit),
9394
}},
9495
}},
9596
expected: &logproto.ExceedsLimitsResponse{
9697
Tenant: "test",
9798
Results: []*logproto.ExceedsLimitsResult{{
9899
StreamHash: 0x1,
99-
Reason: ReasonExceedsMaxStreams,
100+
Reason: uint32(limits.ReasonExceedsMaxStreams),
100101
}, {
101102
StreamHash: 0x4,
102-
Reason: ReasonExceedsRateLimit,
103+
Reason: uint32(limits.ReasonExceedsRateLimit),
103104
}},
104105
},
105106
}, {
@@ -120,23 +121,23 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
120121
Tenant: "test",
121122
Results: []*logproto.ExceedsLimitsResult{{
122123
StreamHash: 0x1,
123-
Reason: ReasonExceedsMaxStreams,
124+
Reason: uint32(limits.ReasonExceedsMaxStreams),
124125
}},
125126
}, {
126127
Tenant: "test",
127128
Results: []*logproto.ExceedsLimitsResult{{
128129
StreamHash: 0x4,
129-
Reason: ReasonExceedsRateLimit,
130+
Reason: uint32(limits.ReasonExceedsRateLimit),
130131
}},
131132
}},
132133
expected: &logproto.ExceedsLimitsResponse{
133134
Tenant: "test",
134135
Results: []*logproto.ExceedsLimitsResult{{
135136
StreamHash: 0x1,
136-
Reason: ReasonExceedsMaxStreams,
137+
Reason: uint32(limits.ReasonExceedsMaxStreams),
137138
}, {
138139
StreamHash: 0x4,
139-
Reason: ReasonExceedsRateLimit,
140+
Reason: uint32(limits.ReasonExceedsRateLimit),
140141
}},
141142
},
142143
}}

pkg/limits/frontend/http_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/require"
1212

13+
"github.com/grafana/loki/v3/pkg/limits"
1314
"github.com/grafana/loki/v3/pkg/logproto"
1415
)
1516

@@ -53,7 +54,7 @@ func TestFrontend_ServeHTTP(t *testing.T) {
5354
exceedsLimitsResponses: []*logproto.ExceedsLimitsResponse{{
5455
Results: []*logproto.ExceedsLimitsResult{{
5556
StreamHash: 0x1,
56-
Reason: "exceeds_rate_limit",
57+
Reason: uint32(limits.ReasonExceedsRateLimit),
5758
}},
5859
}},
5960
request: httpExceedsLimitsRequest{
@@ -67,7 +68,7 @@ func TestFrontend_ServeHTTP(t *testing.T) {
6768
expected: httpExceedsLimitsResponse{
6869
Results: []*logproto.ExceedsLimitsResult{{
6970
StreamHash: 0x1,
70-
Reason: "exceeds_rate_limit",
71+
Reason: uint32(limits.ReasonExceedsRateLimit),
7172
}},
7273
},
7374
}}

pkg/limits/frontend/limits.go

Lines changed: 0 additions & 28 deletions
This file was deleted.

pkg/limits/frontend/mock_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -121,23 +121,6 @@ func (m *mockReadRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, err
121121
return m.rs, nil
122122
}
123123

124-
type mockLimits struct {
125-
maxGlobalStreams int
126-
ingestionRate float64
127-
}
128-
129-
func (m *mockLimits) MaxGlobalStreamsPerUser(_ string) int {
130-
return m.maxGlobalStreams
131-
}
132-
133-
func (m *mockLimits) IngestionRateBytes(_ string) float64 {
134-
return m.ingestionRate
135-
}
136-
137-
func (m *mockLimits) IngestionBurstSizeBytes(_ string) int {
138-
return 1000
139-
}
140-
141124
func newMockRingWithClientPool(_ *testing.T, name string, clients []*mockIngestLimitsClient, instances []ring.InstanceDesc) (ring.ReadRing, *ring_client.Pool) {
142125
// Set up the mock ring.
143126
ring := &mockReadRing{

pkg/limits/http_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,20 @@ func TestIngestLimits_ServeHTTP(t *testing.T) {
2121
BucketDuration: 30 * time.Second,
2222
},
2323
metadata: &streamMetadata{
24-
stripes: []map[string]map[int32][]Stream{
24+
stripes: []map[string]map[int32]map[uint64]Stream{
2525
{
2626
"tenant": {
27-
0: {{
28-
Hash: 0x1,
29-
TotalSize: 100,
30-
RateBuckets: []RateBucket{{
31-
Timestamp: time.Now().UnixNano(),
32-
Size: 1,
33-
}},
34-
LastSeenAt: time.Now().UnixNano(),
35-
}},
27+
0: {
28+
0x1: {
29+
Hash: 0x1,
30+
TotalSize: 100,
31+
RateBuckets: []RateBucket{{
32+
Timestamp: time.Now().UnixNano(),
33+
Size: 1,
34+
}},
35+
LastSeenAt: time.Now().UnixNano(),
36+
},
37+
},
3638
},
3739
},
3840
},

0 commit comments

Comments
 (0)