Skip to content

Commit 4a733fc

Browse files
feat: add method to get zone-aware partition consumers (#17377)
1 parent 6aed2ba commit 4a733fc

File tree

2 files changed

+232
-13
lines changed

2 files changed

+232
-13
lines changed

pkg/limits/frontend/ring.go

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,81 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, rs rin
119119
return responses, nil
120120
}
121121

122+
// zonePartitionConsumersResult carries the partition consumers discovered
// for a single zone back over the results channel in
// getZoneAwarePartitionConsumers.
type zonePartitionConsumersResult struct {
	// zone is the zone name ("" is used as a pseudo-zone when zone
	// awareness is disabled).
	zone string
	// partitions maps partition number to the address of its consumer.
	// It is nil when the consumers for the zone could not be fetched.
	partitions map[int32]string
}
126+
127+
// getZoneAwarePartitionConsumers returns partition consumers for each zone
128+
// in the replication set. If a zone has no active partition consumers, the
129+
// zone will still be returned but its partition consumers will be nil.
130+
// If ZoneAwarenessEnabled is false, it returns all partition consumers under
131+
// a psuedo-zone ("").
132+
func (g *RingStreamUsageGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
133+
zoneDescs := make(map[string][]ring.InstanceDesc)
134+
for _, instance := range instances {
135+
zoneDescs[instance.Zone] = append(zoneDescs[instance.Zone], instance)
136+
}
137+
// Get the partition consumers for each zone.
138+
resultsCh := make(chan zonePartitionConsumersResult, len(zoneDescs))
139+
errg, ctx := errgroup.WithContext(ctx)
140+
for zone, instances := range zoneDescs {
141+
errg.Go(func() error {
142+
res, err := g.getPartitionConsumers(ctx, instances)
143+
if err != nil {
144+
level.Error(g.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
145+
}
146+
// If the consumers could not be fetched for a zone, then it is
147+
// expected partitionConsumers is nil.
148+
resultsCh <- zonePartitionConsumersResult{
149+
zone: zone,
150+
partitions: res,
151+
}
152+
return nil
153+
})
154+
}
155+
errg.Wait() //nolint
156+
close(resultsCh)
157+
results := make(map[string]map[int32]string)
158+
for result := range resultsCh {
159+
results[result.zone] = result.partitions
160+
}
161+
return results, nil
162+
}
163+
122164
// getAssignedPartitionsResponse pairs an instance's address with the
// partitions it reported as assigned, so responses can be collected over a
// channel in getPartitionConsumers.
type getAssignedPartitionsResponse struct {
	// addr is the address of the instance that produced the response.
	addr string
	// response holds the partition-to-assignment-timestamp mapping
	// returned by the instance (possibly served from cache).
	response *logproto.GetAssignedPartitionsResponse
}
126168

169+
// getPartitionConsumers returns the consumer for each partition.
170+
171+
// In some cases, it might not be possible to know the consumer for a
172+
// partition. If this happens, it returns the consumers for a subset of
173+
// partitions that it does know about.
174+
//
175+
// For example, if a partition does not have a consumer then the partition
176+
// will be absent from the result. Likewise, if an instance does not respond,
177+
// the partition that it consumes will be absent from the result too. This
178+
// also means that if no partitions are assigned consumers, or if no instances
179+
// respond, the result will be empty.
180+
//
181+
// This method is not zone-aware, so if ZoneAwarenessEnabled is true, it
182+
// should be called once for each zone, and instances should be filtered to
183+
// the respective zone. Alternatively, you can pass all instances for all zones
184+
// to find the most up to date consumer for each partition across all zones.
127185
func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
128186
errg, ctx := errgroup.WithContext(ctx)
129-
responses := make(chan getAssignedPartitionsResponse, len(instances))
187+
responseCh := make(chan getAssignedPartitionsResponse, len(instances))
130188
for _, instance := range instances {
131189
errg.Go(func() error {
132190
// We use a cache to eliminate redundant gRPC requests for
133191
// GetAssignedPartitions as the set of assigned partitions is
134192
// expected to be stable outside consumer rebalances.
135193
if resp, ok := g.assignedPartitionsCache.Get(instance.Addr); ok {
136-
responses <- getAssignedPartitionsResponse{
137-
Addr: instance.Addr,
138-
Response: resp,
194+
responseCh <- getAssignedPartitionsResponse{
195+
addr: instance.Addr,
196+
response: resp,
139197
}
140198
return nil
141199
}
@@ -150,24 +208,24 @@ func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, ins
150208
return nil
151209
}
152210
g.assignedPartitionsCache.Set(instance.Addr, resp)
153-
responses <- getAssignedPartitionsResponse{
154-
Addr: instance.Addr,
155-
Response: resp,
211+
responseCh <- getAssignedPartitionsResponse{
212+
addr: instance.Addr,
213+
response: resp,
156214
}
157215
return nil
158216
})
159217
}
160218
if err := errg.Wait(); err != nil {
161219
return nil, err
162220
}
163-
close(responses)
221+
close(responseCh)
164222
highestTimestamp := make(map[int32]int64)
165223
assigned := make(map[int32]string)
166-
for resp := range responses {
167-
for partition, assignedAt := range resp.Response.AssignedPartitions {
224+
for resp := range responseCh {
225+
for partition, assignedAt := range resp.response.AssignedPartitions {
168226
if t := highestTimestamp[partition]; t < assignedAt {
169227
highestTimestamp[partition] = assignedAt
170-
assigned[partition] = resp.Addr
228+
assigned[partition] = resp.addr
171229
}
172230
}
173231
}

pkg/limits/frontend/ring_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,167 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
205205
}
206206
}
207207

208+
// TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers checks that
// partition consumers are grouped per zone, that each zone resolves its own
// consumers independently, and that a zone whose RPCs fail is still present
// in the result (with no consumers) rather than being dropped.
func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
	tests := []struct {
		name      string
		instances []ring.InstanceDesc
		// One expected request/response/error triple per instance, in the
		// same order as instances.
		expectedAssignedPartitionsRequests []*logproto.GetAssignedPartitionsRequest
		getAssignedPartitionsResponses     []*logproto.GetAssignedPartitionsResponse
		getAssignedPartitionsResponseErrs  []error
		expected                           map[string]map[int32]string
	}{{
		name: "single zone",
		instances: []ring.InstanceDesc{{
			Addr: "instance-a-0",
			Zone: "a",
		}},
		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}},
		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
			},
		}},
		getAssignedPartitionsResponseErrs: []error{nil},
		expected:                          map[string]map[int32]string{"a": {0: "instance-a-0"}},
	}, {
		name: "two zones",
		instances: []ring.InstanceDesc{{
			Addr: "instance-a-0",
			Zone: "a",
		}, {
			Addr: "instance-b-0",
			Zone: "b",
		}},
		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
			},
		}, {
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
			},
		}},
		getAssignedPartitionsResponseErrs: []error{nil, nil},
		expected: map[string]map[int32]string{
			"a": {0: "instance-a-0"},
			"b": {0: "instance-b-0"},
		},
	}, {
		name: "two zones, subset of partitions in zone b",
		instances: []ring.InstanceDesc{{
			Addr: "instance-a-0",
			Zone: "a",
		}, {
			Addr: "instance-b-0",
			Zone: "b",
		}},
		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
				1: time.Now().UnixNano(),
			},
		}, {
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
			},
		}},
		getAssignedPartitionsResponseErrs: []error{nil, nil},
		expected: map[string]map[int32]string{
			"a": {0: "instance-a-0", 1: "instance-a-0"},
			"b": {0: "instance-b-0"},
		},
	}, {
		// A failing zone must still appear in the result, with no
		// consumers, so callers can tell it was queried.
		name: "two zones, error in zone b",
		instances: []ring.InstanceDesc{{
			Addr: "instance-a-0",
			Zone: "a",
		}, {
			Addr: "instance-b-0",
			Zone: "b",
		}},
		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
				1: time.Now().UnixNano(),
			},
		}, nil},
		getAssignedPartitionsResponseErrs: []error{nil, errors.New("an unexpected error occurred")},
		expected: map[string]map[int32]string{
			"a": {0: "instance-a-0", 1: "instance-a-0"},
			"b": {},
		},
	}, {
		name: "two zones, different number of instances per zone",
		instances: []ring.InstanceDesc{{
			Addr: "instance-a-0",
			Zone: "a",
		}, {
			Addr: "instance-a-1",
			Zone: "a",
		}, {
			Addr: "instance-b-0",
			Zone: "b",
		}},
		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}, {}},
		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
			},
		}, {
			AssignedPartitions: map[int32]int64{
				1: time.Now().UnixNano(),
			},
		}, {
			AssignedPartitions: map[int32]int64{
				0: time.Now().UnixNano(),
				1: time.Now().UnixNano(),
			},
		}},
		getAssignedPartitionsResponseErrs: []error{nil, nil, nil},
		expected: map[string]map[int32]string{
			"a": {0: "instance-a-0", 1: "instance-a-1"},
			"b": {0: "instance-b-0", 1: "instance-b-0"},
		},
	}}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Set up the mock clients, one for each pair of mock RPC responses.
			clients := make([]*mockIngestLimitsClient, len(test.instances))
			for i := range test.instances {
				// These test cases assume one request/response per instance.
				expectedNumAssignedPartitionsRequests := 0
				if test.expectedAssignedPartitionsRequests[i] != nil {
					expectedNumAssignedPartitionsRequests = 1
				}
				clients[i] = &mockIngestLimitsClient{
					t:                                     t,
					expectedAssignedPartitionsRequest:     test.expectedAssignedPartitionsRequests[i],
					getAssignedPartitionsResponse:         test.getAssignedPartitionsResponses[i],
					getAssignedPartitionsResponseErr:      test.getAssignedPartitionsResponseErrs[i],
					expectedNumAssignedPartitionsRequests: expectedNumAssignedPartitionsRequests,
				}
				// Verify each client saw exactly the expected number of
				// requests once the subtest finishes.
				t.Cleanup(clients[i].AssertExpectedNumRequests)
			}
			// Set up the mocked ring and client pool for the tests.
			readRing, clientPool := newMockRingWithClientPool(t, "test", clients, test.instances)
			// A no-op cache ensures every request actually hits the mock
			// client instead of being served from cache.
			cache := NewNopCache[string, *logproto.GetAssignedPartitionsResponse]()
			g := NewRingStreamUsageGatherer(readRing, clientPool, 2, cache, log.NewNopLogger())

			// Set a maximum upper bound on the test execution time.
			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
			defer cancel()

			result, err := g.getZoneAwarePartitionConsumers(ctx, test.instances)
			require.NoError(t, err)
			require.Equal(t, test.expected, result)
		})
	}
}
368+
208369
func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
209370
tests := []struct {
210371
name string

0 commit comments

Comments
 (0)