Skip to content

Conversation

periklis
Copy link
Collaborator

What this PR does / why we need it:
This PR updates the stream usage retrieval routine in the ingest-limits-frontend component to query only ingest-limits instances that own the requested streams. The purpose of this optimization is to streamline traffic between these components to necessary minimum per ExceedLimits request.

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@periklis periklis self-assigned this Mar 27, 2025
@periklis periklis requested a review from a team as a code owner March 27, 2025 19:31
@github-actions github-actions bot added the type/docs Issues related to technical documentation; the Docs Squad uses this label across many repositories label Mar 27, 2025
@periklis periklis changed the title Query only owning consumers fix(ingest-limits): Get usage from owing instances only Mar 27, 2025
Copy link
Contributor

github-actions bot commented Mar 27, 2025

💻 Deploy preview deleted.

[recheck_period: <duration> | default = 10s]

# The number of partitions to use for the ring.
# CLI flag: -ingest-limits-frontend.num-partitions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit worried that someone configures this for ingest-limits and forgets to configure it for ingest-limits-frontend. Perhaps we could share the configuration here? I'm not sure what's best...

@grobinson-grafana
Copy link
Contributor

Can we remove partitions from GetStreamUsageRequest in logproto.proto? Does it still serve a use?

@periklis
Copy link
Collaborator Author

periklis commented Apr 3, 2025

Can we remove partitions from GetStreamUsageRequest in logproto.proto? Does it still serve a use?

Yes it does because we want to check the requested patitions vs. the assigned ones. Between a the two RPC calls we still can get a re-assignment.

@periklis periklis force-pushed the query-only-owning-consumers branch 2 times, most recently from 249a12c to 0e9d003 Compare April 3, 2025 11:44
@periklis periklis force-pushed the query-only-owning-consumers branch from d9b6cf5 to 0558080 Compare April 3, 2025 13:39
instances := make([]ring.InstanceDesc, len(clients))

for i := 0; i < len(test.expectedAssignedPartitionsRequest); i++ {
expectedStreamUsageReq := (*logproto.GetStreamUsageRequest)(nil)
Copy link
Contributor

@grobinson-grafana grobinson-grafana Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do (the whole block of code rather than the specific line)? I'm not sure I follow here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad that you point that out. The whole loop needs an overhaul here. This setup was written when the two requests types matched one to one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I delete this, the tests still pass 😕

clients[i] = &mockIngestLimitsClient{
					expectedAssignedPartitionsRequest: test.expectedAssignedPartitionsRequest[i],
					getAssignedPartitionsResponse:     test.getAssignedPartitionsResponses[i],
					expectedStreamUsageRequest:        test.expectedStreamUsageRequest[i],
					getStreamUsageResponse:            test.getStreamUsageResponses[i],
					t:                                 t,
				}

@grobinson-grafana
Copy link
Contributor

One of the tests is panicking in frontend.go it seems:

// Sum the number of active streams and rates of all responses.
	for _, resp := range resps {
		activeStreamsTotal += resp.Response.ActiveStreams
		rateTotal += float64(resp.Response.Rate)
	}

@grobinson-grafana
Copy link
Contributor

One of the tests is panicking in frontend.go it seems:

// Sum the number of active streams and rates of all responses.
	for _, resp := range resps {
		activeStreamsTotal += resp.Response.ActiveStreams
		rateTotal += float64(resp.Response.Rate)
	}

It's because of j := 0 instead of j := i.

@periklis periklis force-pushed the query-only-owning-consumers branch from 1c3b77e to 744b63d Compare April 4, 2025 07:59
}
}

// Set up the mock clients for the stream usage requests.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but I still don't understand this? The tests pass without it. If you want to put in nils, wouldn't this work?

diff --git a/pkg/limits/frontend/ring_test.go b/pkg/limits/frontend/ring_test.go
index e7621d0cf..b3f2b7a48 100644
--- a/pkg/limits/frontend/ring_test.go
+++ b/pkg/limits/frontend/ring_test.go
@@ -106,7 +106,7 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
                        Tenant:       "test",
                        StreamHashes: []uint64{1}, // Hash 1 maps to partition 1
                },
-               expectedAssignedPartitionsRequest: []*logproto.GetAssignedPartitionsRequest{{}, {}},
+               expectedAssignedPartitionsRequest: []*logproto.GetAssignedPartitionsRequest{nil, nil},
                getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
                        AssignedPartitions: map[int32]int64{
                                1: time.Now().Add(-time.Second).UnixNano(),
@@ -116,15 +116,17 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
                                1: time.Now().UnixNano(),
                        },
                }},
-               expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{}, {
-                       Tenant:       "test",
-                       StreamHashes: []uint64{1},
-               }},
-               getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{}, {
-                       Tenant:        "test",
-                       ActiveStreams: 1,
-                       Rate:          10,
-               }},
+               expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{
+                       nil, {
+                               Tenant:       "test",
+                               StreamHashes: []uint64{1},
+                       }},
+               getStreamUsageResponses: []*logproto.GetStreamUsageResponse{
+                       nil, {
+                               Tenant:        "test",
+                               ActiveStreams: 1,
+                               Rate:          10,
+                       }},
                expectedResponses: []GetStreamUsageResponse{{
                        Addr: "instance-1",
                        Response: &logproto.GetStreamUsageResponse{
@@ -148,17 +150,11 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
                                        t:                                 t,
                                        expectedAssignedPartitionsRequest: test.expectedAssignedPartitionsRequest[i],
                                        getAssignedPartitionsResponse:     test.getAssignedPartitionsResponses[i],
+                                       expectedStreamUsageRequest:        test.expectedStreamUsageRequest[i],
+                                       getStreamUsageResponse:            test.getStreamUsageResponses[i],
                                }
                        }

-                       // Set up the mock clients for the stream usage requests.
-                       // Note: Not every client will have a stream usage request because the
-                       // requested stream hashes are owned only by a subset of partition consumers.
-                       for i := range test.expectedStreamUsageRequest {
-                               clients[i].(*mockIngestLimitsClient).expectedStreamUsageRequest = test.expectedStreamUsageRequest[i]
-                               clients[i].(*mockIngestLimitsClient).getStreamUsageResponse = test.getStreamUsageResponses[i]
-                       }
-
                        // Set up the instances for the ring.
                        for i := range len(clients) {
                                instances[i] = ring.InstanceDesc{

@periklis periklis merged commit 5229a2a into main Apr 4, 2025
63 checks passed
@periklis periklis deleted the query-only-owning-consumers branch April 4, 2025 13:41
grobinson-grafana added a commit that referenced this pull request Apr 14, 2025
This commit fixes a bug where limits were incorrect after the change
in #16937. This bug occurred because we stopped asking
each limit instance if it knew about all stream hashes, and instead
starting asking each limit instance if it knew about the stream
hashes for its assigned partitions. This broke the logic in the
frontend that was taking the intersection of all responses to
calculate if a stream was unknown. It should now take the union
instead.
grobinson-grafana added a commit that referenced this pull request Apr 15, 2025
This commit fixes a bug where limits were incorrect after the change
in #16937. This bug occurred because we stopped asking
each limit instance if it knew about all stream hashes, and instead
starting asking each limit instance if it knew about the stream
hashes for its assigned partitions. This broke the logic in the
frontend that was taking the intersection of all responses to
calculate if a stream was unknown. It should now take the union
instead.
chaudum pushed a commit that referenced this pull request Apr 15, 2025
Contains backports of these commits:

```
2cde9b1 fix: skip streams over limits in dry-run mode (#17114)
805125c fix: fix a bug where limits were incorrect after #16937 (#17224)
d106042 fix: fix bug where expectedStreamUsageRequest was not used (#17223)
69aeda1 feat: add tests for enforcing limits in distributors (#17124)
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size/L type/docs Issues related to technical documentation; the Docs Squad uses this label across many repositories
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants