Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions pkg/limits/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
tests := []struct {
name string
exceedsLimitsRequest *logproto.ExceedsLimitsRequest
numPartitions int
getAssignedPartitionsResponses []*logproto.GetAssignedPartitionsResponse
expectedStreamUsageRequest []*logproto.GetStreamUsageRequest
getStreamUsageResponses []*logproto.GetStreamUsageResponse
Expand All @@ -40,6 +41,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x1},
},
},
numPartitions: 1,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
Expand All @@ -62,6 +64,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2},
},
},
numPartitions: 1,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
Expand All @@ -88,6 +91,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2}, // Also exceeds limits.
},
},
numPartitions: 1,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
Expand Down Expand Up @@ -123,6 +127,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x7}, // Also exceeds limits.
},
},
numPartitions: 1,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
Expand Down Expand Up @@ -158,6 +163,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2}, // Also exceeds limits.
},
},
numPartitions: 2,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(), // Instance 0 owns partition 0.
Expand All @@ -171,10 +177,10 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
// and instance 1 for the data for partition 1.
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x1, 0x2},
StreamHashes: []uint64{0x2},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the main fix. When we stopped sending the partition ID in GetStreamusageRequest, we started sending just the stream hashes for the owned partitions, instead of all stream hashes. However, because expectedStreamUsageRequest was not being used in the test, this was never asserted. This test should have failed when we made the change a couple weeks ago.

}, {
Tenant: "test",
StreamHashes: []uint64{0x1, 0x2},
StreamHashes: []uint64{0x1},
}},
// Each instance will respond stating that it doesn't know about the
// other stream.
Expand Down Expand Up @@ -202,11 +208,16 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2},
},
},
numPartitions: 1,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}},
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x1, 0x2},
}},
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
Tenant: "test",
ActiveStreams: 2,
Expand All @@ -227,6 +238,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2},
},
},
numPartitions: 2,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
Expand All @@ -236,6 +248,15 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
1: time.Now().UnixNano(),
},
}},
// The frontend will ask instance 0 for the data for partition 0,
// and instance 1 for the data for partition 1.
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x2},
}, {
Tenant: "test",
StreamHashes: []uint64{0x1},
}},
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
Tenant: "test",
ActiveStreams: 1,
Expand All @@ -260,11 +281,16 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x7},
},
},
numPartitions: 1,
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: 1,
0: time.Now().UnixNano(),
},
}},
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x6, 0x7},
}},
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
Tenant: "test",
ActiveStreams: 5,
Expand All @@ -290,6 +316,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
for i := range test.getAssignedPartitionsResponses {
clients[i] = &mockIngestLimitsClient{
getAssignedPartitionsResponse: test.getAssignedPartitionsResponses[i],
expectedStreamUsageRequest: test.expectedStreamUsageRequest[i],
getStreamUsageResponse: test.getStreamUsageResponses[i],
t: t,
}
Expand All @@ -310,7 +337,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
f := Frontend{
limits: l,
rateLimiter: rl,
streamUsage: NewRingStreamUsageGatherer(readRing, clientPool, log.NewNopLogger(), cache, 2),
streamUsage: NewRingStreamUsageGatherer(readRing, clientPool, log.NewNopLogger(), cache, test.numPartitions),
metrics: newMetrics(prometheus.NewRegistry()),
}

Expand Down