Skip to content

Commit e962398

Browse files
Authored (author name not captured in this page extract)
Commit message: feat: Optionally enable kafka histograms for read/write latency (#17089)
1 parent: 54ff5dc — commit: e962398

File tree

10 files changed: +46 additions, −13 deletions

docs/sources/shared/configuration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -826,6 +826,11 @@ kafka_config:
826826
# CLI flag: -kafka.max-consumer-lag-at-startup
827827
[max_consumer_lag_at_startup: <duration> | default = 15s]
828828

829+
# Enable collection of the following kafka latency histograms: read-wait,
830+
# read-timing, write-wait, write-timing
831+
# CLI flag: -kafka.enable-kafka-histograms
832+
[enable_kafka_histograms: <boolean> | default = false]
833+
829834
dataobj:
830835
consumer:
831836
builderconfig:

pkg/blockbuilder/builder/builder.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ func NewBlockBuilder(
139139
logger log.Logger,
140140
registerer prometheus.Registerer,
141141
) (*BlockBuilder,
142-
error) {
142+
error,
143+
) {
143144
decoder, err := kafka.NewDecoder()
144145
if err != nil {
145146
return nil, err
@@ -176,7 +177,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
176177
errgrp.Go(func() error {
177178
c, err := client.NewReaderClient(
178179
i.kafkaCfg,
179-
client.NewReaderClientMetrics(workerID, i.registerer),
180+
client.NewReaderClientMetrics(workerID, i.registerer, false),
180181
log.With(i.logger, "component", workerID),
181182
)
182183
if err != nil {
@@ -202,7 +203,6 @@ func (i *BlockBuilder) running(ctx context.Context) error {
202203
}
203204
}
204205
}
205-
206206
})
207207
}
208208

@@ -351,7 +351,6 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
351351
"appender",
352352
i.cfg.ConcurrentWriters,
353353
func(ctx context.Context) error {
354-
355354
for {
356355
select {
357356
case <-ctx.Done():

pkg/dataobj/consumer/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore.
6363
kafkaCfg,
6464
partitionRing,
6565
groupName,
66-
client.NewReaderClientMetrics(groupName, reg),
66+
client.NewReaderClientMetrics(groupName, reg, kafkaCfg.EnableKafkaHistograms),
6767
logger,
6868
kgo.InstanceID(instanceID),
6969
kgo.SessionTimeout(3*time.Minute),

pkg/kafka/client/reader_client.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ func NewReaderClient(kafkaCfg kafka.Config, metrics *kprom.Metrics, logger log.L
4343
return client, nil
4444
}
4545

46-
func NewReaderClientMetrics(component string, reg prometheus.Registerer) *kprom.Metrics {
46+
func NewReaderClientMetrics(component string, reg prometheus.Registerer, enableKafkaHistograms bool) *kprom.Metrics {
4747
return kprom.NewMetrics("loki_ingest_storage_reader",
4848
kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"component": component}, reg)),
4949
// Do not export the client ID, because we use it to specify options to the backend.
50-
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
50+
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes),
51+
enableKafkaHistogramMetrics(enableKafkaHistograms),
52+
)
5153
}
5254

5355
// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
@@ -75,3 +77,24 @@ func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg kafka.Config, cl *kgo.
7577

7678
level.Info(logger).Log("msg", "configured Kafka-wide default number of partitions for auto-created topics (num.partitions)", "value", cfg.AutoCreateTopicDefaultPartitions)
7779
}
80+
81+
func enableKafkaHistogramMetrics(enable bool) kprom.Opt {
82+
histogramOpts := []kprom.HistogramOpts{}
83+
if enable {
84+
histogramOpts = append(histogramOpts,
85+
kprom.HistogramOpts{
86+
Enable: kprom.ReadTime,
87+
Buckets: prometheus.DefBuckets,
88+
}, kprom.HistogramOpts{
89+
Enable: kprom.ReadWait,
90+
Buckets: prometheus.DefBuckets,
91+
}, kprom.HistogramOpts{
92+
Enable: kprom.WriteTime,
93+
Buckets: prometheus.DefBuckets,
94+
}, kprom.HistogramOpts{
95+
Enable: kprom.WriteWait,
96+
Buckets: prometheus.DefBuckets,
97+
})
98+
}
99+
return kprom.HistogramsFromOpts(histogramOpts...)
100+
}

pkg/kafka/client/writer_client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ func NewWriterClient(kafkaCfg kafka.Config, maxInflightProduceRequests int, logg
3838
metrics := kprom.NewMetrics(
3939
"", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix.
4040
kprom.Registerer(reg),
41-
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes))
41+
kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes),
42+
enableKafkaHistogramMetrics(kafkaCfg.EnableKafkaHistograms),
43+
)
4244

4345
opts := append(
4446
commonKafkaClientOptions(kafkaCfg, metrics, logger),

pkg/kafka/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type Config struct {
5656
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`
5757

5858
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
59+
60+
EnableKafkaHistograms bool `yaml:"enable_kafka_histograms"`
5961
}
6062

6163
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -85,6 +87,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
8587

8688
consumerLagUsage := fmt.Sprintf("Set -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".max-consumer-lag-at-startup")
8789
f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+consumerLagUsage)
90+
91+
f.BoolVar(&cfg.EnableKafkaHistograms, prefix+".enable-kafka-histograms", false, "Enable collection of the following kafka latency histograms: read-wait, read-timing, write-wait, write-timing")
8892
}
8993

9094
func (cfg *Config) Validate() error {

pkg/kafka/partition/offset_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func NewKafkaOffsetManager(
9292
reg prometheus.Registerer,
9393
) (*KafkaOffsetManager, error) {
9494
// Create a new Kafka client for the partition manager.
95-
clientMetrics := client.NewReaderClientMetrics("partition-manager", reg)
95+
clientMetrics := client.NewReaderClientMetrics("partition-manager", reg, cfg.EnableKafkaHistograms)
9696
c, err := client.NewReaderClient(
9797
cfg,
9898
clientMetrics,

pkg/kafka/partition/reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type ReaderMetrics struct {
5555
kprom *kprom.Metrics
5656
}
5757

58-
func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
58+
func NewReaderMetrics(r prometheus.Registerer, enableKafkaHistograms bool) *ReaderMetrics {
5959
return &ReaderMetrics{
6060
consumptionLag: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
6161
Name: "loki_kafka_reader_consumption_lag_seconds",
@@ -84,7 +84,7 @@ func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
8484
Name: "loki_kafka_reader_fetches_total",
8585
Help: "Total number of Kafka fetches performed.",
8686
}),
87-
kprom: client.NewReaderClientMetrics("partition-reader", r),
87+
kprom: client.NewReaderClientMetrics("partition-reader", r, enableKafkaHistograms),
8888
}
8989
}
9090

pkg/kafka/partition/reader_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func NewReaderService(
7575
logger log.Logger,
7676
reg prometheus.Registerer,
7777
) (*ReaderService, error) {
78-
readerMetrics := NewReaderMetrics(reg)
78+
readerMetrics := NewReaderMetrics(reg, kafkaCfg.EnableKafkaHistograms)
7979
reader, err := NewKafkaReader(
8080
kafkaCfg,
8181
partitionID,

pkg/limits/ingest_limits.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func NewIngestLimits(cfg Config, logger log.Logger, reg prometheus.Registerer) (
162162
s.lifecyclerWatcher = services.NewFailureWatcher()
163163
s.lifecyclerWatcher.WatchService(s.lifecycler)
164164

165-
metrics := client.NewReaderClientMetrics("ingest-limits", reg)
165+
metrics := client.NewReaderClientMetrics("ingest-limits", reg, cfg.KafkaConfig.EnableKafkaHistograms)
166166

167167
// Create a copy of the config to modify the topic
168168
kCfg := cfg.KafkaConfig

Commit comments: 0