Commit ec5599c

fix(stream-generator): Split create/keep-alive streams routines (#17815)
1 parent 3686861 commit ec5599c

File tree

5 files changed: +119 -76 lines changed

tools/stream-generator/generator/config.go

Lines changed: 6 additions & 6 deletions
@@ -40,8 +40,8 @@ type Config struct {
 	NumTenants int `yaml:"num_tenants"`
 	TenantPrefix string `yaml:"tenant_prefix"`
 	QPSPerTenant int `yaml:"qps_per_tenant"`
-	BatchSize int `yaml:"batch_size"`
-	BatchInterval time.Duration `yaml:"batch_interval"`
+	CreateBatchSize int `yaml:"create_batch_size"`
+	CreateNewStreamsInterval time.Duration `yaml:"create_new_streams_interval"`
 	StreamsPerTenant int `yaml:"streams_per_tenant"`
 	StreamLabels []string `yaml:"stream_labels"`
 	MaxGlobalStreamsPerTenant int `yaml:"max_global_streams_per_tenant"`
@@ -84,8 +84,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
 	f.IntVar(&c.NumTenants, "tenants.total", 1, "Number of tenants to generate metadata for")
 	f.StringVar(&c.TenantPrefix, "tenants.prefix", "", "Prefix for tenant IDs")
 	f.IntVar(&c.QPSPerTenant, "tenants.qps", 10, "Number of QPS per tenant")
-	f.IntVar(&c.BatchSize, "tenants.streams.batch-size", 100, "Number of streams to send to Kafka per tick")
-	f.DurationVar(&c.BatchInterval, "tenants.streams.batch-interval", 1*time.Minute, "Number of milliseconds to wait between batches. If set to 0, it will be calculated based on QPSPerTenant.")
+	f.IntVar(&c.CreateBatchSize, "tenants.streams.create-batch-size", 100, "Number of streams to send to Kafka per tick")
+	f.DurationVar(&c.CreateNewStreamsInterval, "tenants.streams.create-interval", 1*time.Minute, "Number of milliseconds to wait between batches. If set to 0, it will be calculated based on QPSPerTenant.")
 	f.IntVar(&c.StreamsPerTenant, "tenants.streams.total", 100, "Number of streams per tenant")
 	f.IntVar(&c.MaxGlobalStreamsPerTenant, "tenants.max-global-streams", 1000, "Maximum number of global streams per tenant")
 	f.IntVar(&c.HTTPListenPort, "http-listen-port", 3100, "HTTP Listener port")
@@ -116,8 +116,8 @@ func (c *Config) Validate() error {
 	}
 	c.PushMode = PushModeType(c.pushModeRaw)

-	if c.BatchInterval <= 0 {
-		c.BatchInterval = time.Second / time.Duration(c.QPSPerTenant)
+	if c.CreateNewStreamsInterval <= 0 {
+		c.CreateNewStreamsInterval = time.Second / time.Duration(c.QPSPerTenant)
 	}

 	return nil
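The renamed settings keep their old defaults (batch size 100, interval 1m) and the QPS-derived fallback when the interval is zero or negative. A minimal wiring sketch follows; the import path and main-function scaffolding are assumptions, while Config, RegisterFlags, and Validate are as shown in the diff above.

package main

import (
	"flag"
	"os"

	"github.com/go-kit/log"

	// Assumed import path for the package shown in this diff.
	"github.com/grafana/loki/v3/tools/stream-generator/generator"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	var cfg generator.Config
	cfg.RegisterFlags(flag.CommandLine, logger)

	// e.g. -tenants.streams.create-batch-size=200 -tenants.streams.create-interval=30s
	flag.Parse()

	// Validate falls back to time.Second / QPSPerTenant when the create interval is <= 0.
	if err := cfg.Validate(); err != nil {
		logger.Log("msg", "invalid config", "err", err)
		os.Exit(1)
	}
}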

tools/stream-generator/generator/distributor.go

Lines changed: 5 additions & 6 deletions
@@ -4,16 +4,17 @@ import (
 	"context"
 	"fmt"

-	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/user"
 	"github.com/grafana/loki/v3/pkg/distributor"
 	"github.com/grafana/loki/v3/pkg/logproto"
 )

-func (s *Generator) sendStreams(ctx context.Context, batch []distributor.KeyedStream, streamIdx int, batchSize int, tenant string, errCh chan error) {
+func (s *Generator) sendStreams(ctx context.Context, tenant string, batch []distributor.KeyedStream, errCh chan<- error) {
+	batchSize := len(batch)
+
 	userCtx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, tenant))
 	if err != nil {
-		errCh <- fmt.Errorf("failed to inject user context (tenant: %s, stream_idx: %d, batch_size: %d): %w", tenant, streamIdx, batchSize, err)
+		errCh <- fmt.Errorf("failed to inject user context (tenant: %s, batch_size: %d): %w", tenant, batchSize, err)
 		return
 	}

@@ -31,9 +32,7 @@ func (s *Generator) sendStreams(ctx context.Context, batch []distributor.KeyedSt

 	_, err = s.distributorClient.Push(userCtx, pushReq)
 	if err != nil {
-		errCh <- fmt.Errorf("failed to push streams to distributor (tenant: %s, stream_idx: %d, batch_size: %d): %w", tenant, streamIdx, batchSize, err)
+		errCh <- fmt.Errorf("failed to push streams to distributor (tenant: %s, batch_size: %d): %w", tenant, batchSize, err)
 		return
 	}
-
-	level.Debug(s.logger).Log("msg", "Sent streams to distributor", "tenant", tenant, "batch_size", batchSize, "stream_idx", streamIdx)
 }
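The reworked sendStreams takes the tenant first, derives batchSize from the batch itself, and only ever sends on errCh (now chan<- error). For context, the surrounding fan-in pattern — a channel buffered per tenant that the running loop waits on — looks roughly like this standalone sketch; the tenants, worker bodies, and error values are made up:

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tenants := []string{"tenant-0", "tenant-1"}
	// Buffered like errCh := make(chan error, s.cfg.NumTenants) in running().
	errCh := make(chan error, len(tenants))

	for _, tenant := range tenants {
		go func(tenant string) {
			// A worker only sends on errCh (chan<- error); it never reads from it.
			if tenant == "tenant-1" {
				errCh <- fmt.Errorf("push failed (tenant: %s): %w", tenant, errors.New("boom"))
			}
		}(tenant)
	}

	// The owner of the channel decides whether to stop on the first error.
	select {
	case <-ctx.Done():
	case err := <-errCh:
		fmt.Println("stopping on first tenant error:", err)
	}
}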

tools/stream-generator/generator/kafka.go

Lines changed: 12 additions & 11 deletions
@@ -20,21 +20,23 @@ import (
 	"github.com/twmb/franz-go/pkg/kgo"
 )

-func (s *Generator) sendStreamMetadata(ctx context.Context, streamsBatch []distributor.KeyedStream, streamIdx int, batchSize int, tenant string, errCh chan error) {
+func (s *Generator) sendStreamMetadata(ctx context.Context, tenant string, batch []distributor.KeyedStream, errCh chan<- error) {
+	batchSize := len(batch)
+
 	client, err := s.getFrontendClient()
 	if err != nil {
-		errCh <- fmt.Errorf("failed to get ingest limits frontend client (tenant: %s, stream_idx: %d, batch_size: %d): %w", tenant, streamIdx, batchSize, err)
+		errCh <- fmt.Errorf("failed to get ingest limits frontend client (tenant: %s, batch_size: %d): %w", tenant, batchSize, err)
 		return
 	}

 	userCtx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, tenant))
 	if err != nil {
-		errCh <- fmt.Errorf("failed to inject user context (tenant: %s, stream_idx: %d, batch_size: %d): %w", tenant, streamIdx, batchSize, err)
+		errCh <- fmt.Errorf("failed to inject user context (tenant: %s, batch_size: %d): %w", tenant, batchSize, err)
 		return
 	}

 	var streamMetadata []*proto.StreamMetadata
-	for _, stream := range streamsBatch {
+	for _, stream := range batch {
 		streamMetadata = append(streamMetadata, &proto.StreamMetadata{
 			StreamHash: stream.HashKeyNoShard,
 		})
@@ -47,13 +49,13 @@ func (s *Generator) sendStreamMetadata(ctx context.Context, streamsBatch []distr

 	// Check if the stream exceeds limits
 	if client == nil {
-		errCh <- fmt.Errorf("no ingest limits frontend client (tenant: %s, stream_idx: %d, batch_size: %d)", tenant, streamIdx, batchSize)
+		errCh <- fmt.Errorf("no ingest limits frontend client (tenant: %s, batch_size: %d)", tenant, batchSize)
 		return
 	}

 	resp, err := client.ExceedsLimits(userCtx, req)
 	if err != nil {
-		errCh <- fmt.Errorf("failed to check if stream exceeds limits (tenant: %s, stream_idx: %d, batch_size: %d): %w", tenant, streamIdx, batchSize, err)
+		errCh <- fmt.Errorf("failed to check if stream exceeds limits (tenant: %s, batch_size: %d): %w", tenant, batchSize, err)
 		return
 	}

@@ -69,18 +71,17 @@ func (s *Generator) sendStreamMetadata(ctx context.Context, streamsBatch []distr
 			results += fmt.Sprintf("%s: %d, ", reason, count)
 		}

-		level.Info(s.logger).Log("msg", "Stream exceeds limits", "tenant", tenant, "batch_size", batchSize, "stream_idx", streamIdx, "rejected", results)
+		level.Info(s.logger).Log("msg", "Stream exceeds limits", "tenant", tenant, "batch_size", batchSize, "rejected", results)
 		return
 	case len(resp.Results) == 0:
-		level.Debug(s.logger).Log("msg", "Stream accepted", "tenant", tenant, "batch_size", batchSize, "stream_idx", streamIdx)
+		level.Debug(s.logger).Log("msg", "Stream accepted", "tenant", tenant, "batch_size", batchSize)
 	}

 	// Send single stream to Kafka
-	s.sendStreamsToKafka(ctx, streamsBatch, tenant, errCh)
-	level.Debug(s.logger).Log("msg", "Sent streams to Kafka", "tenant", tenant, "batch_size", batchSize, "stream_idx", streamIdx)
+	s.sendStreamsToKafka(ctx, batch, tenant, errCh)
 }

-func (s *Generator) sendStreamsToKafka(ctx context.Context, streams []distributor.KeyedStream, tenant string, errCh chan error) {
+func (s *Generator) sendStreamsToKafka(ctx context.Context, streams []distributor.KeyedStream, tenant string, errCh chan<- error) {
 	for _, stream := range streams {
 		go func(stream distributor.KeyedStream) {
 			partitionID := int32(stream.HashKeyNoShard % uint64(s.cfg.NumPartitions))
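sendStreamsToKafka itself is unchanged apart from the send-only channel: each stream's HashKeyNoShard is reduced modulo the configured partition count, so the same stream lands on the same partition on every create and keep-alive push. A standalone sketch of that mapping, with made-up hash values and partition count:

package main

import "fmt"

func main() {
	// Mirrors s.cfg.NumPartitions in the generator; the value here is arbitrary.
	numPartitions := uint64(8)

	// Stand-ins for stream.HashKeyNoShard values.
	hashes := []uint64{0xdeadbeef, 0xcafebabe, 0x12345678}

	for _, h := range hashes {
		// Same expression as in sendStreamsToKafka.
		partitionID := int32(h % numPartitions)
		fmt.Printf("stream hash %#x -> partition %d\n", h, partitionID)
	}
}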

tools/stream-generator/generator/metrics.go

Lines changed: 11 additions & 6 deletions
@@ -8,16 +8,21 @@ import (
 )

 type metrics struct {
-	activeStreamsTotal *prometheus.CounterVec
-	kafkaWriteLatency prometheus.Histogram
-	kafkaWriteBytesTotal prometheus.Counter
+	streamsCreatedTotal *prometheus.CounterVec
+	streamsKeepAliveTotal *prometheus.CounterVec
+	kafkaWriteLatency prometheus.Histogram
+	kafkaWriteBytesTotal prometheus.Counter
 }

 func newMetrics(reg prometheus.Registerer) *metrics {
 	return &metrics{
-		activeStreamsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "active_streams_total",
-			Help: "The total number of active streams",
+		streamsCreatedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "streams_created_total",
+			Help: "The total number of streams create operations per tenant",
+		}, []string{"tenant"}),
+		streamsKeepAliveTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "streams_keep_alive_total",
+			Help: "The total number of streams keep alive operations per tenant",
 		}, []string{"tenant"}),
 		kafkaWriteLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name: "kafka_write_latency_seconds",

tools/stream-generator/generator/service.go

Lines changed: 85 additions & 47 deletions
@@ -42,6 +42,10 @@ type Generator struct {
 	// payload
 	streams map[string][]distributor.KeyedStream

+	// active streams
+	activeStreams int
+	activeStreamsMtx sync.RWMutex
+
 	// kafka
 	writer *client.Producer

@@ -138,7 +142,7 @@ func (s *Generator) starting(ctx context.Context) error {
 	s.ctx, s.cancel = context.WithCancel(ctx)

 	// Calculate optimal QPS to match the desired rate
-	s.cfg.QPSPerTenant = calculateOptimalQPS(s.cfg.DesiredRate, s.cfg.BatchSize, s.logger)
+	s.cfg.QPSPerTenant = calculateOptimalQPS(s.cfg.DesiredRate, s.cfg.CreateBatchSize, s.logger)
 	level.Info(s.logger).Log("msg", fmt.Sprintf("Adjusted QPS per tenant to %d to match desired rate of %d bytes/s",
 		s.cfg.QPSPerTenant, s.cfg.DesiredRate))

@@ -161,53 +165,12 @@ func (s *Generator) running(ctx context.Context) error {
 	// Create error channel to collect errors from goroutines
 	errCh := make(chan error, s.cfg.NumTenants)

-	// Start a goroutine for each tenant
+	// Start goroutines for each tenant:
+	// - create: creates new streams in intervals
+	// - keepAlive: keeps existing streams alive by re-sending them to the backend
 	for tenant, streams := range s.streams {
-		s.wg.Add(1)
-		go func(tenant string, streams []distributor.KeyedStream) {
-			defer s.wg.Done()
-
-			// Create a ticker for rate limiting based on QPSPerTenant
-			ticker := time.NewTicker(s.cfg.BatchInterval)
-			defer ticker.Stop()
-
-			// Keep track of current stream index and whether we've completed first pass
-			streamIdx := 0
-			firstPassComplete := false
-
-			for {
-				select {
-				case <-ctx.Done():
-					return
-				case <-ticker.C:
-					if streamIdx >= len(streams) {
-						streamIdx = 0
-						firstPassComplete = true
-					}
-
-					batchSize := s.cfg.BatchSize
-					if streamIdx+batchSize > len(streams) {
-						batchSize = len(streams) - streamIdx
-					}
-
-					streamsBatch := streams[streamIdx : streamIdx+batchSize]
-
-					switch s.cfg.PushMode {
-					case PushStreamMetadataOnly:
-						s.sendStreamMetadata(ctx, streamsBatch, streamIdx, batchSize, tenant, errCh)
-					case PushStream:
-						s.sendStreams(ctx, streamsBatch, streamIdx, batchSize, tenant, errCh)
-					}
-
-					// Only increment during the first pass
-					if !firstPassComplete {
-						s.metrics.activeStreamsTotal.WithLabelValues(tenant).Add(float64(batchSize))
-					}
-
-					streamIdx += batchSize
-				}
-			}
-		}(tenant, streams)
+		go s.create(ctx, tenant, streams, errCh)
+		go s.keepAlive(ctx, tenant, streams, errCh)
 	}

 	// Wait for context cancellation, subservice failure, or tenant error
@@ -245,6 +208,81 @@ func (s *Generator) GetFrontendRing() *ring.Ring {
 	return s.frontendRing
 }

+func (s *Generator) create(ctx context.Context, tenant string, streams []distributor.KeyedStream, errCh chan<- error) {
+	s.wg.Add(1)
+	defer s.wg.Done()
+
+	createT := time.NewTicker(s.cfg.CreateNewStreamsInterval)
+	total := len(streams)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-createT.C:
+			func() {
+				s.activeStreamsMtx.Lock()
+				defer s.activeStreamsMtx.Unlock()
+
+				if s.activeStreams >= total {
+					createT.Stop()
+					return
+				}
+
+				batchSize := s.cfg.CreateBatchSize
+				if s.activeStreams+batchSize > total {
+					batchSize = total - s.activeStreams
+				}
+
+				batch := streams[s.activeStreams : s.activeStreams+batchSize]
+				s.pushStreams(ctx, tenant, batch, errCh)
+
+				s.metrics.streamsCreatedTotal.WithLabelValues(tenant).Inc()
+				s.activeStreams += batchSize
+			}()
+		}
+	}
+}
+
+func (s *Generator) keepAlive(ctx context.Context, tenant string, streams []distributor.KeyedStream, errCh chan<- error) {
+	s.wg.Add(1)
+	defer s.wg.Done()
+
+	// Create a ticker to re-send already created streams at the per-tenant QPS interval
+	aliveT := time.NewTicker(time.Second / time.Duration(s.cfg.QPSPerTenant))
+	defer aliveT.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-aliveT.C:
+			func() {
+				s.activeStreamsMtx.RLock()
+				defer s.activeStreamsMtx.RUnlock()
+
+				if s.activeStreams < s.cfg.CreateBatchSize {
+					// Skip until first batch is created
+					return
+				}
+
+				batch := streams[:s.activeStreams-1]
+				s.pushStreams(ctx, tenant, batch, errCh)
+				s.metrics.streamsKeepAliveTotal.WithLabelValues(tenant).Inc()
+			}()
+		}
+	}
+}
+
+func (s *Generator) pushStreams(ctx context.Context, tenant string, streams []distributor.KeyedStream, errCh chan<- error) {
+	switch s.cfg.PushMode {
+	case PushStreamMetadataOnly:
+		s.sendStreamMetadata(ctx, tenant, streams, errCh)
+	case PushStream:
+		s.sendStreams(ctx, tenant, streams, errCh)
+	}
+}
+
 // calculateOptimalQPS calculates the optimal QPS to achieve the desired ingestion rate
 func calculateOptimalQPS(desiredRate, batchSize int, logger log.Logger) int {
 	// Calculate bytes per stream for normal streams
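Taken together, create() advances an activeStreams cursor by up to CreateBatchSize per tick until every configured stream has been sent once, while keepAlive() re-sends the streams created so far (streams[:activeStreams-1] in the diff) at the per-tenant QPS. A standalone sketch of the cursor arithmetic, with made-up values and prints in place of the actual pushes:

package main

import "fmt"

func main() {
	total := 250           // streams configured for the tenant (tenants.streams.total)
	createBatchSize := 100 // tenants.streams.create-batch-size
	activeStreams := 0

	for tick := 1; activeStreams < total; tick++ {
		// Same clamping as in create(): never run past the configured stream count.
		batchSize := createBatchSize
		if activeStreams+batchSize > total {
			batchSize = total - activeStreams
		}

		fmt.Printf("create tick %d: streams[%d:%d]\n", tick, activeStreams, activeStreams+batchSize)
		activeStreams += batchSize

		// Meanwhile keepAlive() re-sends streams[:activeStreams-1] on every QPS tick,
		// once at least one full create batch exists.
	}
	// Output: streams[0:100], streams[100:200], streams[200:250], then only keep-alives.
}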
