Skip to content

Commit 4343e5e

Browse files
Merge pull request redpanda-data#24568 from bharathv/lag_metric_changes
datalake/metrics: miscellaneous improvements to lag metrics
2 parents 3c5a1e4 + bef3254 commit 4343e5e

File tree

4 files changed

+94
-37
lines changed

4 files changed

+94
-37
lines changed

src/v/cluster/partition_probe.cc

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ namespace cluster {
2424
static const ss::sstring cluster_metrics_name
2525
= prometheus_sanitize::metrics_name("cluster:partition");
2626

27+
static constexpr int64_t follower_iceberg_lag_metric = 0;
28+
2729
replicated_partition_probe::replicated_partition_probe(
2830
const partition& p) noexcept
2931
: _partition(p) {
@@ -46,6 +48,16 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
4648
setup_public_metrics(ntp);
4749
}
4850

51+
int64_t replicated_partition_probe::iceberg_translation_offset_lag() const {
52+
return _partition.is_leader() ? _iceberg_translation_offset_lag
53+
: follower_iceberg_lag_metric;
54+
}
55+
56+
int64_t replicated_partition_probe::iceberg_commit_offset_lag() const {
57+
return _partition.is_leader() ? _iceberg_commit_offset_lag
58+
: follower_iceberg_lag_metric;
59+
}
60+
4961
void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
5062
namespace sm = ss::metrics;
5163

@@ -168,28 +180,40 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
168180
{sm::shard_label, partition_label});
169181

170182
if (model::is_user_topic(_partition.ntp())) {
183+
// Metrics are reported as follows
184+
// -2 (default initialized state)
185+
// -1 (iceberg disabled state)
186+
// 0 (iceberg enabled but follower replicas)
187+
// <actual lag> leader replicas
171188
_metrics.add_group(
172189
cluster_metrics_name,
173190
{
174191
sm::make_gauge(
175192
"iceberg_offsets_pending_translation",
176193
[this] {
177194
return _partition.log()->config().iceberg_enabled()
178-
? _iceberg_translation_offset_lag
195+
? iceberg_translation_offset_lag()
179196
: metric_feature_disabled_state;
180197
},
181-
sm::description("Total number of offsets that are pending "
182-
"translation to iceberg."),
198+
sm::description(
199+
"Total number of offsets that are pending "
200+
"translation to iceberg. Lag is reported only on leader "
201+
"replicas while followers report 0. -1 is reported if iceberg "
202+
"is disabled while -2 indicates the lag is "
203+
"not yet computed."),
183204
labels),
184205
sm::make_gauge(
185206
"iceberg_offsets_pending_commit",
186207
[this] {
187208
return _partition.log()->config().iceberg_enabled()
188-
? _iceberg_commit_offset_lag
209+
? iceberg_commit_offset_lag()
189210
: metric_feature_disabled_state;
190211
},
191-
sm::description("Total number of offsets that are pending "
192-
"commit to iceberg catalog."),
212+
sm::description(
213+
"Total number of offsets that are pending "
214+
"commit to iceberg catalog. Lag is reported only on leader "
215+
"while followers report 0. -1 is reported if iceberg is "
216+
"disabled while -2 indicates the lag is not yet computed."),
193217
labels),
194218
},
195219
{},

src/v/cluster/partition_probe.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ class replicated_partition_probe : public partition_probe::impl {
109109
void clear_metrics() final;
110110

111111
private:
112+
int64_t iceberg_translation_offset_lag() const;
113+
int64_t iceberg_commit_offset_lag() const;
112114
void reconfigure_metrics();
113115
void setup_public_metrics(const model::ntp&);
114116
void setup_internal_metrics(const model::ntp&);

src/v/datalake/coordinator/types.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ struct fetch_latest_translated_offset_reply
186186
friend std::ostream&
187187
operator<<(std::ostream&, const fetch_latest_translated_offset_reply&);
188188

189-
auto serde_fields() { return std::tie(last_added_offset, errc); }
189+
auto serde_fields() {
190+
return std::tie(last_added_offset, errc, last_iceberg_committed_offset);
191+
}
190192
};
191193

192194
// For a given topic/partition fetches the latest translated offset from

tests/rptest/tests/datalake/datalake_e2e_test.py

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@
2424
from ducktape.utils.util import wait_until
2525
from rptest.services.metrics_check import MetricCheck
2626

27-
NO_SCHEMA_ERRORS = [
28-
r'Must have parsed schema when using structured data mode',
29-
r'Error translating data to binary record'
30-
]
31-
3227

3328
class DatalakeE2ETests(RedpandaTest):
3429
def __init__(self, test_ctx, *args, **kwargs):
@@ -190,45 +185,79 @@ def table_deleted():
190185
dl.produce_to_topic(self.topic_name, 1024, count)
191186
dl.wait_for_translation(self.topic_name, msg_count=count)
192187

193-
@cluster(num_nodes=3, log_allow_list=NO_SCHEMA_ERRORS)
194-
@matrix(cloud_storage_type=supported_storage_types())
195-
def test_metrics(self, cloud_storage_type):
196188

197-
commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
198-
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'
189+
class DatalakeMetricsTest(RedpandaTest):
190+
191+
commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
192+
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'
193+
194+
def __init__(self, test_ctx, *args, **kwargs):
195+
super(DatalakeMetricsTest,
196+
self).__init__(test_ctx,
197+
num_brokers=3,
198+
si_settings=SISettings(test_context=test_ctx),
199+
extra_rp_conf={
200+
"iceberg_enabled": "true",
201+
"iceberg_catalog_commit_interval_ms": "5000",
202+
"enable_leader_balancer": False
203+
},
204+
schema_registry_config=SchemaRegistryConfig(),
205+
pandaproxy_config=PandaproxyConfig(),
206+
*args,
207+
**kwargs)
208+
self.test_ctx = test_ctx
209+
self.topic_name = "test"
210+
211+
def setUp(self):
212+
pass
213+
214+
def wait_for_lag(self, metric_check: MetricCheck, metric_name: str,
215+
count: int):
216+
wait_until(
217+
lambda: metric_check.evaluate([(metric_name, lambda _, val: val ==
218+
count)]),
219+
timeout_sec=30,
220+
backoff_sec=5,
221+
err_msg=f"Timed out waiting for {metric_name} to reach: {count}")
222+
223+
@cluster(num_nodes=5)
224+
@matrix(cloud_storage_type=supported_storage_types())
225+
def test_lag_metrics(self, cloud_storage_type):
199226

200227
with DatalakeServices(self.test_ctx,
201228
redpanda=self.redpanda,
202229
filesystem_catalog_mode=False,
203230
include_query_engines=[]) as dl:
204231

205-
dl.create_iceberg_enabled_topic(
206-
self.topic_name,
207-
partitions=1,
208-
replicas=1,
209-
iceberg_mode="value_schema_id_prefix")
232+
# Stop the catalog to halt the translation flow
233+
dl.catalog_service.stop()
234+
235+
dl.create_iceberg_enabled_topic(self.topic_name,
236+
partitions=1,
237+
replicas=3)
238+
topic_leader = self.redpanda.partitions(self.topic_name)[0].leader
210239
count = randint(12, 21)
211-
# Populate schemaless messages in schema-ed mode, this should
212-
# hold up translation and commits
213-
dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
240+
dl.produce_to_topic(self.topic_name, 1, msg_count=count)
214241

215242
m = MetricCheck(self.redpanda.logger,
216243
self.redpanda,
217-
self.redpanda.nodes[0],
218-
[commit_lag, translation_lag],
244+
topic_leader, [
245+
DatalakeMetricsTest.commit_lag,
246+
DatalakeMetricsTest.translation_lag
247+
],
219248
labels={
220249
'namespace': 'kafka',
221250
'topic': self.topic_name,
222251
'partition': '0'
223252
},
224253
reduce=sum)
225-
expectations = []
226-
for metric in [commit_lag, translation_lag]:
227-
expectations.append([metric, lambda _, val: val == count])
228-
229-
# Ensure lag metric builds up as expected.
230-
wait_until(
231-
lambda: m.evaluate(expectations),
232-
timeout_sec=30,
233-
backoff_sec=5,
234-
err_msg=f"Timed out waiting for metrics to reach: {count}")
254+
255+
# Wait for lag build up
256+
self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, count)
257+
self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, count)
258+
259+
# Resume iceberg translation
260+
dl.catalog_service.start()
261+
262+
self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, 0)
263+
self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, 0)

0 commit comments

Comments
 (0)