Skip to content

Commit 2cde9b1

Browse files
fix: skip streams over limits in dry-run mode (#17114)
1 parent 32745ba commit 2cde9b1

File tree

3 files changed

+276
-43
lines changed

3 files changed

+276
-43
lines changed

pkg/distributor/distributor.go

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
721721
return &logproto.PushResponse{}, validationErr
722722
}
723723

724+
var skipMetadataHashes map[uint64]struct{}
724725
if d.cfg.IngestLimitsEnabled {
725726
streamsAfterLimits, reasonsForHashes, err := d.ingestLimits.enforceLimits(ctx, tenantID, streams)
726727
if err != nil {
@@ -738,6 +739,18 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
738739
streams = streamsAfterLimits
739740
}
740741
}
742+
743+
if len(reasonsForHashes) > 0 && d.cfg.IngestLimitsDryRunEnabled {
744+
// When IngestLimitsDryRunEnabled is true, we need to stop stream hashes
745+
// that exceed the stream limit from being written to the metadata topic.
746+
// If we don't do this, the stream hashes that should have been rejected
747+
// will instead be counted as a known stream, causing a disagreement
748+
// in metrics between the limits service and ingesters.
749+
skipMetadataHashes = make(map[uint64]struct{})
750+
for streamHash := range reasonsForHashes {
751+
skipMetadataHashes[streamHash] = struct{}{}
752+
}
753+
}
741754
}
742755

743756
if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
@@ -778,7 +791,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
778791
return nil, err
779792
}
780793
// We don't need to create a new context like the ingester writes, because we don't return unless all writes have succeeded.
781-
d.sendStreamsToKafka(ctx, streams, tenantID, &tracker, subring)
794+
d.sendStreamsToKafka(ctx, streams, skipMetadataHashes, tenantID, &tracker, subring)
782795
}
783796

784797
if d.cfg.IngesterEnabled {
@@ -1213,10 +1226,10 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
12131226
return err
12141227
}
12151228

1216-
func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
1229+
func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, skipMetadataHashes map[uint64]struct{}, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
12171230
for _, s := range streams {
12181231
go func(s KeyedStream) {
1219-
err := d.sendStreamToKafka(ctx, s, tenant, subring)
1232+
err := d.sendStreamToKafka(ctx, s, skipMetadataHashes, tenant, subring)
12201233
if err != nil {
12211234
err = fmt.Errorf("failed to write stream to kafka: %w", err)
12221235
}
@@ -1225,7 +1238,7 @@ func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStr
12251238
}
12261239
}
12271240

1228-
func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string, subring *ring.PartitionRing) error {
1241+
func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, skipMetadataHashes map[uint64]struct{}, tenant string, subring *ring.PartitionRing) error {
12291242
if len(stream.Stream.Entries) == 0 {
12301243
return nil
12311244
}
@@ -1255,26 +1268,27 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
12551268

12561269
entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
12571270

1258-
// However, unlike stream records, the distributor writes stream metadata
1259-
// records to one of a fixed number of partitions, the size of which is
1260-
// determined ahead of time. It does not use a ring. The reason for this
1261-
// is that we want to be able to scale components that consume metadata
1262-
// records independent of ingesters.
1263-
metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions))
1264-
metadata, err := kafka.EncodeStreamMetadata(
1265-
metadataPartitionID,
1266-
d.cfg.KafkaConfig.Topic,
1267-
tenant,
1268-
stream.HashKeyNoShard,
1269-
entriesSize,
1270-
structuredMetadataSize,
1271-
)
1272-
if err != nil {
1273-
return fmt.Errorf("failed to marshal metadata: %w", err)
1271+
if _, ok := skipMetadataHashes[stream.HashKeyNoShard]; !ok {
1272+
// However, unlike stream records, the distributor writes stream metadata
1273+
// records to one of a fixed number of partitions, the size of which is
1274+
// determined ahead of time. It does not use a ring. The reason for this
1275+
// is that we want to be able to scale components that consume metadata
1276+
// records independent of ingesters.
1277+
metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions))
1278+
metadata, err := kafka.EncodeStreamMetadata(
1279+
metadataPartitionID,
1280+
d.cfg.KafkaConfig.Topic,
1281+
tenant,
1282+
stream.HashKeyNoShard,
1283+
entriesSize,
1284+
structuredMetadataSize,
1285+
)
1286+
if err != nil {
1287+
return fmt.Errorf("failed to marshal metadata: %w", err)
1288+
}
1289+
records = append(records, metadata)
12741290
}
12751291

1276-
records = append(records, metadata)
1277-
12781292
d.kafkaRecordsPerRequest.Observe(float64(len(records)))
12791293

12801294
produceResults := d.kafkaWriter.ProduceSync(ctx, records)

0 commit comments

Comments
 (0)