32 changes: 13 additions & 19 deletions pkg/distributor/distributor.go
@@ -718,25 +718,6 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
return &logproto.PushResponse{}, validationErr
}

if d.cfg.IngestLimitsEnabled {
streamsAfterLimits, reasonsForHashes, err := d.ingestLimits.enforceLimits(ctx, tenantID, streams)
if err != nil {
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
} else if len(streamsAfterLimits) == 0 {
// All streams have been dropped.
level.Debug(d.logger).Log("msg", "request exceeded limits, all streams will be dropped", "tenant", tenantID)
Contributor Author: Got rid of these log lines, will be far too much volume.

if !d.cfg.IngestLimitsDryRunEnabled {
return nil, httpgrpc.Error(http.StatusTooManyRequests, "request exceeded limits: "+firstReasonForHashes(reasonsForHashes))
}
} else if len(streamsAfterLimits) < len(streams) {
// Some streams have been dropped.
level.Debug(d.logger).Log("msg", "request exceeded limits, some streams will be dropped", "tenant", tenantID)
if !d.cfg.IngestLimitsDryRunEnabled {
streams = streamsAfterLimits
}
}
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited, streamResolver)

@@ -746,6 +727,19 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
}

// These limits are checked after the ingestion rate limit as this
// is how it works in ingesters.
if d.cfg.IngestLimitsEnabled {
accepted, err := d.ingestLimits.EnforceLimits(ctx, tenantID, streams)
if err == nil && !d.cfg.IngestLimitsDryRunEnabled {
if len(accepted) == 0 {
// All streams were rejected, the request should be failed.
return nil, httpgrpc.Error(http.StatusTooManyRequests, "request exceeded limits")
}
streams = accepted
Collaborator: Since we "shrink" the streams slice to the accepted ones, I believe the next check of the ingestionRateLimiter is operating on the wrong calculated value validationContext.validationMetrics.aggregatedPushStats.lineSize and we need to redo it here, right?

Imagine accepting a subset of the incoming streams but rate limiting on the total incoming.

Contributor Author: I will think about how to solve this 😢 We had the same behavior before this PR, where we were rate limiting on streams discarded over the stream limit, so I think we should keep this PR scoped to the feature and then I will open a second PR to address this problem.

}
}
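To illustrate the concern raised in this thread, here is a minimal sketch of recomputing the aggregated line size over only the accepted streams before the rate-limit check. This is not part of the PR; the Stream.Entries and Line field names are assumptions about the surrounding Loki types.

// Sketch: recompute the payload size over the accepted streams only, so the
// rate limiter does not charge the tenant for data that was already rejected.
acceptedSize := 0
for _, s := range streams { // streams now holds only the accepted subset
    for _, e := range s.Stream.Entries {
        acceptedSize += len(e.Line)
    }
}
// acceptedSize, rather than the pre-filter aggregated line size, would then be
// passed to d.ingestionRateLimiter.AllowN.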

// Nil check for performance reasons, to avoid dynamic lookup and/or no-op
// function calls that cannot be inlined.
if d.tee != nil {
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -2462,7 +2462,7 @@ func TestDistributor_PushIngestLimits(t *testing.T) {
Reason: uint32(limits.ReasonMaxStreams),
}},
},
expectedErr: "rpc error: code = Code(429) desc = request exceeded limits: max streams",
Contributor Author: I got rid of the reason for now; I'm not sure if it's useful as it doesn't mention which streams. I want to think about how to better communicate this data back to the user, given that some requests can be really large.

expectedErr: "rpc error: code = Code(429) desc = request exceeded limits",
}, {
name: "one of two streams exceed max stream limit, request is accepted",
ingestLimitsEnabled: true,
102 changes: 58 additions & 44 deletions pkg/distributor/ingest_limits.go
@@ -18,7 +18,7 @@ import (

// ingestLimitsFrontendClient is used for tests.
type ingestLimitsFrontendClient interface {
exceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
}

// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
@@ -35,7 +35,7 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
}

// Implements the ingestLimitsFrontendClient interface.
func (c *ingestLimitsFrontendRingClient) exceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
// We use an FNV-1 of all stream hashes in the request to load balance requests
// to limits-frontends instances.
h := fnv.New32()
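For context, a sketch of how an FNV-32 sum over all stream hashes could be used to pick a limits-frontend replica. This is an illustration rather than the PR's exact code: the pickFrontend name and the numInstances parameter are hypothetical, and the snippet assumes the hash/fnv and encoding/binary standard library imports.

// Fold every stream hash into one FNV-32 sum so that requests carrying the
// same set of streams are routed to the same frontend instance.
// Assumes numInstances > 0.
func pickFrontend(streamHashes []uint64, numInstances uint32) uint32 {
    h := fnv.New32()
    var buf [8]byte
    for _, sh := range streamHashes {
        binary.LittleEndian.PutUint64(buf[:], sh)
        _, _ = h.Write(buf[:]) // writes to an fnv hash never fail
    }
    return h.Sum32() % numInstances
}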
@@ -78,64 +78,85 @@ func (c *ingestLimitsFrontendRingClient) exceedsLimits(ctx context.Context, req

type ingestLimits struct {
client ingestLimitsFrontendClient
limitsFailures prometheus.Counter
requests prometheus.Counter
requestsFailed prometheus.Counter
}

func newIngestLimits(client ingestLimitsFrontendClient, r prometheus.Registerer) *ingestLimits {
return &ingestLimits{
client: client,
limitsFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_failures_total",
Help: "The total number of failures checking ingest limits.",
requests: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_requests_total",
Help: "The total number of requests.",
}),
requestsFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_distributor_ingest_limits_requests_failed_total",
Help: "The total number of requests that failed.",
}),
}
}

// enforceLimits returns a slice of streams that are within the per-tenant
// limits, and in the case where one or more streams exceed per-tenant
// limits, the reasons those streams were not included in the result.
// An error is returned if per-tenant limits could not be enforced.
func (l *ingestLimits) enforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, map[uint64][]string, error) {
exceedsLimits, reasons, err := l.exceedsLimits(ctx, tenant, streams)
if !exceedsLimits || err != nil {
return streams, nil, err
// EnforceLimits checks all streams against the per-tenant limits and returns
// a slice containing the streams that are accepted (within the per-tenant
// limits). Any streams that could not have their limits checked are also
// accepted.
func (l *ingestLimits) EnforceLimits(ctx context.Context, tenant string, streams []KeyedStream) ([]KeyedStream, error) {
results, err := l.ExceedsLimits(ctx, tenant, streams)
if err != nil {
return streams, err
}
// Fast path. No results means all streams were accepted and there were
// no failures, so we can return the input streams.
if len(results) == 0 {
return streams, nil
}
// We can do this without allocation if needed, but doing so will modify
// the original backing array. See "Filtering without allocation" from
// https://go.dev/wiki/SliceTricks.
withinLimits := make([]KeyedStream, 0, len(streams))
accepted := make([]KeyedStream, 0, len(streams))
Collaborator: Maybe we need to consider doing the "filtering without allocation" trick here if other validation stats further down are updated based on this. OTOH the backing array includes shards and not streams, right? So the trick might not be possible at all. WDYT?
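For reference, the "filtering without allocation" trick from SliceTricks mentioned in the code comment and in this thread looks roughly as follows; it reuses the backing array of streams instead of allocating a new slice, which is also why it mutates the caller's slice. The keep predicate is a placeholder for the per-stream limits check, not a real function in this PR.

// Sketch of the in-place filter: accepted shares its backing array with
// streams, so no new slice is allocated, but the original contents of
// streams are overwritten as elements are kept.
accepted := streams[:0]
for _, s := range streams {
    if keep(s) { // placeholder predicate
        accepted = append(accepted, s)
    }
}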

for _, s := range streams {
if _, ok := reasons[s.HashKeyNoShard]; !ok {
withinLimits = append(withinLimits, s)
// Check each stream to see if it failed.
// TODO(grobinson): We have an O(N*M) loop here. Need to benchmark if
// it's faster to do this or if we should create a map instead.
var (
found bool
reason uint32
)
for _, res := range results {
if res.StreamHash == s.HashKeyNoShard {
found = true
reason = res.Reason
break
}
}
Collaborator (on lines +119 to +131): We should measure whether this adds any latency in case of ingest-limits degradation.

if !found || reason == uint32(limits.ReasonFailed) {
Contributor Author: Since we stopped using iota, no valid reason can ever be 0, meaning we can do a check against the default value of uint32.
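As an illustration of this point, a variant (not part of the PR) that relies on the uint32 zero value to mean "no result recorded for this stream", dropping the separate found flag:

var reason uint32 // zero means no result was recorded for this stream
for _, res := range results {
    if res.StreamHash == s.HashKeyNoShard {
        reason = res.Reason
        break
    }
}
if reason == 0 || reason == uint32(limits.ReasonFailed) {
    accepted = append(accepted, s)
}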

accepted = append(accepted, s)
}
}
return withinLimits, reasons, nil
return accepted, nil
}
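Regarding the O(N*M) TODO above, the map-based alternative would look roughly like this sketch (not part of the PR); it trades one map allocation per request for O(N+M) lookups, which is the trade-off the suggested benchmark would need to settle:

// Sketch: index the per-stream results by hash once, then do constant-time
// lookups per incoming stream instead of scanning results for every stream.
rejected := make(map[uint64]uint32, len(results))
for _, res := range results {
    rejected[res.StreamHash] = res.Reason
}
accepted := make([]KeyedStream, 0, len(streams))
for _, s := range streams {
    reason, found := rejected[s.HashKeyNoShard]
    if !found || reason == uint32(limits.ReasonFailed) {
        accepted = append(accepted, s)
    }
}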

// ExceedsLimits returns true if one or more streams exceeds per-tenant limits,
// and false if no streams exceed per-tenant limits. In the case where one or
// more streams exceeds per-tenant limits, it returns the reasons for each stream.
// An error is returned if per-tenant limits could not be checked.
func (l *ingestLimits) exceedsLimits(ctx context.Context, tenant string, streams []KeyedStream) (bool, map[uint64][]string, error) {
// ExceedsLimits checks all streams against the per-tenant limits. It returns
// an error if the client failed to send the request or receive a response
// from the server. Any streams that could not have their limits checked
// are returned in the results with the reason "ReasonFailed".
func (l *ingestLimits) ExceedsLimits(
ctx context.Context,
tenant string,
streams []KeyedStream,
) ([]*proto.ExceedsLimitsResult, error) {
l.requests.Inc()
req, err := newExceedsLimitsRequest(tenant, streams)
if err != nil {
return false, nil, err
l.requestsFailed.Inc()
return nil, err
}
resp, err := l.client.exceedsLimits(ctx, req)
resp, err := l.client.ExceedsLimits(ctx, req)
if err != nil {
return false, nil, err
}
if len(resp.Results) == 0 {
return false, nil, nil
l.requestsFailed.Inc()
return nil, err
}
reasonsForHashes := make(map[uint64][]string)
for _, result := range resp.Results {
reasons := reasonsForHashes[result.StreamHash]
humanized := limits.Reason(result.Reason).String()
reasons = append(reasons, humanized)
reasonsForHashes[result.StreamHash] = reasons
}
return true, reasonsForHashes, nil
return resp.Results, nil
}

func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.ExceedsLimitsRequest, error) {
@@ -156,10 +177,3 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
Streams: streamMetadata,
}, nil
}

func firstReasonForHashes(reasonsForHashes map[uint64][]string) string {
for _, reasons := range reasonsForHashes {
return reasons[0]
}
return "unknown reason"
}
50 changes: 20 additions & 30 deletions pkg/distributor/ingest_limits_test.go
@@ -26,7 +26,7 @@ type mockIngestLimitsFrontendClient struct {
}

// Implements the ingestLimitsFrontendClient interface.
func (c *mockIngestLimitsFrontendClient) exceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
func (c *mockIngestLimitsFrontendClient) ExceedsLimits(_ context.Context, r *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
c.calls.Add(1)
if c.expectedRequest != nil {
require.Equal(c.t, c.expectedRequest, r)
@@ -49,7 +49,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
response *proto.ExceedsLimitsResponse
responseErr error
expectedStreams []KeyedStream
expectedReasons map[uint64][]string
expectedErr string
}{{
// This test also asserts that streams are returned unmodified.
@@ -116,7 +115,6 @@
}},
},
expectedStreams: []KeyedStream{},
expectedReasons: map[uint64][]string{1: {"max streams"}},
}, {
name: "one of two streams exceeds limits",
tenant: "test",
@@ -145,7 +143,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
HashKey: 2000, // Should not be used.
HashKeyNoShard: 2,
}},
expectedReasons: map[uint64][]string{1: {"max streams"}},
}, {
name: "does not exceed limits",
tenant: "test",
@@ -174,7 +171,6 @@ func TestIngestLimits_EnforceLimits(t *testing.T) {
HashKey: 2000, // Should not be used.
HashKeyNoShard: 2,
}},
expectedReasons: nil,
}}

for _, test := range tests {
@@ -188,35 +184,29 @@
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
streams, reasons, err := l.enforceLimits(ctx, test.tenant, test.streams)
accepted, err := l.EnforceLimits(ctx, test.tenant, test.streams)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
// The streams should be returned unmodified.
require.Equal(t, test.streams, streams)
require.Nil(t, reasons)
require.Equal(t, test.streams, accepted)
} else {
require.Nil(t, err)
require.Equal(t, test.expectedStreams, streams)
require.Equal(t, test.expectedReasons, reasons)
require.Equal(t, test.expectedStreams, accepted)
}
})
}
}

// This test asserts that when checking ingest limits the expected proto
// message is sent, and that for a given response, the result contains the
// expected streams each with their expected reasons.
func TestIngestLimits_ExceedsLimits(t *testing.T) {
tests := []struct {
name string
tenant string
streams []KeyedStream
expectedRequest *proto.ExceedsLimitsRequest
response *proto.ExceedsLimitsResponse
responseErr error
expectedExceedsLimits bool
expectedReasons map[uint64][]string
expectedErr string
name string
tenant string
streams []KeyedStream
expectedRequest *proto.ExceedsLimitsRequest
response *proto.ExceedsLimitsResponse
responseErr error
expectedResult []*proto.ExceedsLimitsResult
expectedErr string
}{{
name: "error should be returned if limits cannot be checked",
tenant: "test",
@@ -249,8 +239,10 @@
Reason: uint32(limits.ReasonMaxStreams),
}},
},
expectedExceedsLimits: true,
expectedReasons: map[uint64][]string{1: {"max streams"}},
expectedResult: []*proto.ExceedsLimitsResult{{
StreamHash: 1,
Reason: uint32(limits.ReasonMaxStreams),
}},
}, {
name: "does not exceed limits",
tenant: "test",
Expand All @@ -266,7 +258,7 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
response: &proto.ExceedsLimitsResponse{
Results: []*proto.ExceedsLimitsResult{},
},
expectedReasons: nil,
expectedResult: []*proto.ExceedsLimitsResult{},
}}

for _, test := range tests {
Expand All @@ -280,15 +272,13 @@ func TestIngestLimits_ExceedsLimits(t *testing.T) {
l := newIngestLimits(&mockClient, prometheus.NewRegistry())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
exceedsLimits, reasons, err := l.exceedsLimits(ctx, test.tenant, test.streams)
res, err := l.ExceedsLimits(ctx, test.tenant, test.streams)
if test.expectedErr != "" {
require.EqualError(t, err, test.expectedErr)
require.False(t, exceedsLimits)
require.Nil(t, reasons)
require.Nil(t, res)
} else {
require.Nil(t, err)
require.Equal(t, test.expectedExceedsLimits, exceedsLimits)
require.Equal(t, test.expectedReasons, reasons)
require.Equal(t, test.expectedResult, res)
}
})
}