Skip to content

Commit d197cda

Browse files
feat: add enforceLimits to ingest_limits.go (#17117)
1 parent b0931b1 commit d197cda

File tree

2 files changed

+193
-0
lines changed

2 files changed

+193
-0
lines changed

pkg/distributor/ingest_limits.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,27 @@ func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer)
9090
}
9191
}
9292

93+
// enforceLimits returns a slice of streams that are within the per-tenant
94+
// limits, and in the case where one or more streams exceed per-tenant
95+
// limits, the reasons those streams were not included in the result.
96+
// An error is returned if per-tenant limits could not be enforced.
97+
func (l *ingestLimits) enforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, map[uint64][]string, error) {
98+
exceedsLimits, reasons, err := l.exceedsLimits(ctx, tenant, streams)
99+
if !exceedsLimits || err != nil {
100+
return streams, nil, err
101+
}
102+
// We can do this without allocation if needed, but doing so will modify
103+
// the original backing array. See "Filtering without allocation" from
104+
// https://go.dev/wiki/SliceTricks.
105+
withinLimits := make([]KeyedStream, 0, len(streams))
106+
for _, s := range streams {
107+
if _, ok := reasons[s.HashKeyNoShard]; !ok {
108+
withinLimits = append(withinLimits, s)
109+
}
110+
}
111+
return withinLimits, reasons, nil
112+
}
113+
93114
// ExceedsLimits returns true if one or more streams exceeds per-tenant limits,
94115
// and false if no streams exceed per-tenant limits. In the case where one or
95116
// more streams exceeds per-tenant limits, it returns the reasons for each stream.

pkg/distributor/ingest_limits_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/coder/quartz"
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/stretchr/testify/require"
1112

@@ -29,6 +30,177 @@ func (c *mockIngestLimitsFrontendClient) exceedsLimits(_ context.Context, r *log
2930
return c.response, nil
3031
}
3132

33+
func TestIngestLimits_EnforceLimits(t *testing.T) {
34+
clock := quartz.NewMock(t)
35+
clock.Set(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC))
36+
37+
tests := []struct {
38+
name string
39+
tenant string
40+
streams []KeyedStream
41+
expectedRequest *logproto.ExceedsLimitsRequest
42+
response *logproto.ExceedsLimitsResponse
43+
responseErr error
44+
expectedStreams []KeyedStream
45+
expectedReasons map[uint64][]string
46+
expectedErr string
47+
}{{
48+
// This test also asserts that streams are returned unmodified.
49+
name: "error should be returned if limits cannot be checked",
50+
tenant: "test",
51+
streams: []KeyedStream{{
52+
HashKey: 1000, // Should not be used.
53+
HashKeyNoShard: 1,
54+
Stream: logproto.Stream{
55+
Labels: "foo",
56+
Entries: []logproto.Entry{{
57+
Timestamp: clock.Now(),
58+
Line: "bar",
59+
StructuredMetadata: []logproto.LabelAdapter{{
60+
Name: "baz",
61+
Value: "qux",
62+
}},
63+
}},
64+
},
65+
}, {
66+
HashKey: 2000, // Should not be used.
67+
HashKeyNoShard: 2,
68+
Stream: logproto.Stream{
69+
Labels: "bar",
70+
Entries: []logproto.Entry{{
71+
Timestamp: clock.Now(),
72+
Line: "baz",
73+
StructuredMetadata: []logproto.LabelAdapter{{
74+
Name: "qux",
75+
Value: "corge",
76+
}},
77+
}},
78+
},
79+
}},
80+
expectedRequest: &logproto.ExceedsLimitsRequest{
81+
Tenant: "test",
82+
Streams: []*logproto.StreamMetadata{{
83+
StreamHash: 1,
84+
EntriesSize: 0x3,
85+
StructuredMetadataSize: 0x6,
86+
}, {
87+
StreamHash: 2,
88+
EntriesSize: 0x3,
89+
StructuredMetadataSize: 0x8,
90+
}},
91+
},
92+
responseErr: errors.New("failed to check limits"),
93+
expectedErr: "failed to check limits",
94+
}, {
95+
name: "exceeds limits",
96+
tenant: "test",
97+
streams: []KeyedStream{{
98+
HashKey: 1000, // Should not be used.
99+
HashKeyNoShard: 1,
100+
}},
101+
expectedRequest: &logproto.ExceedsLimitsRequest{
102+
Tenant: "test",
103+
Streams: []*logproto.StreamMetadata{{
104+
StreamHash: 1,
105+
}},
106+
},
107+
response: &logproto.ExceedsLimitsResponse{
108+
Tenant: "test",
109+
Results: []*logproto.ExceedsLimitsResult{{
110+
StreamHash: 1,
111+
Reason: "test",
112+
}},
113+
},
114+
expectedStreams: []KeyedStream{},
115+
expectedReasons: map[uint64][]string{1: {"test"}},
116+
}, {
117+
name: "one of two streams exceeds limits",
118+
tenant: "test",
119+
streams: []KeyedStream{{
120+
HashKey: 1000, // Should not be used.
121+
HashKeyNoShard: 1,
122+
}, {
123+
HashKey: 2000, // Should not be used.
124+
HashKeyNoShard: 2,
125+
}},
126+
expectedRequest: &logproto.ExceedsLimitsRequest{
127+
Tenant: "test",
128+
Streams: []*logproto.StreamMetadata{{
129+
StreamHash: 1,
130+
}, {
131+
StreamHash: 2,
132+
}},
133+
},
134+
response: &logproto.ExceedsLimitsResponse{
135+
Tenant: "test",
136+
Results: []*logproto.ExceedsLimitsResult{{
137+
StreamHash: 1,
138+
Reason: "test",
139+
}},
140+
},
141+
expectedStreams: []KeyedStream{{
142+
HashKey: 2000, // Should not be used.
143+
HashKeyNoShard: 2,
144+
}},
145+
expectedReasons: map[uint64][]string{1: {"test"}},
146+
}, {
147+
name: "does not exceed limits",
148+
tenant: "test",
149+
streams: []KeyedStream{{
150+
HashKey: 1000, // Should not be used.
151+
HashKeyNoShard: 1,
152+
}, {
153+
HashKey: 2000, // Should not be used.
154+
HashKeyNoShard: 2,
155+
}},
156+
expectedRequest: &logproto.ExceedsLimitsRequest{
157+
Tenant: "test",
158+
Streams: []*logproto.StreamMetadata{{
159+
StreamHash: 1,
160+
}, {
161+
StreamHash: 2,
162+
}},
163+
},
164+
response: &logproto.ExceedsLimitsResponse{
165+
Tenant: "test",
166+
Results: []*logproto.ExceedsLimitsResult{},
167+
},
168+
expectedStreams: []KeyedStream{{
169+
HashKey: 1000, // Should not be used.
170+
HashKeyNoShard: 1,
171+
}, {
172+
HashKey: 2000, // Should not be used.
173+
HashKeyNoShard: 2,
174+
}},
175+
expectedReasons: nil,
176+
}}
177+
178+
for _, test := range tests {
179+
t.Run(test.name, func(t *testing.T) {
180+
mockClient := mockIngestLimitsFrontendClient{
181+
t: t,
182+
expectedRequest: test.expectedRequest,
183+
response: test.response,
184+
responseErr: test.responseErr,
185+
}
186+
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
187+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
188+
defer cancel()
189+
streams, reasons, err := l.enforceLimits(ctx, test.tenant, test.streams)
190+
if test.expectedErr != "" {
191+
require.EqualError(t, err, test.expectedErr)
192+
// The streams should be returned unmodified.
193+
require.Equal(t, test.streams, streams)
194+
require.Nil(t, reasons)
195+
} else {
196+
require.Nil(t, err)
197+
require.Equal(t, test.expectedStreams, streams)
198+
require.Equal(t, test.expectedReasons, reasons)
199+
}
200+
})
201+
}
202+
}
203+
32204
// This test asserts that when checking ingest limits the expected proto
33205
// message is sent, and that for a given response, the result contains the
34206
// expected streams each with their expected reasons.

0 commit comments

Comments
 (0)