feat(ingest-limits): Enforce synchronously max stream limit per partition #17527
Conversation
partitionID := int32(stream.StreamHash % uint64(s.cfg.NumPartitions))

if !s.partitionManager.Has(partitionID) {
	continue
We should reject this case; at least that's how I've described the expected behavior in the issue. For example, suppose we send a stream to the wrong instance because we are working from an outdated GetAssignedPartitions response: the stream will be accepted, but we can't know whether it should have been accepted, since we sent it to a pod that doesn't own this partition.
	return
}
for _, stream := range req.Streams {
	partitionID := int32(stream.StreamHash % uint64(s.cfg.NumPartitions))
I think we are missing a check if the partitionID is also assigned?
Yes, indeed. Do you think we can silently skip the streams here then? Or should we report this as an error to the frontend so it can retry on the right partition consumer?
I think for now we need to drop the streams (i.e. reject them). If we skip, then the streams are accepted, and this will allow limits to be exceeded.
Sorry, I need to be more precise with my question here. We already drop these streams; we just write a warn log message in addition. Do we want to bubble up an error to the frontend to retry on other partition consumers?
Ah OK, to be clear, the streams also need to be rejected (not just dropped).
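For concreteness, a small self-contained sketch of the outcome of this thread: streams that land on a partition this instance does not own are surfaced as rejection results the frontend can see, rather than only producing a warn log. The `ExceedsLimitsResult` shape mirrors the result type used elsewhere in this PR; the reason constant and helper function are made up for illustration.

```go
// Hypothetical sketch: surface unowned-partition streams as rejection results
// so the frontend sees them, instead of only writing a warn log.
package main

import "fmt"

// ExceedsLimitsResult mirrors the result shape used elsewhere in this PR.
type ExceedsLimitsResult struct {
	StreamHash uint64
	Reason     uint32
}

// ReasonUnownedPartition is a made-up reason value for illustration.
const ReasonUnownedPartition uint32 = 99

func rejectUnowned(streamHashes []uint64, numPartitions int, owns func(int32) bool) []*ExceedsLimitsResult {
	var results []*ExceedsLimitsResult
	for _, h := range streamHashes {
		// Same partitioning scheme as in the diff: hash modulo partition count.
		partitionID := int32(h % uint64(numPartitions))
		if !owns(partitionID) {
			results = append(results, &ExceedsLimitsResult{StreamHash: h, Reason: ReasonUnownedPartition})
		}
	}
	return results
}

func main() {
	owns := func(id int32) bool { return id == 0 } // this instance owns only partition 0
	for _, r := range rejectUnowned([]uint64{1, 2, 3}, 2, owns) {
		fmt.Printf("rejected stream %d, reason %d\n", r.StreamHash, r.Reason)
	}
}
```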
pkg/limits/stream_metadata.go
	s.stripes[i][tenant][partitionID] = make([]Stream, 0)
}
var (
	exceedLimits = make(map[Reason][]uint64)
I think this can be a slice of len(streams), as it doesn't look like we really use the result as a map in ingest_limits.go; instead we just iterate over it and append to another slice?
Actually, we are using the key (i.e. Reason) to feed the results:
var results []*logproto.ExceedsLimitsResult
for reason, streamHashes := range exceedLimits {
for _, streamHash := range streamHashes {
results = append(results, &logproto.ExceedsLimitsResult{
StreamHash: streamHash,
Reason: uint32(reason),
})
}
}
Also, we will need the key as a differentiator when building the rate limits in a follow-up PR.
You misunderstood my question, I think! 😄 But I think the last sentence explains why this needs to be a map instead of a slice of structs? You want to count all streams that are rate limited, or something like that?
Yes counting rate limited streams was what I had in mind. That's what we did in the frontend previously.
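For reference, a self-contained sketch of why the Reason key is useful: the same map feeds the per-stream results (as in the snippet quoted above) and a per-reason count for the rate-limit follow-up. Type and constant names here are stubs for illustration, not the PR's code.

```go
package main

import "fmt"

type Reason uint32

const (
	ReasonExceedsMaxStreams Reason = iota
	ReasonExceedsRateLimit
)

type ExceedsLimitsResult struct {
	StreamHash uint64
	Reason     uint32
}

func main() {
	exceedLimits := map[Reason][]uint64{
		ReasonExceedsMaxStreams: {0xaa, 0xbb},
		ReasonExceedsRateLimit:  {0xcc},
	}

	// Per-stream results, mirroring the loop quoted above.
	var results []*ExceedsLimitsResult
	for reason, streamHashes := range exceedLimits {
		for _, streamHash := range streamHashes {
			results = append(results, &ExceedsLimitsResult{StreamHash: streamHash, Reason: uint32(reason)})
		}
	}

	// Per-reason counts, e.g. how many streams were rate limited.
	counts := make(map[Reason]int, len(exceedLimits))
	for reason, streamHashes := range exceedLimits {
		counts[reason] = len(streamHashes)
	}

	fmt.Println(len(results), counts[ReasonExceedsRateLimit]) // 3 1
}
```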
)

// Count as active streams all streams that are not expired.
for _, stored := range s.stripes[i][tenant][partitionID] {
I have a feeling this will be a 🔥 path, we might need to optimize it later 👍
I noticed this too, because on average we have ~20-30k streams per partition. Let's address this with a counter per partition, stored alongside the metadata, in a follow-up PR. I think this will immediately solve our problems.
How do you intend to keep the count up to date over time?
Never mind, I was thinking out loud about an active []map[string]map[int32]uint64 next to our stripes that could be counted up in StoreCond and down in Evict, but I totally forgot that evict is periodic. Sorry, this turns out to be a bad idea.
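For reference, a self-contained sketch of the "counter per partition stored alongside the metadata" idea from the earlier comment, with made-up names: the counter is bumped on store and rebuilt during the periodic evict sweep, so it can drift high between sweeps, which is the caveat about periodic eviction raised above.

```go
package main

import "fmt"

type Stream struct {
	Hash       uint64
	LastSeenAt int64
}

type partitionState struct {
	streams []Stream
	active  uint64 // cached count of non-expired streams
}

func (p *partitionState) store(s Stream) {
	for i := range p.streams {
		if p.streams[i].Hash == s.Hash {
			// Known stream: refresh its timestamp, the count is unchanged.
			p.streams[i].LastSeenAt = s.LastSeenAt
			return
		}
	}
	p.streams = append(p.streams, s)
	p.active++
}

// evict runs periodically: it drops expired streams and rebuilds the counter,
// so between sweeps the counter may still include already-expired streams.
func (p *partitionState) evict(cutoff int64) {
	kept := p.streams[:0]
	for _, s := range p.streams {
		if s.LastSeenAt >= cutoff {
			kept = append(kept, s)
		}
	}
	p.streams = kept
	p.active = uint64(len(kept))
}

func main() {
	p := &partitionState{}
	p.store(Stream{Hash: 1, LastSeenAt: 100})
	p.store(Stream{Hash: 2, LastSeenAt: 50})
	fmt.Println(p.active) // 2, even though stream 2 may already be expired
	p.evict(75)
	fmt.Println(p.active) // 1 after the periodic sweep
}
```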

// Calculate rate using only data from within the rate window
rate := float64(totalSize) / s.cfg.RateWindow.Seconds()
s.metrics.tenantIngestedBytesTotal.WithLabelValues(req.Tenant).Add(float64(ingestedBytes))
I think it's OK to move this metric to StoreIf
My preference is to not overload the stream-metadata implementation with more dependencies than needed. Metrics are relevant to the hosting service. WDYT?
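A rough, self-contained sketch of the caller-side placement being discussed: the store has no metrics dependency and only returns a byte count, while the hosting service records the metric. Everything except the Prometheus client API is made up for illustration.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// fakeStore stands in for the stream-metadata store; it has no metrics
// dependency and just reports how many bytes were stored.
type fakeStore struct{}

func (fakeStore) Store(tenant string, sizes []uint64) (ingestedBytes uint64) {
	for _, s := range sizes {
		ingestedBytes += s
	}
	return ingestedBytes
}

type service struct {
	store                    fakeStore
	tenantIngestedBytesTotal *prometheus.CounterVec
}

func (s *service) handle(tenant string, sizes []uint64) {
	ingestedBytes := s.store.Store(tenant, sizes)
	// The metric lives with the service, not inside the store.
	s.tenantIngestedBytesTotal.WithLabelValues(tenant).Add(float64(ingestedBytes))
}

func main() {
	svc := &service{
		tenantIngestedBytesTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: "tenant_ingested_bytes_total", Help: "Total ingested bytes per tenant."},
			[]string{"tenant"},
		),
	}
	svc.handle("tenant-a", []uint64{100, 250})
}
```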
What this PR does / why we need it:
This pull request adds the implementation to synchronously enforce the max stream limit per partition on each ingest-limits pod. This ensures that we can check the limits locally per partition without suffering consumption lag from the queue.
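For readers skimming the diff, a simplified, self-contained sketch of the idea (hypothetical names, not the PR's actual types): each instance keeps the active streams for the partitions it owns and can answer the limit check at request time, without waiting on queue consumption.

```go
package main

import "fmt"

type limiter struct {
	numPartitions      int
	maxStreamsPerPart  int
	activePerPartition map[int32]map[uint64]struct{} // partition -> known stream hashes
}

// exceedsLimit reports whether accepting streamHash would push its partition
// over the per-partition max stream limit.
func (l *limiter) exceedsLimit(streamHash uint64) bool {
	partitionID := int32(streamHash % uint64(l.numPartitions))
	active := l.activePerPartition[partitionID]
	if _, known := active[streamHash]; known {
		return false // already-active streams are not rejected
	}
	return len(active) >= l.maxStreamsPerPart
}

func main() {
	l := &limiter{
		numPartitions:     2,
		maxStreamsPerPart: 1,
		activePerPartition: map[int32]map[uint64]struct{}{
			0: {42: {}},
		},
	}
	fmt.Println(l.exceedsLimit(42)) // false: already active
	fmt.Println(l.exceedsLimit(44)) // true: partition 0 is at its limit
}
```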
Which issue(s) this PR fixes:
Fixes grafana/loki-private#1632
Special notes for your reviewer:
Checklist
- Reviewed the CONTRIBUTING.md guide (required)
- Note: feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
- If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR