Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cluster_link/deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class consumer_groups_router {

virtual ss::future<kafka::offset_commit_response>
offset_commit(kafka::offset_commit_request) = 0;

virtual ss::future<bool> assure_topic_exists() = 0;
};

} // namespace cluster_link
5 changes: 5 additions & 0 deletions src/v/cluster_link/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ manager::upsert_cluster_link(model::metadata md) {
auto name = md.name;
vlog(cllog.info, "Attempting to create cluster link named '{}'", md.name);
vlog(cllog.trace, "Cluster link metadata: {}", md);
const auto needs_consumer_offsets_topic
= md.configuration.consumer_groups_mirroring_cfg.is_enabled;
auto ec = co_await _registry->upsert_link(
std::move(md), ::model::timeout_clock::now() + 30s);
auto err = map_cluster_errc(ec);
Expand All @@ -145,6 +147,9 @@ manager::upsert_cluster_link(model::metadata md) {
}

try {
if (needs_consumer_offsets_topic) {
co_await _group_router->assure_topic_exists();
}
co_await _link_created_cv.wait(
wait_for_link_creation_timeout, [this, name] {
return _registry->find_link_by_name(name).has_value();
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster_link/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ class kafka_consumer_groups_router : public consumer_groups_router {
co_return result.get();
}

ss::future<bool> assure_topic_exists() final {
return _router->local().group_initializer().assure_topic_exists(false);
}

private:
ss::sharded<kafka::group_router>* _router;
};
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster_link/tests/deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ struct test_consumer_group_router : public consumer_groups_router {
ss::future<kafka::offset_commit_response>
offset_commit(kafka::offset_commit_request) override;

ss::future<bool> assure_topic_exists() override { co_return true; }

chunked_hash_map<kafka::group_id, group_state> groups;

int partition_count = 1;
Expand Down
20 changes: 20 additions & 0 deletions src/v/redpanda/admin/services/shadow_link/converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ create_topic_metadata_mirroring_config(
return config;
}

cluster_link::model::consumer_groups_mirroring_config
create_consumer_groups_mirroring_config(
const proto::admin::consumer_offset_sync_options& options) {
cluster_link::model::consumer_groups_mirroring_config config;

if (options.get_interval() > absl::ZeroDuration()) {
config.task_interval = absl::ToChronoNanoseconds(
options.get_interval());
}

config.filters = to_filter_patterns(options.get_group_filters());

return config;
}
cluster_link::model::link_configuration
create_link_configuration(const create_shadow_link_request& req) {
cluster_link::model::link_configuration config;
Expand All @@ -115,6 +129,12 @@ create_link_configuration(const create_shadow_link_request& req) {
.get_configurations()
.get_topic_metadata_sync_options());

config.consumer_groups_mirroring_cfg
= create_consumer_groups_mirroring_config(
req.get_shadow_link()
.get_configurations()
.get_consumer_offset_sync_options());

return config;
}

Expand Down
66 changes: 65 additions & 1 deletion tests/rptest/tests/cluster_linking_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
shadow_link_pb2,
shadow_link_pb2_connect,
)
from rptest.clients.types import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.multi_cluster_services import (
Cluster,
Expand All @@ -22,7 +23,12 @@
)
from rptest.tests.cluster_linking_test_base import ShadowLinkTestBase
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import expect_exception, wait_until_result
from rptest.util import expect_exception, wait_until, wait_until_result
from rptest.services.kgo_verifier_services import (
KgoVerifierProducer,
KgoVerifierConsumerGroupConsumer,
)
from rptest.clients.rpk import RpkTool


class MultiClusterTestBase(RedpandaTest):
Expand Down Expand Up @@ -180,3 +186,61 @@ def test_can_not_create_more_than_one_link(self):
assert e.code == ConnectErrorCode.RESOURCE_EXHAUSTED, (
f"Expected {ConnectErrorCode.RESOURCE_EXHAUSTED}, got {e.code}"
)

def create_source_consumer(self, topic, group_name="test_group", consumer_count=1):
return KgoVerifierConsumerGroupConsumer(
self.test_context,
self.source_cluster.service,
topic=topic,
group_name=group_name,
msg_size=128,
readers=consumer_count,
)

@cluster(num_nodes=7)
def test_consumer_groups_mirroring(self):
# Create a shadow link

topic = TopicSpec(name="source-topic", partition_count=5, replication_factor=3)

self.source_default_client().create_topic(topic)
# produce some data to the source cluster

KgoVerifierProducer.oneshot(
self.test_context, self.source_cluster.service, topic.name, 128, 10000
)

consumer = self.create_source_consumer(
topic=topic.name, group_name="test_group", consumer_count=1
)
consumer.start()
consumer.wait()
consumer.stop()
source_rpk = RpkTool(self.source_cluster.service)
description = source_rpk.group_describe(group="test_group")
self.logger.info(f">>> source_state: {description}")

self.create_link("test-link")

def _group_present_in_target_cluster():
target_rpk = RpkTool(self.target_cluster.service)
groups = target_rpk.group_list()

if not any(g.group == "test_group" for g in groups):
return False, None

desc = target_rpk.group_describe(
group="test_group", tolerant=True, summary=False
)

return True, desc

target_cluster_group = wait_until_result(
lambda: _group_present_in_target_cluster(),
timeout_sec=20,
err_msg="Failed to find consumer group in the target cluster",
)

assert target_cluster_group.state == "Empty", (
"Group test_group state expected to be empty on target cluster"
)
35 changes: 30 additions & 5 deletions tests/rptest/tests/cluster_linking_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,43 @@ def target_cluster_service(self) -> RedpandaService:
def target_cluster(self) -> RedpandaCluster:
return self.services.primary

def create_link(self, link_name: str):
topic_sync_options = shadow_link_pb2.TopicMetadataSyncOptions(
interval=google.protobuf.duration_pb2.Duration(seconds=1)
)

def create_link(
self,
link_name: str,
mirror_all_topics: bool = True,
mirror_all_groups: bool = True,
):
if mirror_all_topics:
topic_sync_options = shadow_link_pb2.TopicMetadataSyncOptions(
interval=google.protobuf.duration_pb2.Duration(seconds=1),
topic_filters=[
shadow_link_pb2.NameFilter(
pattern_type=shadow_link_pb2.PATTERN_TYPE_LITERAL,
filter_type=shadow_link_pb2.FILTER_TYPE_INCLUDE,
name="*",
)
],
)

if mirror_all_groups:
group_sync_options = shadow_link_pb2.ConsumerOffsetSyncOptions(
interval=google.protobuf.duration_pb2.Duration(seconds=1),
group_filters=[
shadow_link_pb2.NameFilter(
pattern_type=shadow_link_pb2.PATTERN_TYPE_LITERAL,
filter_type=shadow_link_pb2.FILTER_TYPE_INCLUDE,
name="*",
)
],
)
client_options = shadow_link_pb2.ShadowLinkClientOptions(
bootstrap_servers=self.source_cluster.service.brokers_list()
)

link_cfg = shadow_link_pb2.ShadowLinkConfigurations(
client_options=client_options,
topic_metadata_sync_options=topic_sync_options,
consumer_offset_sync_options=group_sync_options,
)

link_resource = shadow_link_pb2.ShadowLink(configurations=link_cfg)
Expand Down