Skip to content

Commit 7665c32

Browse files
committed
k/quotas: throttling on in/out static shard wide TP quotas &ut
Record in and out traffic in quota_manager. Store throttle-until time in connection_context (only for shard quotas). Aggregate throttling delays (both requested and enforced) from shard quotas and from per-client quotas. Test effective static throughput through one connection.
1 parent 447ab4c commit 7665c32

File tree

3 files changed

+222
-20
lines changed

3 files changed

+222
-20
lines changed

src/v/kafka/server/connection_context.cc

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -193,33 +193,70 @@ bool connection_context::is_finished_parsing() const {
193193
return conn->input().eof() || _server.abort_requested();
194194
}
195195

196+
connection_context::delay_t
197+
connection_context::record_tp_and_calculate_throttle(
198+
const request_header& hdr, const size_t request_size) {
199+
using clock = quota_manager::clock;
200+
static_assert(std::is_same_v<clock, delay_t::clock>);
201+
const auto now = clock::now();
202+
203+
// Throttle on client based quotas
204+
quota_manager::throttle_delay client_quota_delay{};
205+
if (hdr.key == fetch_api::key) {
206+
client_quota_delay = _server.quota_mgr().throttle_fetch_tp(
207+
hdr.client_id, now);
208+
} else if (hdr.key == produce_api::key) {
209+
client_quota_delay = _server.quota_mgr().record_produce_tp_and_throttle(
210+
hdr.client_id, request_size, now);
211+
}
212+
213+
// Throttle on shard wide quotas
214+
_server.quota_mgr().record_request_tp(request_size, now);
215+
const quota_manager::shard_delays_t shard_delays
216+
= _server.quota_mgr().get_shard_delays(_throttled_until, now);
217+
218+
// Sum up
219+
const clock::duration delay_enforce = std::max(
220+
shard_delays.enforce, client_quota_delay.enforce_duration());
221+
const clock::duration delay_request = std::max(
222+
{shard_delays.request,
223+
client_quota_delay.duration,
224+
clock::duration::zero()});
225+
if (
226+
delay_enforce != clock::duration::zero()
227+
|| delay_request != clock::duration::zero()) {
228+
vlog(
229+
klog.trace,
230+
"[{}:{}] throttle request:{{shard:{}, client:{}}}, "
231+
"enforce:{{shard:{}, client:{}}}",
232+
_client_addr,
233+
client_port(),
234+
shard_delays.request,
235+
client_quota_delay.duration,
236+
shard_delays.enforce,
237+
client_quota_delay.enforce_duration());
238+
}
239+
return delay_t{.request = delay_request, .enforce = delay_enforce};
240+
}
241+
196242
ss::future<session_resources> connection_context::throttle_request(
197243
const request_header& hdr, size_t request_size) {
198-
// update the throughput tracker for this client using the
199-
// size of the current request and return any computed delay
200-
// to apply for quota throttling.
201-
//
202244
// note that when throttling is first determined, the request is
203245
// allowed to pass through, and only subsequent requests are
204246
// delayed. this is a similar strategy used by kafka 2.0: the
205247
// response is important because it allows clients to
206248
// distinguish throttling delays from real delays. delays
207249
// applied to subsequent messages allow backpressure to take
208250
effect.
209-
quota_manager::throttle_delay delay{};
210-
if (hdr.key == fetch_api::key) {
211-
delay = _server.quota_mgr().throttle_fetch_tp(hdr.client_id);
212-
} else if (hdr.key == produce_api::key) {
213-
delay = _server.quota_mgr().record_produce_tp_and_throttle(
214-
hdr.client_id, request_size);
215-
}
251+
252+
const delay_t delay = record_tp_and_calculate_throttle(hdr, request_size);
216253
request_data r_data = request_data{
217254
.request_key = hdr.key,
218255
.client_id = ss::sstring{hdr.client_id.value_or("")}};
219256
auto tracker = std::make_unique<request_tracker>(_server.probe());
220257
auto fut = ss::now();
221-
if (delay.enforce && delay.duration > ss::lowres_clock::duration::zero()) {
222-
fut = ss::sleep_abortable(delay.duration, _server.abort_source());
258+
if (delay.enforce > delay_t::clock::duration::zero()) {
259+
fut = ss::sleep_abortable(delay.enforce, _server.abort_source());
223260
}
224261
auto track = track_latency(hdr.key);
225262
return fut
@@ -228,7 +265,7 @@ ss::future<session_resources> connection_context::throttle_request(
228265
})
229266
.then([this,
230267
r_data = std::move(r_data),
231-
delay,
268+
delay = delay.request,
232269
track,
233270
tracker = std::move(tracker)](ssx::semaphore_units units) mutable {
234271
return server().get_request_unit().then(
@@ -240,7 +277,7 @@ ss::future<session_resources> connection_context::throttle_request(
240277
tracker = std::move(tracker)](
241278
ssx::semaphore_units qd_units) mutable {
242279
session_resources r{
243-
.backpressure_delay = delay.duration,
280+
.backpressure_delay = delay,
244281
.memlocks = std::move(mem_units),
245282
.queue_units = std::move(qd_units),
246283
.tracker = std::move(tracker),
@@ -466,6 +503,14 @@ ss::future<> connection_context::maybe_process_responses() {
466503
_server.quota_mgr().record_fetch_tp(
467504
resp_and_res.resources->request_data.client_id, msg.size());
468505
}
506+
// Response sizes only take effect on throttling at the next request
507+
// processing. A better way would be to measure throttle delay right here
508+
// and apply it to the immediate response, but that would require
509+
// drastic changes to kafka message processing framework - because
510+
// throttle_ms has been serialized long ago already. With the current
511+
// approach, egress token bucket level will always be an extra burst
512+
// into the negative while under pressure.
513+
_server.quota_mgr().record_response_tp(msg.size());
469514
try {
470515
return conn->write(std::move(msg))
471516
.then([] {

src/v/kafka/server/connection_context.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,26 @@ class connection_context final
207207
ss::future<ssx::semaphore_units>
208208
reserve_request_units(api_key key, size_t size);
209209

210+
/// Calculated throttle delay pair.
211+
/// \p request is the primary throttle delay that should be applied now.
212+
/// In Kafka 2.0 compliant behaviour, it is only reported to the clients in
213+
/// the throttle_ms field, so that they can do the throttling on client
214+
/// side.
215+
/// \p enforce is the delay value that has not been implemented by the
216+
/// client on the last response, and has to be implemented here in the
217+
/// broker.
218+
struct delay_t {
219+
using clock = ss::lowres_clock;
220+
clock::duration request{};
221+
clock::duration enforce{};
222+
};
223+
224+
/// Update throughput trackers (per-client, per-shard, and whatever are
225+
/// going to emerge) on ingress traffic and calculate aggregated throttle
226+
/// delays from all of them.
227+
delay_t record_tp_and_calculate_throttle(
228+
const request_header& hdr, size_t request_size);
229+
210230
// Apply backpressure sequence, where the request processing may be
211231
// delayed for various reasons, including throttling but also because
212232
// too few server resources are available to accommodate the request
@@ -309,6 +329,7 @@ class connection_context final
309329
ctx_log _authlog;
310330
std::optional<security::tls::mtls_state> _mtls_state;
311331
config::binding<uint32_t> _max_request_size;
332+
ss::lowres_clock::time_point _throttled_until;
312333
};
313334

314335
} // namespace kafka

src/v/kafka/server/tests/produce_consume_test.cc

Lines changed: 141 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,24 @@ struct prod_consume_fixture : public redpanda_thread_fixture {
6666
return res;
6767
}
6868

69-
template<typename T>
70-
ss::future<model::offset> produce(T&& batch_factory) {
69+
ss::future<kafka::produce_response>
70+
produce_raw(std::vector<kafka::produce_request::partition>&& partitions) {
7171
kafka::produce_request::topic tp;
72-
size_t count = random_generators::get_int(1, 20);
73-
tp.partitions = batch_factory(count);
72+
tp.partitions = std::move(partitions);
7473
tp.name = test_topic;
7574
std::vector<kafka::produce_request::topic> topics;
7675
topics.push_back(std::move(tp));
7776
kafka::produce_request req(std::nullopt, 1, std::move(topics));
7877
req.data.timeout_ms = std::chrono::seconds(2);
7978
req.has_idempotent = false;
8079
req.has_transactional = false;
81-
return producer->dispatch(std::move(req))
80+
return producer->dispatch(std::move(req));
81+
}
82+
83+
template<typename T>
84+
ss::future<model::offset> produce(T&& batch_factory) {
85+
const size_t count = random_generators::get_int(1, 20);
86+
return produce_raw(batch_factory(count))
8287
.then([count](kafka::produce_response r) {
8388
return r.data.responses.begin()->partitions.begin()->base_offset
8489
+ model::offset(count - 1);
@@ -179,3 +184,134 @@ FIXTURE_TEST(test_version_handler, prod_consume_fixture) {
179184
.get(),
180185
kafka::client::kafka_request_disconnected_exception);
181186
}
187+
188+
static std::vector<kafka::produce_request::partition>
189+
single_batch(const size_t volume) {
190+
storage::record_batch_builder builder(
191+
model::record_batch_type::raft_data, model::offset(0));
192+
{
193+
const ss::sstring data(volume, 's');
194+
iobuf v{};
195+
v.append(data.data(), data.size());
196+
builder.add_raw_kv(iobuf{}, std::move(v));
197+
}
198+
199+
kafka::produce_request::partition partition;
200+
partition.partition_index = model::partition_id(0);
201+
partition.records.emplace(std::move(builder).build());
202+
203+
std::vector<kafka::produce_request::partition> res;
204+
res.push_back(std::move(partition));
205+
return res;
206+
}
207+
208+
FIXTURE_TEST(test_node_throughput_limits, prod_consume_fixture) {
209+
namespace ch = std::chrono;
210+
211+
// configure
212+
constexpr uint64_t pershard_rate_limit_in = 9_KiB;
213+
constexpr uint64_t pershard_rate_limit_out = 7_KiB;
214+
constexpr auto window_width = 200ms;
215+
constexpr size_t batch_size = 256;
216+
ss::smp::invoke_on_all([&] {
217+
auto& config = config::shard_local_cfg();
218+
config.get("kafka_throughput_limit_node_in_bps")
219+
.set_value(
220+
std::make_optional(pershard_rate_limit_in * ss::smp::count));
221+
config.get("kafka_throughput_limit_node_out_bps")
222+
.set_value(
223+
std::make_optional(pershard_rate_limit_out * ss::smp::count));
224+
config.get("kafka_quota_balancer_window_ms").set_value(window_width);
225+
config.get("fetch_max_bytes").set_value(batch_size);
226+
config.get("max_kafka_throttle_delay_ms").set_value(60'000ms);
227+
}).get0();
228+
wait_for_controller_leadership().get();
229+
start();
230+
231+
// PRODUCE 10 KiB in smaller batches, check throttle but do not honour it,
232+
// check that has to take 1 s
233+
size_t kafka_in_data_len = 0;
234+
{
235+
constexpr size_t kafka_packet_overhead = 127;
236+
const auto batches_cnt = pershard_rate_limit_in
237+
/ (batch_size + kafka_packet_overhead);
238+
ch::steady_clock::time_point start;
239+
ch::milliseconds throttle_time{};
240+
// warmup is the number of iterations enough to exhaust the token bucket
241+
// at least twice
242+
const int warmup
243+
= 2 * pershard_rate_limit_in
244+
* ch::duration_cast<ch::milliseconds>(window_width).count() / 1000
245+
/ (batch_size + kafka_packet_overhead)
246+
+ 1;
247+
for (int k = -warmup; k != batches_cnt; ++k) {
248+
if (k == 0) {
249+
start = ch::steady_clock::now();
250+
throttle_time = {};
251+
}
252+
throttle_time += produce_raw(single_batch(batch_size))
253+
.then([](const kafka::produce_response& r) {
254+
return r.data.throttle_time_ms;
255+
})
256+
.get0();
257+
kafka_in_data_len += batch_size;
258+
}
259+
const auto stop = ch::steady_clock::now();
260+
const auto wire_data_length = (batch_size + kafka_packet_overhead)
261+
* batches_cnt;
262+
const auto time_estimated = ch::milliseconds(
263+
wire_data_length * 1000 / pershard_rate_limit_in);
264+
BOOST_TEST_CHECK(
265+
abs(stop - start - time_estimated) < time_estimated / 25,
266+
"stop-start[" << stop - start << "] == time_estimated["
267+
<< time_estimated << "] ±4%");
268+
}
269+
270+
// CONSUME
271+
size_t kafka_out_data_len = 0;
272+
{
273+
constexpr size_t kafka_packet_overhead = 62;
274+
ch::steady_clock::time_point start;
275+
size_t total_size{};
276+
ch::milliseconds throttle_time{};
277+
const int warmup
278+
= 2 * pershard_rate_limit_out
279+
* ch::duration_cast<ch::milliseconds>(window_width).count() / 1000
280+
/ (batch_size + kafka_packet_overhead)
281+
+ 1;
282+
// consume cannot be measured by the number of fetches because the size
283+
// of fetch payload is up to redpanda, "fetch_max_bytes" is merely a
284+
// guidance. Therefore the consume test runs as long as there is data
285+
// to fetch. We can only consume almost as much as has been produced:
286+
const auto kafka_data_cap = kafka_in_data_len - batch_size * 2;
287+
for (int k = -warmup; kafka_out_data_len < kafka_data_cap; ++k) {
288+
if (k == 0) {
289+
start = ch::steady_clock::now();
290+
total_size = {};
291+
throttle_time = {};
292+
}
293+
const auto fetch_resp = fetch_next().get0();
294+
BOOST_REQUIRE_EQUAL(fetch_resp.data.topics.size(), 1);
295+
BOOST_REQUIRE_EQUAL(fetch_resp.data.topics[0].partitions.size(), 1);
296+
BOOST_TEST_REQUIRE(
297+
fetch_resp.data.topics[0].partitions[0].records.has_value());
298+
const auto kafka_data_len = fetch_resp.data.topics[0]
299+
.partitions[0]
300+
.records.value()
301+
.size_bytes();
302+
total_size += kafka_data_len + kafka_packet_overhead;
303+
throttle_time += fetch_resp.data.throttle_time_ms;
304+
kafka_out_data_len += kafka_data_len;
305+
}
306+
const auto stop = ch::steady_clock::now();
307+
const auto time_estimated = ch::milliseconds(
308+
total_size * 1000 / pershard_rate_limit_out);
309+
BOOST_TEST_CHECK(
310+
abs(stop - start - time_estimated) < time_estimated / 25,
311+
"stop-start[" << stop - start << "] == time_estimated["
312+
<< time_estimated << "] ±4%");
313+
}
314+
315+
// otherwise test is not valid:
316+
BOOST_REQUIRE_GT(kafka_in_data_len, kafka_out_data_len);
317+
}

0 commit comments

Comments
 (0)