Skip to content

Commit ec2cbbc

Browse files
authored
[enhance](RateLimit) Add bvar to monitor object storage rate limit sleep time and failure time (#38294)
Add one bvar to monitor requests that fail because they exceed the rate limit.
1 parent 571eafb commit ec2cbbc

File tree

11 files changed

+275
-115
lines changed

11 files changed

+275
-115
lines changed

be/src/io/fs/azure_obj_storage_client.cpp

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "common/logging.h"
4343
#include "common/status.h"
4444
#include "io/fs/obj_storage_client.h"
45+
#include "util/bvar_helper.h"
4546
#include "util/s3_util.h"
4647

4748
using namespace Azure::Storage::Blobs;
@@ -57,6 +58,28 @@ auto base64_encode_part_num(int part_num) {
5758
{reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
5859
}
5960

61+
// Runs `callback` under the object-storage rate limiter selected by `op`.
//
// When doris::config::enable_s3_rate_limiter is false the limiter is bypassed
// and `callback` is invoked directly. Otherwise one request is registered with
// the limiter via add(1); a negative return value is treated as "request
// rejected" and reported by throwing std::runtime_error, so callers need a
// handler that covers std::exception in addition to Azure's own exceptions.
// Returns whatever `callback` returns (may be void).
template <typename Func>
62+
auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
63+
if (!doris::config::enable_s3_rate_limiter) {
64+
return callback();
65+
}
66+
// NOTE(review): judging by the variable name, add() returns the time spent
// waiting for a token — confirm against the rate limiter implementation.
auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
67+
if (sleep_duration < 0) {
68+
// The Azure call path is exception based, so the rejection is raised rather than returned.
throw std::runtime_error("Azure exceeds request limit");
69+
}
70+
return callback();
71+
}
72+
73+
// Convenience wrapper: runs `callback` through s3_rate_limit using the GET
// limiter (doris::S3RateLimitType::GET) and returns the callback's result.
template <typename Func>
74+
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
75+
return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
76+
}
77+
78+
// Convenience wrapper: runs `callback` through s3_rate_limit using the PUT
// limiter (doris::S3RateLimitType::PUT) and returns the callback's result.
template <typename Func>
79+
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
80+
return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
81+
}
82+
6083
constexpr char SAS_TOKEN_URL_TEMPLATE[] = "https://{}.blob.core.windows.net/{}/{}{}";
6184
constexpr char BlobNotFound[] = "BlobNotFound";
6285
} // namespace
@@ -101,7 +124,14 @@ struct AzureBatchDeleter {
101124
if (deferred_resps.empty()) {
102125
return ObjectStorageResponse::OK();
103126
}
104-
auto resp = do_azure_client_call([&]() { _client->SubmitBatch(_batch); }, _opts);
127+
auto resp = do_azure_client_call(
128+
[&]() {
129+
s3_put_rate_limit([&]() {
130+
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
131+
_client->SubmitBatch(_batch);
132+
});
133+
},
134+
_opts);
105135
if (resp.status.code != ErrorCode::OK) {
106136
return resp;
107137
}
@@ -156,7 +186,11 @@ ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathO
156186
auto client = _client->GetBlockBlobClient(opts.key);
157187
return do_azure_client_call(
158188
[&]() {
159-
client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
189+
s3_put_rate_limit([&]() {
190+
SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
191+
client.UploadFrom(reinterpret_cast<const uint8_t*>(stream.data()),
192+
stream.size());
193+
});
160194
},
161195
opts);
162196
}
@@ -169,7 +203,10 @@ ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStora
169203
Azure::Core::IO::MemoryBodyStream memory_body(
170204
reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
171205
// The blockId must be base64 encoded
172-
client.StageBlock(base64_encode_part_num(part_num), memory_body);
206+
s3_put_rate_limit([&]() {
207+
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
208+
client.StageBlock(base64_encode_part_num(part_num), memory_body);
209+
});
173210
} catch (Azure::Core::RequestFailedException& e) {
174211
auto msg = fmt::format(
175212
"Azure request failed because {}, error msg {}, http code {}, path msg {}",
@@ -200,13 +237,22 @@ ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
200237
std::ranges::transform(
201238
completed_parts, std::back_inserter(string_block_ids),
202239
[](const ObjectCompleteMultiPart& i) { return base64_encode_part_num(i.part_num); });
203-
return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
240+
return do_azure_client_call(
241+
[&]() {
242+
s3_put_rate_limit([&]() {
243+
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
244+
client.CommitBlockList(string_block_ids);
245+
});
246+
},
247+
opts);
204248
}
205249

206250
ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
207251
try {
208-
Models::BlobProperties properties =
209-
_client->GetBlockBlobClient(opts.key).GetProperties().Value;
252+
Models::BlobProperties properties = s3_get_rate_limit([&]() {
253+
SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
254+
return _client->GetBlockBlobClient(opts.key).GetProperties().Value;
255+
});
210256
return {.file_size = properties.BlobSize};
211257
} catch (Azure::Core::RequestFailedException& e) {
212258
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
@@ -238,8 +284,11 @@ ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathO
238284
DownloadBlobToOptions download_opts;
239285
Azure::Core::Http::HttpRange range {static_cast<int64_t>(offset), bytes_read};
240286
download_opts.Range = range;
241-
auto resp = client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
242-
download_opts);
287+
auto resp = s3_get_rate_limit([&]() {
288+
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
289+
return client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
290+
download_opts);
291+
});
243292
*size_return = resp.Value.ContentRange.Length.Value();
244293
},
245294
opts);
@@ -257,11 +306,17 @@ ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePat
257306
[&]() {
258307
ListBlobsOptions list_opts;
259308
list_opts.Prefix = opts.prefix;
260-
auto resp = _client->ListBlobs(list_opts);
309+
auto resp = s3_get_rate_limit([&]() {
310+
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
311+
return _client->ListBlobs(list_opts);
312+
});
261313
get_file_file(resp);
262314
while (!resp.NextPageToken->empty()) {
263315
list_opts.ContinuationToken = resp.NextPageToken;
264-
resp = _client->ListBlobs(list_opts);
316+
resp = s3_get_rate_limit([&]() {
317+
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
318+
return _client->ListBlobs(list_opts);
319+
});
265320
get_file_file(resp);
266321
}
267322
},
@@ -297,7 +352,10 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStorageP
297352
ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
298353
return do_azure_client_call(
299354
[&]() {
300-
auto resp = _client->DeleteBlob(opts.key);
355+
auto resp = s3_put_rate_limit([&]() {
356+
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
357+
return _client->DeleteBlob(opts.key);
358+
});
301359
if (!resp.Value.Deleted) {
302360
throw Exception(Status::IOError<false>("Delete azure blob failed"));
303361
}
@@ -321,14 +379,20 @@ ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
321379
}
322380
return ObjectStorageResponse::OK();
323381
};
324-
auto resp = _client->ListBlobs(list_opts);
382+
auto resp = s3_get_rate_limit([&]() {
383+
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
384+
return _client->ListBlobs(list_opts);
385+
});
325386
if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
326387
return response;
327388
}
328389

329390
while (!resp.NextPageToken->empty()) {
330391
list_opts.ContinuationToken = resp.NextPageToken;
331-
resp = _client->ListBlobs(list_opts);
392+
resp = s3_get_rate_limit([&]() {
393+
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
394+
return _client->ListBlobs(list_opts);
395+
});
332396

333397
if (auto response = delete_func(resp.Blobs); response.status.code != ErrorCode::OK) {
334398
return response;

be/src/io/fs/s3_file_reader.cpp

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,6 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
120120
if (!client) {
121121
return Status::InternalError("init s3 client error");
122122
}
123-
// // clang-format off
124-
// auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
125-
// to, offset, bytes_req, bytes_read);
126-
// // clang-format on
127-
// if (resp.status.code != ErrorCode::OK) {
128-
// return std::move(Status(resp.status.code, std::move(resp.status.msg))
129-
// .append(fmt::format("failed to read from {}", _path.native())));
130-
// }
131-
// if (*bytes_read != bytes_req) {
132-
// return Status::InternalError("failed to read from {}(bytes read: {}, bytes req: {})",
133-
// _path.native(), *bytes_read, bytes_req);
134-
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
135123

136124
int retry_count = 0;
137125
const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds

be/src/io/fs/s3_obj_storage_client.cpp

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,35 @@
7171
#include "io/fs/s3_common.h"
7272
#include "util/bvar_helper.h"
7373

74+
// File-local helpers that route AWS S3 client calls through the rate limiter.
namespace {
75+
// Builds the AWSError returned to callers when the rate limiter rejects a
// request: error type INTERNAL_FAILURE with "exceeds limit" passed as both
// string arguments.
// NOTE(review): the trailing `false` is presumably the isRetryable flag —
// confirm against the AWSError constructor.
inline ::Aws::Client::AWSError<::Aws::S3::S3Errors> s3_error_factory() {
76+
return {::Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds limit", false};
77+
}
78+
79+
// Runs `callback` under the rate limiter selected by `op`.
//
// When doris::config::enable_s3_rate_limiter is false the limiter is bypassed
// and `callback` is invoked directly. Otherwise one request is registered with
// the limiter via add(1); a negative return value is treated as "request
// rejected", in which case `callback` is NOT invoked and a value of the
// callback's return type is constructed from s3_error_factory(). The
// callback's return type must therefore be constructible from an AWSError.
template <typename Func>
80+
auto s3_rate_limit(doris::S3RateLimitType op, Func callback) -> decltype(callback()) {
81+
using T = decltype(callback());
82+
if (!doris::config::enable_s3_rate_limiter) {
83+
return callback();
84+
}
85+
// NOTE(review): judging by the variable name, add() returns the time spent
// waiting for a token — confirm against the rate limiter implementation.
auto sleep_duration = doris::S3ClientFactory::instance().rate_limiter(op)->add(1);
86+
if (sleep_duration < 0) {
87+
// Report the rejection through the return value instead of throwing.
return T(s3_error_factory());
88+
}
89+
return callback();
90+
}
91+
92+
// Convenience wrapper: runs `callback` through s3_rate_limit using the GET
// limiter (doris::S3RateLimitType::GET).
template <typename Func>
93+
auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
94+
return s3_rate_limit(doris::S3RateLimitType::GET, std::move(callback));
95+
}
96+
97+
// Convenience wrapper: runs `callback` through s3_rate_limit using the PUT
// limiter (doris::S3RateLimitType::PUT).
template <typename Func>
98+
auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
99+
return s3_rate_limit(doris::S3RateLimitType::PUT, std::move(callback));
100+
}
101+
} // namespace
102+
74103
namespace Aws::S3::Model {
75104
class DeleteObjectRequest;
76105
} // namespace Aws::S3::Model
@@ -92,9 +121,9 @@ ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
92121
create_request.SetContentType("application/octet-stream");
93122

94123
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
95-
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(_client->CreateMultipartUpload(create_request),
96-
"s3_file_writer::create_multi_part_upload",
97-
std::cref(create_request).get());
124+
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
125+
s3_put_rate_limit([&]() { return _client->CreateMultipartUpload(create_request); }),
126+
"s3_file_writer::create_multi_part_upload", std::cref(create_request).get());
98127
SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
99128

100129
if (outcome.IsSuccess()) {
@@ -122,9 +151,9 @@ ObjectStorageResponse S3ObjStorageClient::put_object(const ObjectStoragePathOpti
122151
request.SetContentLength(stream.size());
123152
request.SetContentType("application/octet-stream");
124153
SCOPED_BVAR_LATENCY(s3_bvar::s3_put_latency);
125-
auto response =
126-
SYNC_POINT_HOOK_RETURN_VALUE(_client->PutObject(request), "s3_file_writer::put_object",
127-
std::cref(request).get(), &stream);
154+
auto response = SYNC_POINT_HOOK_RETURN_VALUE(
155+
s3_put_rate_limit([&]() { return _client->PutObject(request); }),
156+
"s3_file_writer::put_object", std::cref(request).get(), &stream);
128157
if (!response.IsSuccess()) {
129158
auto st = s3fs_error(response.GetError(),
130159
fmt::format("failed to put object {}", opts.path.native()));
@@ -157,8 +186,8 @@ ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const ObjectStorageP
157186
{
158187
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
159188
upload_part_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
160-
_client->UploadPart(upload_request), "s3_file_writer::upload_part",
161-
std::cref(upload_request).get(), &stream);
189+
s3_put_rate_limit([&]() { return _client->UploadPart(upload_request); }),
190+
"s3_file_writer::upload_part", std::cref(upload_request).get(), &stream);
162191
}
163192
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_upload_one_part", &upload_part_outcome);
164193
if (!upload_part_outcome.IsSuccess()) {
@@ -199,7 +228,7 @@ ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
199228
TEST_SYNC_POINT_RETURN_WITH_VALUE("S3FileWriter::_complete:3", ObjectStorageResponse(), this);
200229
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
201230
auto complete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
202-
_client->CompleteMultipartUpload(complete_request),
231+
s3_put_rate_limit([&]() { return _client->CompleteMultipartUpload(complete_request); }),
203232
"s3_file_writer::complete_multi_part", std::cref(complete_request).get());
204233

205234
if (!complete_outcome.IsSuccess()) {
@@ -220,7 +249,8 @@ ObjectStorageHeadResponse S3ObjStorageClient::head_object(const ObjectStoragePat
220249

221250
SCOPED_BVAR_LATENCY(s3_bvar::s3_head_latency);
222251
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
223-
_client->HeadObject(request), "s3_file_system::head_object", std::ref(request).get());
252+
s3_get_rate_limit([&]() { return _client->HeadObject(request); }),
253+
"s3_file_system::head_object", std::ref(request).get());
224254
if (outcome.IsSuccess()) {
225255
return {.resp = {convert_to_obj_response(Status::OK())},
226256
.file_size = outcome.GetResult().GetContentLength()};
@@ -247,7 +277,7 @@ ObjectStorageResponse S3ObjStorageClient::get_object(const ObjectStoragePathOpti
247277
request.SetResponseStreamFactory(AwsWriteableStreamFactory(buffer, bytes_read));
248278

249279
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
250-
auto outcome = _client->GetObject(request);
280+
auto outcome = s3_get_rate_limit([&]() { return _client->GetObject(request); });
251281
if (!outcome.IsSuccess()) {
252282
return {convert_to_obj_response(
253283
s3fs_error(outcome.GetError(),
@@ -273,7 +303,7 @@ ObjectStorageResponse S3ObjStorageClient::list_objects(const ObjectStoragePathOp
273303
Aws::S3::Model::ListObjectsV2Outcome outcome;
274304
{
275305
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
276-
outcome = _client->ListObjectsV2(request);
306+
outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
277307
}
278308
if (!outcome.IsSuccess()) {
279309
files->clear();
@@ -310,8 +340,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects(const ObjectStoragePath
310340
});
311341
del.WithObjects(std::move(objects)).SetQuiet(true);
312342
delete_request.SetDelete(std::move(del));
313-
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
314-
auto delete_outcome = _client->DeleteObjects(delete_request);
343+
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
344+
auto delete_outcome =
345+
s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
315346
if (!delete_outcome.IsSuccess()) {
316347
return {convert_to_obj_response(
317348
s3fs_error(delete_outcome.GetError(),
@@ -331,8 +362,8 @@ ObjectStorageResponse S3ObjStorageClient::delete_object(const ObjectStoragePathO
331362
Aws::S3::Model::DeleteObjectRequest request;
332363
request.WithBucket(opts.bucket).WithKey(opts.key);
333364

334-
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
335-
auto outcome = _client->DeleteObject(request);
365+
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_object_latency);
366+
auto outcome = s3_put_rate_limit([&]() { return _client->DeleteObject(request); });
336367
if (outcome.IsSuccess() ||
337368
outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
338369
return ObjectStorageResponse::OK();
@@ -354,7 +385,7 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
354385
Aws::S3::Model::ListObjectsV2Outcome outcome;
355386
{
356387
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
357-
outcome = _client->ListObjectsV2(request);
388+
outcome = s3_get_rate_limit([&]() { return _client->ListObjectsV2(request); });
358389
}
359390
if (!outcome.IsSuccess()) {
360391
return {convert_to_obj_response(s3fs_error(
@@ -373,8 +404,9 @@ ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
373404
Aws::S3::Model::Delete del;
374405
del.WithObjects(std::move(objects)).SetQuiet(true);
375406
delete_request.SetDelete(std::move(del));
376-
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
377-
auto delete_outcome = _client->DeleteObjects(delete_request);
407+
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_objects_latency);
408+
auto delete_outcome =
409+
s3_put_rate_limit([&]() { return _client->DeleteObjects(delete_request); });
378410
if (!delete_outcome.IsSuccess()) {
379411
return {convert_to_obj_response(
380412
s3fs_error(delete_outcome.GetError(),

0 commit comments

Comments
 (0)