
Commit af97644

feat(tools): Add stream generator (#17214)
1 parent 805125c commit af97644

12 files changed: +1243 -0 lines changed

tools/stream-generator/Dockerfile

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
ARG GO_VERSION=1.24

# Go build stage
FROM golang:${GO_VERSION} AS build
COPY . /src/loki
WORKDIR /src/loki
RUN CGO_ENABLED=0 go build -o stream-generator ./tools/stream-generator/main.go

# Final stage
FROM gcr.io/distroless/static:debug

COPY --from=build /src/loki/stream-generator /usr/bin/stream-generator

SHELL [ "/busybox/sh", "-c" ]

RUN addgroup -g 10001 -S streamgenerator && \
    adduser -u 10001 -S streamgenerator -G streamgenerator && \
    chown -R streamgenerator:streamgenerator /usr/bin/stream-generator && \
    ln -s /busybox/sh /bin/sh

USER 10001
EXPOSE 9090
ENTRYPOINT [ "/usr/bin/stream-generator" ]

tools/stream-generator/README.md

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
# Stream Generator

A utility to generate streams (full or metadata) and send them to Loki. This tool is useful for testing and benchmarking Loki's Kafka-based ingestion path.

## Building

From the root of the Loki repository:

```bash
docker build -t grafana/stream-generator -f tools/stream-generator/Dockerfile .
```

## Usage

The stream generator supports various configuration options through command-line flags:

### Basic Configuration

- `--tenants.total`: Number of tenants to generate streams or stream metadata for (default: 1)
- `--tenants.streams.desired-rate`: Desired ingestion rate in bytes per second (default: 1MB/s)
- `--tenants.streams.total`: Number of streams per tenant (default: 100)
- `--tenants.qps`: Number of queries per second per tenant (default: 10)
- `--http-listen-port`: HTTP listen port for the metrics server (default: 9090)

### Kafka Configuration

- `--kafka.addresses`: The addresses of the Kafka brokers (comma separated) (default: "localhost:9092")
- `--kafka.topic`: The name of the Kafka topic to write to (default: "loki")
- `--kafka.client-id`: The client ID to use when connecting to Kafka (default: "stream-generator")
- `--kafka.version`: The version of the Kafka protocol to use (default: "2.3.0")
- `--kafka.timeout`: The timeout to use when connecting to Kafka (default: "10s")
- `--kafka.sasl.enabled`: Enable SASL authentication (default: false)
- `--kafka.sasl.mechanism`: SASL mechanism to use (default: "PLAIN")
- `--kafka.sasl.username`: SASL username for authentication (default: "")
- `--kafka.sasl.password`: SASL password for authentication (default: "")

### Ring Configuration

- `--ring.store`: The backend storage to use for the ring (supported: consul, etcd, inmemory, memberlist) (default: "inmemory")
- `--ring.replication-factor`: The number of replicas to write to (default: 3)
- `--ring.kvstore.store`: The backing store to use for the KVStore (default: "inmemory")
- `--ring.heartbeat-timeout`: The heartbeat timeout after which ingesters are considered unhealthy (default: "1m")

## Example Usage

### Basic Example

Run with default settings (1 tenant, 100 streams per tenant, 10 QPS):

```bash
docker run -p 9090:9090 grafana/stream-generator
```

### Full Example

Run with custom settings:

```bash
docker run -p 9091:9090 \
  grafana/stream-generator \
  --tenants.total=5 \
  --tenants.streams.total=1000 \
  --tenants.qps=100 \
  --http-listen-port=3100 \
  --kafka.address=kafka-1:9092 \
  --kafka.topic=loki \
  --kafka.client-id=stream-meta-gen-1 \
  --kafka.sasl.enabled=true \
  --kafka.sasl.mechanism=PLAIN \
  --kafka.sasl.username=loki \
  --kafka.sasl.password=secret123 \
  --stream-metadata-generator.store=consul \
  --stream-metadata-generator.replication-factor=3 \
  --stream-metadata-generator.kvstore.consul.hostname=consul:8500
```

### Local Development Example

For local development with the provided docker-compose setup:

```bash
docker run --network=host \
  grafana/stream-generator \
  --tenants.total=2 \
  --tenants.streams.total=500 \
  --tenants.qps=50 \
  --kafka.address=localhost:9092 \
  --kafka.topic=loki \
  --kafka.sasl.enabled=true \
  --kafka.sasl.mechanism=PLAIN \
  --kafka.sasl.username=loki \
  --kafka.sasl.password=secret123 \
  --stream-metadata-generator.store=inmemory
```

## Metrics

The generator exposes Prometheus metrics at `/metrics` on the HTTP listen port (9090 by default; see `--http-listen-port`). When running with Docker, make sure to expose this port with the `-p` flag if you need to access the metrics from outside the container.
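A quick way to confirm the endpoint is reachable is sketched below. This snippet is illustrative only (not part of this commit) and assumes the default port 9090 published on localhost:

```go
// Minimal sketch: fetch and print the generator's Prometheus metrics.
// Assumes the default --http-listen-port (9090) is published on localhost,
// e.g. via `docker run -p 9090:9090 grafana/stream-generator`.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:9090/metrics")
	if err != nil {
		log.Fatalf("fetching metrics: %v", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("reading response: %v", err)
	}
	fmt.Print(string(body))
}
```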
## Labels

The generator creates streams with the following labels:
- `cluster`
- `namespace`
- `job`
- `instance`

Each label value is generated using a pattern that ensures uniqueness across streams.
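The pattern itself isn't spelled out in this README. The sketch below shows one way such unique per-stream values could be composed; the helper name and value format are assumptions for illustration, not the generator's actual implementation:

```go
// Illustrative only: build a unique label set for the i-th stream of a
// tenant by folding the tenant and stream index into each label value.
package main

import "fmt"

// streamLabels returns a LogQL-style label selector for stream i of a tenant.
// The value format is an assumption, not the generator's actual pattern.
func streamLabels(tenant string, i int) string {
	return fmt.Sprintf(
		`{cluster="cluster-%s", namespace="ns-%d", job="job-%d", instance="instance-%d"}`,
		tenant, i%10, i%100, i,
	)
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(streamLabels("tenant-1", i))
	}
	// {cluster="cluster-tenant-1", namespace="ns-0", job="job-0", instance="instance-0"}
	// {cluster="cluster-tenant-1", namespace="ns-1", job="job-1", instance="instance-1"}
	// {cluster="cluster-tenant-1", namespace="ns-2", job="job-2", instance="instance-2"}
}
```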
Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
// Package client provides a gRPC client implementation for the distributor service.
package client

import (
	"flag"
	"fmt"
	"io"
	"time"

	"github.com/grafana/dskit/grpcclient"
	"github.com/grafana/dskit/middleware"
	"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/keepalive"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/util/server"
)

const (
	GRPCLoadBalancingPolicyRoundRobin = "round_robin"

	grpcServiceConfigTemplate = `{"loadBalancingPolicy":"%s"}`
)

var (
	requestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "loki_distributor_client_request_duration_seconds",
		Help:    "Time spent doing distributor requests.",
		Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
	}, []string{"operation", "status_code"})
)

// Config contains the config for a distributor client.
type Config struct {
	Addr                         string                         `yaml:"addr"`
	GRPCClientConfig             grpcclient.Config              `yaml:"grpc_client_config"`
	GRPCUnaryClientInterceptors  []grpc.UnaryClientInterceptor  `yaml:"-"`
	GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`

	// Internal is used to indicate that this client communicates on behalf of
	// a machine and not a user. When Internal = true, the client won't attempt
	// to inject a user ID into the context.
	Internal bool `yaml:"-"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.StringVar(&cfg.Addr, prefix+".addr", "", "The address of the distributor. Preferred 'dns:///distributor.namespace.svc.cluster.local:3100'")
	cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f)
}

// Client is a gRPC client for the distributor.
type Client struct {
	logproto.PusherClient
	grpc_health_v1.HealthClient
	io.Closer
}

// New returns a new Client for the specified distributor.
func New(cfg Config) (*Client, error) {
	opts := []grpc.DialOption{
		grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
	}
	unaryInterceptors, streamInterceptors := getGRPCInterceptors(&cfg)
	dialOpts, err := cfg.GRPCClientConfig.DialOption(unaryInterceptors, streamInterceptors, middleware.NoOpInvalidClusterValidationReporter)
	if err != nil {
		return nil, err
	}

	serviceConfig := fmt.Sprintf(grpcServiceConfigTemplate, GRPCLoadBalancingPolicyRoundRobin)

	dialOpts = append(dialOpts,
		grpc.WithBlock(), // nolint:staticcheck // grpc.WithBlock() has been deprecated; we'll address it before upgrading to gRPC 2.
		grpc.WithKeepaliveParams(
			keepalive.ClientParameters{
				Time:                time.Second * 10,
				Timeout:             time.Second * 5,
				PermitWithoutStream: true,
			},
		),
		grpc.WithDefaultServiceConfig(serviceConfig),
	)

	opts = append(opts, dialOpts...)
	// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
	conn, err := grpc.Dial(cfg.Addr, opts...)
	if err != nil {
		return nil, err
	}
	return &Client{
		PusherClient: logproto.NewPusherClient(conn),
		HealthClient: grpc_health_v1.NewHealthClient(conn),
		Closer:       conn,
	}, nil
}

// getGRPCInterceptors returns the gRPC interceptors for the given Config.
func getGRPCInterceptors(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
	var (
		unaryInterceptors  []grpc.UnaryClientInterceptor
		streamInterceptors []grpc.StreamClientInterceptor
	)

	unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
	unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
	unaryInterceptors = append(unaryInterceptors, server.UnaryClientHTTPHeadersInterceptor)
	unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
	if !cfg.Internal {
		unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
	}
	unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(requestDuration))

	streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
	streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
	streamInterceptors = append(streamInterceptors, server.StreamClientHTTPHeadersInterceptor)
	streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
	if !cfg.Internal {
		streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
	}
	streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(requestDuration))

	return unaryInterceptors, streamInterceptors
}
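For context on how this client is meant to be consumed, here is a hedged usage sketch: it builds a Config from flags, dials the distributor, and pushes one stream. The flag prefix, import path, tenant ID, and label string are illustrative assumptions, not part of the commit.

```go
// Usage sketch only: construct the distributor client and push a single stream.
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"github.com/grafana/dskit/user"

	"github.com/grafana/loki/v3/pkg/logproto"

	// The client package's import path is assumed; the diff does not show the file's location.
	client "github.com/grafana/loki/v3/tools/stream-generator/distributor/client"
)

func main() {
	// Register the client's flags under an assumed prefix, then parse,
	// e.g. --distributor-client.addr=dns:///distributor:3100
	var cfg client.Config
	cfg.RegisterFlagsWithPrefix("distributor-client", flag.CommandLine)
	flag.Parse()

	c, err := client.New(cfg)
	if err != nil {
		log.Fatalf("creating distributor client: %v", err)
	}
	defer c.Close()

	// With Internal == false, the interceptors read the tenant ID from the context.
	ctx := user.InjectOrgID(context.Background(), "tenant-1")

	_, err = c.Push(ctx, &logproto.PushRequest{
		Streams: []logproto.Stream{{
			Labels: `{cluster="dev", namespace="loki", job="stream-generator", instance="0"}`,
			Entries: []logproto.Entry{{
				Timestamp: time.Now(),
				Line:      "hello from stream-generator",
			}},
		}},
	})
	if err != nil {
		log.Fatalf("pushing stream: %v", err)
	}
}
```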
Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    hostname: kafka-ui
    container_name: kafka-ui
    ports:
      - 8080:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
    depends_on:
      - broker
    networks:
      - loki
    extra_hosts:
      - "host.docker.internal:host-gateway"

  broker:
    image: apache/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
      - 29092:29092
      - 29093:29093
      - 29094:29094
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT,STREAMETAGEN:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092,STREAMETAGEN://broker:29094
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092,STREAMETAGEN://0.0.0.0:29094
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
    networks:
      - loki
    extra_hosts:
      - "host.docker.internal:host-gateway"

  loki:
    image: theperiklis/loki:feat-usage-tracker-293aed8
    hostname: loki
    container_name: loki
    user: root
    ports:
      - 3100:3100
      - 9096:9096
      - 7946:7946
    volumes:
      - ./loki-local-config.debug.yaml:/etc/loki/local-config.yaml
      - ./entrypoint.sh:/entrypoint.sh
      - loki-data:/tmp/loki
    entrypoint: ["/entrypoint.sh"]
    depends_on:
      - broker
    networks:
      - loki
    extra_hosts:
      - "host.docker.internal:host-gateway"

  generator:
    image: theperiklis/stream-generator:latest
    hostname: generator
    container_name: generator
    build:
      context: ../..
      dockerfile: tools/stream-generator/Dockerfile
    ports:
      - 3101:3100
      - 7947:7947
    command:
      - --log.level=debug
      - --tenants.total=2
      - --tenants.streams.total=500
      - --tenants.qps=50
      - --memberlist.bind-port=7947
      - --memberlist.advertise-port=7947
      - --memberlist.join=loki:7946
      - --kafka.address=broker:29094
      - --kafka.topic=loki
      - --kafka.auto-create-topic-default-partitions=1000
      - --stream-generator.store=memberlist
      - --stream-generator.push-target=metadata-topic-only
    depends_on:
      - broker
      - loki
    networks:
      - loki
    extra_hosts:
      - "host.docker.internal:host-gateway"

networks:
  loki:
    driver: bridge

volumes:
  loki-data:
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
#!/bin/sh

# Create required directories
mkdir -p /tmp/loki/chunks /tmp/loki/rules

# Set proper permissions
chown -R nobody:nobody /tmp/loki
chmod -R 777 /tmp/loki

# Start Loki
exec loki --config.file=/etc/loki/local-config.yaml
