@@ -2,6 +2,9 @@ package frontend
2
2
3
3
import (
4
4
"context"
5
+ "slices"
6
+ "sort"
7
+ "strings"
5
8
6
9
"github.com/go-kit/log"
7
10
"github.com/go-kit/log/level"
21
24
LimitsRead = ring .NewOp ([]ring.InstanceState {ring .ACTIVE }, nil )
22
25
)
23
26
27
var (
	// lexicoCmp is the default zone comparator: it orders zone names
	// lexicographically when used with slices.SortFunc. strings.Compare
	// already has the exact cmp-function signature, so it is used directly.
	lexicoCmp = strings.Compare
)
33
+
24
34
// RingStreamUsageGatherer implements StreamUsageGatherer. It uses a ring to find
25
35
// limits instances.
26
36
type RingStreamUsageGatherer struct {
@@ -29,6 +39,7 @@ type RingStreamUsageGatherer struct {
29
39
pool * ring_client.Pool
30
40
numPartitions int
31
41
assignedPartitionsCache Cache [string , * logproto.GetAssignedPartitionsResponse ]
42
+ zoneCmp func (a , b string ) int
32
43
}
33
44
34
45
// NewRingStreamUsageGatherer returns a new RingStreamUsageGatherer.
@@ -45,6 +56,7 @@ func NewRingStreamUsageGatherer(
45
56
pool : pool ,
46
57
numPartitions : numPartitions ,
47
58
assignedPartitionsCache : assignedPartitionsCache ,
59
+ zoneCmp : lexicoCmp ,
48
60
}
49
61
}
50
62
@@ -53,70 +65,129 @@ func (g *RingStreamUsageGatherer) GetStreamUsage(ctx context.Context, r GetStrea
53
65
if len (r .StreamHashes ) == 0 {
54
66
return nil , nil
55
67
}
56
- return g .forAllBackends (ctx , r )
57
- }
58
-
59
- // TODO(grobinson): Need to rename this to something more accurate.
60
- func (g * RingStreamUsageGatherer ) forAllBackends (ctx context.Context , r GetStreamUsageRequest ) ([]GetStreamUsageResponse , error ) {
68
+ // Get all healthy instances across all zones.
61
69
rs , err := g .ring .GetAllHealthy (LimitsRead )
62
70
if err != nil {
63
71
return nil , err
64
72
}
65
- return g .forGivenReplicaSet (ctx , rs , r )
66
- }
67
-
68
- func (g * RingStreamUsageGatherer ) forGivenReplicaSet (ctx context.Context , rs ring.ReplicationSet , r GetStreamUsageRequest ) ([]GetStreamUsageResponse , error ) {
69
- partitionConsumers , err := g .getPartitionConsumers (ctx , rs .Instances )
73
+ // Get the partition consumers for each zone.
74
+ zonesPartitions , err := g .getZoneAwarePartitionConsumers (ctx , rs .Instances )
70
75
if err != nil {
71
76
return nil , err
72
77
}
78
+ // In practice, we want zones to be queried in random order to spread
79
+ // reads. However, in tests we want a deterministic order so test cases
80
+ // are stable and reproducible. When compared to just iterating over
81
+ // a map, this allows us to achieve both.
82
+ zonesToQuery := make ([]string , len (zonesPartitions ))
83
+ for zone := range zonesPartitions {
84
+ zonesToQuery = append (zonesToQuery , zone )
85
+ }
86
+ slices .SortFunc (zonesToQuery , g .zoneCmp )
87
+ // Make a copy of the stream hashes as we will prune this slice each time
88
+ // we receive the responses from a zone.
89
+ streamHashesToQuery := make ([]uint64 , len (r .StreamHashes ))
90
+ copy (streamHashesToQuery , r .StreamHashes )
91
+ // Query each zone as ordered in zonesToQuery. If a zone answers all
92
+ // stream hashes, the request is satisifed and there is no need to query
93
+ // subsequent zones. If a zone answers just a subset of stream hashes
94
+ // (i.e. the instance that is consuming a partition is unavailable or the
95
+ // partition that owns one or more stream hashes does not have a consumer)
96
+ // then query the next zone for the remaining stream hashes. We repeat
97
+ // this process until all stream hashes have been queried or we have
98
+ // exhausted all zones.
99
+ responses := make ([]GetStreamUsageResponse , 0 )
100
+ for _ , zone := range zonesToQuery {
101
+ result , streamHashesToDelete , err := g .getStreamUsage (ctx , r .Tenant , streamHashesToQuery , zonesPartitions [zone ])
102
+ if err != nil {
103
+ continue
104
+ }
105
+ responses = append (responses , result ... )
106
+ // Delete the queried stream hashes from the slice of stream hashes
107
+ // to query. The slice of queried stream hashes must be sorted so we
108
+ // can use sort.Search to subtract the two slices.
109
+ slices .Sort (streamHashesToDelete )
110
+ streamHashesToQuery = slices .DeleteFunc (streamHashesToQuery , func (streamHashToQuery uint64 ) bool {
111
+ // see https://pkg.go.dev/sort#Search
112
+ i := sort .Search (len (streamHashesToDelete ), func (i int ) bool {
113
+ return streamHashesToDelete [i ] >= streamHashToQuery
114
+ })
115
+ return i < len (streamHashesToDelete ) && streamHashesToDelete [i ] == streamHashToQuery
116
+ })
117
+ // All stream hashes have been queried.
118
+ if len (streamHashesToQuery ) == 0 {
119
+ break
120
+ }
121
+ }
122
+ // Treat remaining stream hashes as unknown streams.
123
+ if len (streamHashesToQuery ) > 0 {
124
+ responses = append (responses , GetStreamUsageResponse {
125
+ Response : & logproto.GetStreamUsageResponse {
126
+ Tenant : "test" ,
127
+ UnknownStreams : streamHashesToQuery ,
128
+ },
129
+ })
130
+ }
131
+ return responses , nil
132
+ }
133
+
134
// getStreamUsageResponse carries the result of querying a single instance
// over a channel, together with the stream hashes that instance was asked
// about, so the caller can account for which hashes were answered.
type getStreamUsageResponse struct {
	// addr is the address of the instance that produced the response.
	addr string
	// response is the instance's stream usage response.
	response *logproto.GetStreamUsageResponse
	// streamHashes are the stream hashes the instance was queried for.
	streamHashes []uint64
}
73
139
140
+ func (g * RingStreamUsageGatherer ) getStreamUsage (ctx context.Context , tenant string , streamHashes []uint64 , partitions map [int32 ]string ) ([]GetStreamUsageResponse , []uint64 , error ) {
74
141
instancesToQuery := make (map [string ][]uint64 )
75
- for _ , hash := range r . StreamHashes {
76
- partitionID := int32 (hash % uint64 (g .numPartitions ))
77
- addr , ok := partitionConsumers [partitionID ]
142
+ for _ , streamHash := range streamHashes {
143
+ partitionID := int32 (streamHash % uint64 (g .numPartitions ))
144
+ addr , ok := partitions [partitionID ]
78
145
if ! ok {
79
146
// TODO Replace with a metric for partitions missing owners.
80
147
level .Warn (g .logger ).Log ("msg" , "no instance found for partition" , "partition" , partitionID )
81
148
continue
82
149
}
83
- instancesToQuery [addr ] = append (instancesToQuery [addr ], hash )
150
+ instancesToQuery [addr ] = append (instancesToQuery [addr ], streamHash )
84
151
}
85
-
152
+ // Get the stream usage from each instance.
153
+ responseCh := make (chan getStreamUsageResponse , len (instancesToQuery ))
86
154
errg , ctx := errgroup .WithContext (ctx )
87
- responses := make ([]GetStreamUsageResponse , len (instancesToQuery ))
88
-
89
- // Query each instance for stream usage
90
- i := 0
91
- for addr , hashes := range instancesToQuery {
92
- j := i
93
- i ++
155
+ for addr , streamHashes := range instancesToQuery {
94
156
errg .Go (func () error {
95
157
client , err := g .pool .GetClientFor (addr )
96
158
if err != nil {
97
- return err
159
+ level .Error (g .logger ).Log ("failed to get client for instance" , "instance" , addr , "err" , err .Error ())
160
+ return nil
98
161
}
99
-
100
162
protoReq := & logproto.GetStreamUsageRequest {
101
- Tenant : r . Tenant ,
102
- StreamHashes : hashes ,
163
+ Tenant : tenant ,
164
+ StreamHashes : streamHashes ,
103
165
}
104
-
105
166
resp , err := client .(logproto.IngestLimitsClient ).GetStreamUsage (ctx , protoReq )
106
167
if err != nil {
107
- return err
168
+ level .Error (g .logger ).Log ("failed to get stream usage for instance" , "instance" , addr , "err" , err .Error ())
169
+ return nil
170
+ }
171
+ responseCh <- getStreamUsageResponse {
172
+ addr : addr ,
173
+ response : resp ,
174
+ streamHashes : streamHashes ,
108
175
}
109
-
110
- responses [j ] = GetStreamUsageResponse {Addr : addr , Response : resp }
111
176
return nil
112
177
})
113
178
}
114
-
115
- if err := errg .Wait (); err != nil {
116
- return nil , err
179
+ errg .Wait () //nolint
180
+ close (responseCh )
181
+ responses := make ([]GetStreamUsageResponse , 0 , len (instancesToQuery ))
182
+ queriedStreamHashes := make ([]uint64 , 0 , len (streamHashes ))
183
+ for r := range responseCh {
184
+ responses = append (responses , GetStreamUsageResponse {
185
+ Addr : r .addr ,
186
+ Response : r .response ,
187
+ })
188
+ queriedStreamHashes = append (queriedStreamHashes , r .streamHashes ... )
117
189
}
118
-
119
- return responses , nil
190
+ return responses , queriedStreamHashes , nil
120
191
}
121
192
122
193
type zonePartitionConsumersResult struct {
0 commit comments