Skip to content

Commit 6dcd2fb

Browse files
committed
ct: reorganize cloud_topics::app
This reorganizes cloud_topics::app so that: 1. it is a single non-sharded entity, 2. it includes all cloud_topics-related sharded services Previously, when cloud_topics::app was sharded, #2 was difficult because there are some cloud_topics-related services that need to be sharded, but app itself was sharded in application.cc; having a sharded service own another sharded service seems like a bad idea. In doing this, I needed to wrap the data_plane_api in another class in order to shard it, which I've done with the recently added state_accessors. This lets us continue to use the data_plane_api instead of the data_plane_impl, as we were before, as a sharded service. Using the state_accessors also lets us plumb the data plane API through to cluster::partition and to the partition_proxy without introducing a circular dependency. Previously we plumbed down cloud_topics::app, which made it easy to trip over dependencies.
1 parent 5e8af40 commit 6dcd2fb

File tree

15 files changed

+183
-116
lines changed

15 files changed

+183
-116
lines changed

src/v/cloud_topics/BUILD

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ redpanda_cc_library(
7777
hdrs = [
7878
"app.h",
7979
],
80+
implementation_deps = [
81+
":cluster_services_interface",
82+
":data_plane_impl",
83+
],
8084
include_prefix = "cloud_topics",
8185
visibility = [
8286
"//src/v/cloud_topics/frontend:__pkg__",
@@ -87,11 +91,14 @@ redpanda_cc_library(
8791
],
8892
deps = [
8993
"//src/v/base",
90-
"//src/v/cloud_topics:data_plane_api",
94+
"//src/v/cloud_topics:state_accessors",
9195
"//src/v/cloud_topics/level_one/domain:domain_supervisor",
96+
"//src/v/cloud_topics/level_one/metastore:frontend",
9297
"//src/v/cloud_topics/level_zero/common:extent_meta",
98+
"//src/v/cluster",
9399
"//src/v/container:chunked_vector",
94100
"//src/v/model",
101+
"//src/v/ssx:sharded_service_container",
95102
"//src/v/storage",
96103
"@seastar",
97104
],

src/v/cloud_topics/app.cc

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,91 @@
1010

1111
#include "cloud_topics/app.h"
1212

13+
#include "cloud_topics/cluster_services.h"
14+
#include "cloud_topics/data_plane_impl.h"
15+
#include "cluster/cluster_epoch_service.h"
16+
#include "cluster/controller.h"
17+
#include "ssx/sharded_service_container.h"
18+
1319
#include <seastar/core/coroutine.hh>
1420

1521
namespace experimental::cloud_topics {
1622

17-
app::app(
18-
ss::shared_ptr<data_plane_api> dp,
19-
std::unique_ptr<l1::domain_supervisor> l1_cp)
20-
: _data_plane(std::move(dp))
21-
, _domain_supervisor(std::move(l1_cp)) {}
23+
namespace {
24+
class real_cluster_services
25+
: public experimental::cloud_topics::cluster_services {
26+
public:
27+
explicit real_cluster_services(
28+
ss::sharded<cluster::cluster_epoch_service<>>* epoch_generator)
29+
: _epoch_service(epoch_generator) {}
30+
31+
seastar::future<experimental::cloud_topics::cluster_epoch>
32+
current_epoch(seastar::abort_source* as) override {
33+
std::expected<int64_t, std::error_code> epoch
34+
= co_await _epoch_service->local().get_cached_epoch(as);
35+
if (!epoch) {
36+
throw std::system_error(epoch.error());
37+
}
38+
co_return experimental::cloud_topics::cluster_epoch(epoch.value());
39+
}
40+
41+
private:
42+
ss::sharded<cluster::cluster_epoch_service<>>* _epoch_service;
43+
};
44+
} // namespace
2245

23-
seastar::future<> app::start() {
24-
co_await _domain_supervisor->start();
25-
co_await _data_plane->start();
46+
app::app()
47+
: ssx::sharded_service_container("cloud_topics::app") {}
48+
49+
ss::future<> app::construct(
50+
model::node_id self,
51+
cluster::controller* controller,
52+
ss::sharded<cluster::partition_manager>* partition_mgr,
53+
ss::sharded<cluster::partition_leaders_table>* leaders_table,
54+
ss::sharded<cluster::shard_table>* shard_table,
55+
ss::sharded<cloud_io::remote>* remote,
56+
ss::sharded<cloud_storage::cache>* cloud_cache,
57+
ss::sharded<cluster::metadata_cache>* metadata_cache,
58+
ss::sharded<rpc::connection_cache>* connection_cache,
59+
cloud_storage_clients::bucket_name bucket,
60+
ss::sharded<storage::api>* storage) {
61+
co_await construct_service(
62+
state,
63+
ss::sharded_parameter([remote, cloud_cache, bucket, storage, controller] {
64+
return experimental::cloud_topics::make_data_plane(
65+
remote,
66+
cloud_cache,
67+
bucket,
68+
storage,
69+
std::make_unique<real_cluster_services>(
70+
&controller->get_cluster_epoch_generator()));
71+
}));
72+
co_await construct_service(domain_supervisor, controller);
73+
co_await construct_service(
74+
l1_metastore_fe,
75+
self,
76+
metadata_cache,
77+
leaders_table,
78+
shard_table,
79+
connection_cache,
80+
&domain_supervisor);
2681
}
2782

28-
seastar::future<> app::stop() {
29-
co_await _domain_supervisor->stop();
30-
co_await _data_plane->stop();
83+
ss::future<> app::start() {
84+
co_await state.invoke_on_all([](auto& s) { return s.start(); });
85+
co_await domain_supervisor.invoke_on_all(
86+
[](auto& ds) { return ds.start(); });
3187
}
3288

33-
ss::shared_ptr<data_plane_api> app::get_data_plane_api() { return _data_plane; }
89+
ss::future<> app::stop() {
90+
ssx::sharded_service_container::shutdown();
91+
co_return;
92+
}
3493

35-
l1::domain_supervisor* app::get_l1_domain_supervisor() {
36-
return _domain_supervisor.get();
94+
ss::sharded<l1::frontend>* app::get_sharded_l1_metastore_fe() {
95+
return &l1_metastore_fe;
3796
}
3897

98+
ss::sharded<state_accessors>* app::get_state() { return &state; }
99+
39100
} // namespace experimental::cloud_topics

src/v/cloud_topics/app.h

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,39 +10,64 @@
1010

1111
#pragma once
1212

13-
#include "cloud_topics/data_plane_api.h"
1413
#include "cloud_topics/level_one/domain/domain_supervisor.h"
14+
#include "cloud_topics/level_one/metastore/frontend.h"
15+
#include "cloud_topics/state_accessors.h"
16+
#include "ssx/sharded_service_container.h"
1517

1618
#include <seastar/core/future.hh>
1719
#include <seastar/core/lowres_clock.hh>
1820
#include <seastar/core/sharded.hh>
1921

22+
namespace cloud_io {
23+
class remote;
24+
} // namespace cloud_io
25+
namespace cloud_storage {
26+
class cache;
27+
} // namespace cloud_storage
28+
namespace storage {
29+
class api;
30+
} // namespace storage
31+
2032
namespace experimental::cloud_topics {
2133

22-
// Simple container to use with seastar::sharded.
23-
// The seastar::sharded wants to know the size of the object at compile time.
24-
class app {
34+
class app : public ssx::sharded_service_container {
2535
public:
26-
explicit app(
27-
ss::shared_ptr<data_plane_api>, std::unique_ptr<l1::domain_supervisor>);
36+
explicit app();
2837

2938
app(const app&) = delete;
3039
app& operator=(const app&) = delete;
3140
app(app&&) noexcept = delete;
3241
app& operator=(app&&) noexcept = delete;
3342
~app() = default;
3443

35-
seastar::future<> start();
36-
seastar::future<> stop();
44+
ss::future<> construct(
45+
model::node_id,
46+
cluster::controller*,
47+
ss::sharded<cluster::partition_manager>*,
48+
ss::sharded<cluster::partition_leaders_table>*,
49+
ss::sharded<cluster::shard_table>*,
50+
ss::sharded<cloud_io::remote>*,
51+
ss::sharded<cloud_storage::cache>*,
52+
ss::sharded<cluster::metadata_cache>*,
53+
ss::sharded<rpc::connection_cache>*,
54+
cloud_storage_clients::bucket_name,
55+
ss::sharded<storage::api>*);
56+
57+
ss::future<> start();
58+
59+
// Call stop on each sharded service and call their destructors.
60+
ss::future<> stop();
3761

38-
ss::shared_ptr<data_plane_api> get_data_plane_api();
39-
l1::domain_supervisor* get_l1_domain_supervisor();
62+
ss::sharded<l1::frontend>* get_sharded_l1_metastore_fe();
63+
ss::sharded<state_accessors>* get_state();
4064

4165
// TODO: add 'get_control_plane_api' etc
4266

4367
private:
44-
ss::shared_ptr<data_plane_api> _data_plane;
45-
std::unique_ptr<l1::domain_supervisor> _domain_supervisor;
68+
ss::sharded<state_accessors> state;
69+
ss::sharded<l1::domain_supervisor> domain_supervisor;
70+
ss::sharded<l1::frontend> l1_metastore_fe;
4671
};
4772

4873
} // namespace experimental::cloud_topics

src/v/cloud_topics/level_one/metastore/frontend.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,13 @@ frontend::frontend(
222222
ss::sharded<cluster::partition_leaders_table>* leaders,
223223
ss::sharded<cluster::shard_table>* shards,
224224
ss::sharded<::rpc::connection_cache>* connections,
225-
domain_supervisor* domain_supervisor)
225+
ss::sharded<domain_supervisor>* domain_supervisor)
226226
: _self(self)
227227
, _metadata(metadata)
228228
, _leaders(leaders)
229229
, _shard_table(shards)
230230
, _connection_cache(connections)
231-
, _domain_supervisor(domain_supervisor) {}
231+
, _domain_supervisor(&domain_supervisor->local()) {}
232232

233233
ss::future<> frontend::stop() { return _gate.close(); }
234234

src/v/cloud_topics/level_one/metastore/frontend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class frontend : public ss::peering_sharded_service<frontend> {
5151
ss::sharded<cluster::partition_leaders_table>*,
5252
ss::sharded<cluster::shard_table>*,
5353
ss::sharded<::rpc::connection_cache>*,
54-
domain_supervisor*);
54+
ss::sharded<domain_supervisor>*);
5555

5656
ss::future<> stop();
5757

src/v/cluster/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ redpanda_cc_library(
683683
"//src/v/cloud_storage:topic_mount_manifest_path",
684684
"//src/v/cloud_storage:types",
685685
"//src/v/cloud_storage_clients",
686+
"//src/v/cloud_topics:state_accessors",
686687
"//src/v/cloud_topics/level_zero/stm:ctp_stm",
687688
"//src/v/cloud_topics/level_zero/stm:ctp_stm_api",
688689
"//src/v/cluster_link/model",

src/v/cluster/partition.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ partition::partition(
5353
ss::sharded<features::feature_table>& feature_table,
5454
ss::sharded<archival::upload_housekeeping_service>& upload_hks,
5555
std::optional<cloud_storage_clients::bucket_name> read_replica_bucket,
56-
ss::sharded<experimental::cloud_topics::app>& ct_app)
56+
ss::sharded<experimental::cloud_topics::state_accessors>* ct_state)
5757
: _raft(std::move(r))
58-
, _cloud_topics_app(ct_app)
58+
, _cloud_topics_state(ct_state)
5959
, _probe(std::make_unique<replicated_partition_probe>(*this))
6060
, _feature_table(feature_table)
6161
, _archival_conf(std::move(archival_conf))
@@ -1856,9 +1856,9 @@ ss::future<result<ssx::rwlock_unit>> partition::hold_writes_enabled() {
18561856
co_return *std::move(maybe_units);
18571857
}
18581858

1859-
ss::sharded<experimental::cloud_topics::app>&
1860-
partition::get_cloud_topics_data_api() noexcept {
1861-
return _cloud_topics_app;
1859+
ss::sharded<experimental::cloud_topics::state_accessors>*
1860+
partition::get_cloud_topics_state() noexcept {
1861+
return _cloud_topics_state;
18621862
}
18631863

18641864
} // namespace cluster

src/v/cluster/partition.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
namespace experimental::cloud_topics {
3434
class ctp_stm_api;
35-
class app;
35+
class state_accessors;
3636
}; // namespace experimental::cloud_topics
3737

3838
namespace cluster {
@@ -58,7 +58,7 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
5858
ss::sharded<features::feature_table>&,
5959
ss::sharded<archival::upload_housekeeping_service>&,
6060
std::optional<cloud_storage_clients::bucket_name> read_replica_bucket,
61-
ss::sharded<experimental::cloud_topics::app>& ct_app);
61+
ss::sharded<experimental::cloud_topics::state_accessors>* ct_state);
6262

6363
~partition() = default;
6464

@@ -403,8 +403,8 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
403403

404404
// Returns a pointer to the data plane api if cloud topics is enabled for
405405
// this partition. Otherwise, nullopt is returned
406-
ss::sharded<experimental::cloud_topics::app>&
407-
get_cloud_topics_data_api() noexcept;
406+
ss::sharded<experimental::cloud_topics::state_accessors>*
407+
get_cloud_topics_state() noexcept;
408408

409409
private:
410410
ss::future<>
@@ -432,7 +432,8 @@ class partition : public ss::enable_lw_shared_from_this<partition> {
432432
ss::shared_ptr<archival_metadata_stm> _archival_meta_stm;
433433
ss::shared_ptr<partition_properties_stm> _partition_properties_stm;
434434
ss::shared_ptr<experimental::cloud_topics::ctp_stm_api> _ctp_stm_api;
435-
ss::sharded<experimental::cloud_topics::app>& _cloud_topics_app;
435+
ss::sharded<experimental::cloud_topics::state_accessors>*
436+
_cloud_topics_state;
436437
ss::abort_source _as;
437438
partition_probe _probe;
438439
ss::sharded<features::feature_table>& _feature_table;

src/v/cluster/partition_manager.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ partition_manager::partition_manager(
6060
ss::sharded<features::feature_table>& feature_table,
6161
ss::sharded<archival::upload_housekeeping_service>& upload_hks,
6262
config::binding<std::chrono::milliseconds> partition_shutdown_timeout,
63-
ss::sharded<experimental::cloud_topics::app>& cloud_topics_app)
63+
ss::sharded<experimental::cloud_topics::state_accessors>* cloud_topics_state)
6464
: _storage(storage.local())
6565
, _raft_manager(raft)
6666
, _partition_recovery_mgr(recovery_mgr)
@@ -70,7 +70,7 @@ partition_manager::partition_manager(
7070
, _feature_table(feature_table)
7171
, _upload_hks(upload_hks)
7272
, _partition_shutdown_timeout(std::move(partition_shutdown_timeout))
73-
, _cloud_topics_app(cloud_topics_app) {
73+
, _cloud_topics_state(cloud_topics_state) {
7474
_leader_notify_handle
7575
= _raft_manager.local().register_leadership_notification(
7676
[this](
@@ -302,7 +302,7 @@ ss::future<consensus_ptr> partition_manager::manage(
302302
_feature_table,
303303
_upload_hks,
304304
read_replica_bucket,
305-
_cloud_topics_app);
305+
_cloud_topics_state);
306306

307307
_ntp_table.emplace(log->config().ntp(), p);
308308
_raft_table.emplace(group, p);

src/v/cluster/partition_manager.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
#include <chrono>
3434

3535
namespace experimental::cloud_topics {
36-
class app;
36+
class state_accessors;
3737
}
3838

3939
namespace cluster {
@@ -53,7 +53,7 @@ class partition_manager
5353
ss::sharded<features::feature_table>&,
5454
ss::sharded<archival::upload_housekeeping_service>&,
5555
config::binding<std::chrono::milliseconds>,
56-
ss::sharded<experimental::cloud_topics::app>&);
56+
ss::sharded<experimental::cloud_topics::state_accessors>*);
5757

5858
~partition_manager();
5959

@@ -308,7 +308,8 @@ class partition_manager
308308
state_machine_registry _stm_registry;
309309

310310
// The sharded app may not be initialized if cloud topics isn't enabled.
311-
ss::sharded<experimental::cloud_topics::app>& _cloud_topics_app;
311+
ss::sharded<experimental::cloud_topics::state_accessors>*
312+
_cloud_topics_state;
312313

313314
friend std::ostream& operator<<(std::ostream&, const partition_manager&);
314315
friend std::ostream& operator<<(

0 commit comments

Comments
 (0)