Skip to content

Commit 9c11c0b

Browse files
feat: check for failed reason in distributors
1 parent c42ccc3 commit 9c11c0b

File tree

4 files changed

+70
-87
lines changed

4 files changed

+70
-87
lines changed

pkg/distributor/distributor.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -719,21 +719,13 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
719719
}
720720

721721
if d.cfg.IngestLimitsEnabled {
722-
streamsAfterLimits, reasonsForHashes, err := d.ingestLimits.enforceLimits(ctx, tenantID, streams)
723-
if err != nil {
724-
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
725-
} else if len(streamsAfterLimits) == 0 {
726-
// All streams have been dropped.
727-
level.Debug(d.logger).Log("msg", "request exceeded limits, all streams will be dropped", "tenant", tenantID)
728-
if !d.cfg.IngestLimitsDryRunEnabled {
729-
return nil, httpgrpc.Error(http.StatusTooManyRequests, "request exceeded limits: "+firstReasonForHashes(reasonsForHashes))
730-
}
731-
} else if len(streamsAfterLimits) < len(streams) {
732-
// Some streams have been dropped.
733-
level.Debug(d.logger).Log("msg", "request exceeded limits, some streams will be dropped", "tenant", tenantID)
734-
if !d.cfg.IngestLimitsDryRunEnabled {
735-
streams = streamsAfterLimits
722+
accepted, err := d.ingestLimits.EnforceLimits(ctx, tenantID, streams)
723+
if err == nil && !d.cfg.IngestLimitsDryRunEnabled {
724+
if len(accepted) == 0 {
725+
// All streams were rejected, so the request must fail.
726+
return nil, httpgrpc.Error(http.StatusTooManyRequests, "request exceeded limits")
736727
}
728+
streams = accepted
737729
}
738730
}
739731

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2462,7 +2462,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
24622462
Reason: uint32(limits.ReasonMaxStreams),
24632463
}},
24642464
},
2465-
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams",
2465+
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits",
24662466
}, {
24672467
name: "one of two streams exceed max stream limit, request is accepted",
24682468
ingestLimitsEnabled: true,

pkg/distributor/ingest_limits.go

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
// ingestLimitsFrontendClient is used for tests.
2020
type ingestLimitsFrontendClient interface {
21-
exceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
21+
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
2222
}
2323

2424
// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
@@ -35,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
3535
}
3636

3737
// Implements the ingestLimitsFrontendClient interface.
38-
func (c *ingestLimitsFrontendRingClient) exceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
38+
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3939
// We use an FNV-1 hash of all stream hashes in the request to load balance requests
4040
// to limits-frontends instances.
4141
h := fnv.New32()
@@ -85,57 +85,65 @@ func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer)
8585
return &ingestLimits{
8686
client: client,
8787
limitsFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
88-
Name: "loki_distributor_ingest_limits_failures_total",
89-
Help: "The total number of failures checking ingest limits.",
88+
Name: "loki_distributor_ingest_limits_requests_failed_total",
89+
Help: "The total number of requests that failed.",
9090
}),
9191
}
9292
}
9393

94-
// enforceLimits returns a slice of streams that are within the per-tenant
95-
// limits, and in the case where one or more streams exceed per-tenant
96-
// limits, the reasons those streams were not included in the result.
97-
// An error is returned if per-tenant limits could not be enforced.
98-
func (l *ingestLimits) enforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, map[uint64][]string, error) {
99-
exceedsLimits, reasons, err := l.exceedsLimits(ctx, tenant, streams)
100-
if !exceedsLimits || err != nil {
101-
return streams, nil, err
94+
// EnforceLimits checks all streams against the per-tenant limits and returns
95+
// a slice containing the streams that are accepted (within the per-tenant
96+
// limits). Any streams that could not have their limits checked are also
97+
// accepted.
98+
func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, error) {
99+
results, err := l.ExceedsLimits(ctx, tenant, streams)
100+
if err != nil {
101+
return streams, err
102102
}
103103
// We can do this without allocation if needed, but doing so will modify
104104
// the original backing array. See "Filtering without allocation" from
105105
// https://go.dev/wiki/SliceTricks.
106-
withinLimits := make([]KeyedStream, 0, len(streams))
106+
accepted := make([]KeyedStream, 0, len(streams))
107107
for _, s := range streams {
108-
if _, ok := reasons[s.HashKeyNoShard]; !ok {
109-
withinLimits = append(withinLimits, s)
108+
// Check each stream to see if it failed.
109+
// TODO(grobinson): We have an O(N*M) loop here. Need to benchmark if
110+
// it's faster to do this or if we should create a map instead.
111+
var (
112+
found bool
113+
reason uint32
114+
)
115+
for _, res := range results {
116+
if res.StreamHash == s.HashKeyNoShard {
117+
found = true
118+
reason = res.Reason
119+
break
120+
}
121+
}
122+
if !found || reason == uint32(limits.ReasonFailed) {
123+
accepted = append(accepted, s)
110124
}
111125
}
112-
return withinLimits, reasons, nil
126+
return accepted, nil
113127
}
114128

115-
// ExceedsLimits returns true if one or more streams exceeds per-tenant limits,
116-
// and false if no streams exceed per-tenant limits. In the case where one or
117-
// more streams exceeds per-tenant limits, it returns the reasons for each stream.
118-
// An error is returned if per-tenant limits could not be checked.
119-
func (l *ingestLimits) exceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) (bool, map[uint64][]string, error) {
129+
// ExceedsLimits checks all streams against the per-tenant limits. It returns
130+
// an error if the client failed to send the request or receive a response
131+
// from the server. Any streams that could not have their limits checked
132+
// are returned in the results with the reason "ReasonFailed".
133+
func (l *ingestLimits) ExceedsLimits(
134+
ctx context.Context,
135+
tenant string,
136+
streams []KeyedStream,
137+
) ([]*proto.ExceedsLimitsResult, error) {
120138
req, err := newExceedsLimitsRequest(tenant, streams)
121139
if err != nil {
122-
return false, nil, err
140+
return nil, err
123141
}
124-
resp, err := l.client.exceedsLimits(ctx, req)
142+
resp, err := l.client.ExceedsLimits(ctx, req)
125143
if err != nil {
126-
return false, nil, err
127-
}
128-
if len(resp.Results) == 0 {
129-
return false, nil, nil
130-
}
131-
reasonsForHashes := make(map[uint64][]string)
132-
for _, result := range resp.Results {
133-
reasons := reasonsForHashes[result.StreamHash]
134-
humanized := limits.Reason(result.Reason).String()
135-
reasons = append(reasons, humanized)
136-
reasonsForHashes[result.StreamHash] = reasons
144+
return nil, err
137145
}
138-
return true, reasonsForHashes, nil
146+
return resp.Results, nil
139147
}
140148

141149
func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.ExceedsLimitsRequest, error) {
@@ -156,10 +164,3 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
156164
Streams: streamMetadata,
157165
}, nil
158166
}
159-
160-
func firstReasonForHashes(reasonsForHashes map[uint64][]string) string {
161-
for _, reasons := range reasonsForHashes {
162-
return reasons[0]
163-
}
164-
return "unknown reason"
165-
}

pkg/distributor/ingest_limits_test.go

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type mockIngestLimitsFrontendClient struct {
2626
}
2727

2828
// Implements the ingestLimitsFrontendClient interface.
29-
func (c *mockIngestLimitsFrontendClient) exceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
29+
func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
3030
c.calls.Add(1)
3131
if c.expectedRequest != nil {
3232
require.Equal(c.t, c.expectedRequest, r)
@@ -49,7 +49,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
4949
response *proto.ExceedsLimitsResponse
5050
responseErr error
5151
expectedStreams []KeyedStream
52-
expectedReasons map[uint64][]string
5352
expectedErr string
5453
}{{
5554
// This test also asserts that streams are returned unmodified.
@@ -116,7 +115,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
116115
}},
117116
},
118117
expectedStreams: []KeyedStream{},
119-
expectedReasons: map[uint64][]string{1: {"max streams"}},
120118
}, {
121119
name: "one of two streams exceeds limits",
122120
tenant: "test",
@@ -145,7 +143,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
145143
HashKey: 2000, // Should not be used.
146144
HashKeyNoShard: 2,
147145
}},
148-
expectedReasons: map[uint64][]string{1: {"max streams"}},
149146
}, {
150147
name: "does not exceed limits",
151148
tenant: "test",
@@ -174,7 +171,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
174171
HashKey: 2000, // Should not be used.
175172
HashKeyNoShard: 2,
176173
}},
177-
expectedReasons: nil,
178174
}}
179175

180176
for _, test := range tests {
@@ -188,35 +184,29 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
188184
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
189185
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
190186
defer cancel()
191-
streams, reasons, err := l.enforceLimits(ctx, test.tenant, test.streams)
187+
accepted, err := l.EnforceLimits(ctx, test.tenant, test.streams)
192188
if test.expectedErr != "" {
193189
require.EqualError(t, err, test.expectedErr)
194190
// The streams should be returned unmodified.
195-
require.Equal(t, test.streams, streams)
196-
require.Nil(t, reasons)
191+
require.Equal(t, test.streams, accepted)
197192
} else {
198193
require.Nil(t, err)
199-
require.Equal(t, test.expectedStreams, streams)
200-
require.Equal(t, test.expectedReasons, reasons)
194+
require.Equal(t, test.expectedStreams, accepted)
201195
}
202196
})
203197
}
204198
}
205199

206-
// This test asserts that when checking ingest limits the expected proto
207-
// message is sent, and that for a given response, the result contains the
208-
// expected streams each with their expected reasons.
209200
func TestIngestLimits_ExceedsLimits(t *testing.T) {
210201
tests := []struct {
211-
name string
212-
tenant string
213-
streams []KeyedStream
214-
expectedRequest *proto.ExceedsLimitsRequest
215-
response *proto.ExceedsLimitsResponse
216-
responseErr error
217-
expectedExceedsLimits bool
218-
expectedReasons map[uint64][]string
219-
expectedErr string
202+
name string
203+
tenant string
204+
streams []KeyedStream
205+
expectedRequest *proto.ExceedsLimitsRequest
206+
response *proto.ExceedsLimitsResponse
207+
responseErr error
208+
expectedResult []*proto.ExceedsLimitsResult
209+
expectedErr string
220210
}{{
221211
name: "error should be returned if limits cannot be checked",
222212
tenant: "test",
@@ -249,8 +239,10 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
249239
Reason: uint32(limits.ReasonMaxStreams),
250240
}},
251241
},
252-
expectedExceedsLimits: true,
253-
expectedReasons: map[uint64][]string{1: {"max streams"}},
242+
expectedResult: []*proto.ExceedsLimitsResult{{
243+
StreamHash: 1,
244+
Reason: uint32(limits.ReasonMaxStreams),
245+
}},
254246
}, {
255247
name: "does not exceed limits",
256248
tenant: "test",
@@ -266,7 +258,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
266258
response: &proto.ExceedsLimitsResponse{
267259
Results: []*proto.ExceedsLimitsResult{},
268260
},
269-
expectedReasons: nil,
261+
expectedResult: []*proto.ExceedsLimitsResult{},
270262
}}
271263

272264
for _, test := range tests {
@@ -280,15 +272,13 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
280272
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
281273
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
282274
defer cancel()
283-
exceedsLimits, reasons, err := l.exceedsLimits(ctx, test.tenant, test.streams)
275+
res, err := l.ExceedsLimits(ctx, test.tenant, test.streams)
284276
if test.expectedErr != "" {
285277
require.EqualError(t, err, test.expectedErr)
286-
require.False(t, exceedsLimits)
287-
require.Nil(t, reasons)
278+
require.Nil(t, res)
288279
} else {
289280
require.Nil(t, err)
290-
require.Equal(t, test.expectedExceedsLimits, exceedsLimits)
291-
require.Equal(t, test.expectedReasons, reasons)
281+
require.Equal(t, test.expectedResult, res)
292282
}
293283
})
294284
}

0 commit comments

Comments
 (0)