Commit 90a4884

tests: added a simple test that validates the consumer offsets cache

Added a test case that validates that the `__consumer_offsets` cache is enabled when the corresponding cluster property changes.

Signed-off-by: Michał Maślanka <[email protected]>

1 parent 88e5105 commit 90a4884
File tree

1 file changed: +84 -0 lines changed

@@ -0,0 +1,84 @@
# Copyright 2025 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import random

from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import DefaultClient, RedpandaTest

from rptest.clients.types import TopicSpec
from confluent_kafka import Consumer, TopicPartition


class ConsumerOffsetsCacheTest(RedpandaTest):
    def _consumer_offsets_cache_hit_ratio(self):
        # Ratio of cached read bytes to total read bytes for the
        # __consumer_offsets topic, aggregated across all nodes.
        cached_read_metrics = self.redpanda.metrics_sample(
            "vectorized_storage_log_cached_read_bytes",
            nodes=self.redpanda.nodes)
        read_metrics = self.redpanda.metrics_sample(
            "vectorized_storage_log_read_bytes", nodes=self.redpanda.nodes)

        def _sum_metric_value(metrics):
            return sum(s.value for s in metrics.samples
                       if s.labels["topic"] == "__consumer_offsets")

        total_read = _sum_metric_value(read_metrics)
        cache_read = _sum_metric_value(cached_read_metrics)
        self.logger.info(
            f"Read from cache: {cache_read}, total read: {total_read}")
        return cache_read / total_read if total_read > 0 else 0

    def _commit_random_offsets(self, topic: str, partition: int):
        # Create a consumer that will commit random offsets without consuming
        consumer = Consumer({
            'bootstrap.servers': self.redpanda.brokers(),
            'group.id': 'test-consumer-group',
            'enable.auto.commit': False,
            'auto.offset.reset': 'earliest'
        })

        # Commit random offsets for the topic partition
        try:
            for _ in range(100):
                # Generate a random offset between 0 and 1000
                random_offset = random.randint(0, 1000)

                # Commit the random offset synchronously
                consumer.commit(offsets=[
                    TopicPartition(topic, partition, offset=random_offset)
                ],
                                asynchronous=False)
        finally:
            consumer.close()

    @cluster(num_nodes=3)
    def test_enabling_consumer_offsets_cache_test(self):
        topic = TopicSpec(name="test-topic",
                          partition_count=1,
                          replication_factor=3)

        DefaultClient(self.redpanda).create_topic(topic)
        self._commit_random_offsets(topic.name, 0)

        # The cache is disabled by default, so the hit ratio should be zero.
        ratio_no_cache = self._consumer_offsets_cache_hit_ratio()
        assert ratio_no_cache == 0.0, "By default consumer offsets cache should not be enabled"

        # Enabling the property is expected to restart the cluster
        # (expect_restart=True).
        self.redpanda.set_cluster_config(
            {
                "consumer_offsets_topic_batch_cache_enabled": True,
            },
            expect_restart=True)

        self._commit_random_offsets(topic.name, 0)
        hit_ratio = self._consumer_offsets_cache_hit_ratio()
        assert hit_ratio > 0.2, "Consumer offsets topic should use cache now"
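For readers outside the rptest framework, the commit-without-consume pattern used by `_commit_random_offsets` above can be reproduced with `confluent_kafka` alone. The sketch below is illustrative only and is not part of the commit; the broker address, group id, topic name, and iteration count are assumptions.

# Illustrative sketch (not part of this commit): commit offsets for a
# partition without ever polling records, then read the last committed
# offset back. Assumes a broker reachable at localhost:9092 (assumed
# address) and an existing topic named "test-topic" (assumed name).
import random

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # assumed broker address
    'group.id': 'offsets-cache-demo',       # assumed group id
    'enable.auto.commit': False,
})

try:
    # Every synchronous commit writes a record to __consumer_offsets.
    for _ in range(10):
        consumer.commit(offsets=[
            TopicPartition("test-topic", 0, offset=random.randint(0, 1000))
        ],
                        asynchronous=False)

    # Read the last committed offset back from the group coordinator.
    committed = consumer.committed([TopicPartition("test-topic", 0)],
                                   timeout=10)
    print(f"Last committed offset: {committed[0].offset}")
finally:
    consumer.close()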
