@@ -141,6 +141,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d
141
141
if unlikely (acquire_num == 0.0 && remaining_ru == 0.0 )
142
142
acquire_num = DEFAULT_BUFFER_TOKENS;
143
143
144
+ // The purpose of subtracting remaining_ru is try to ensure that the number of local tokens
145
+ // always stays same with the amount consumed.
144
146
acquire_num -= remaining_ru;
145
147
acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0 );
146
148
return acquire_num;
@@ -364,11 +366,9 @@ void LocalAdmissionController::mainLoop()
364
366
static_assert (
365
367
tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION
366
368
&& tick_interval <= DEFAULT_TARGET_PERIOD);
367
- auto cur_tick_beg = current_tick;
368
- auto cur_tick_end = cur_tick_beg + tick_interval;
369
+ auto cur_tick_end = current_tick + tick_interval;
369
370
while (!stopped.load ())
370
371
{
371
- if (current_tick < cur_tick_end)
372
372
{
373
373
std::unique_lock<std::mutex> lock (mu);
374
374
if (keyspace_low_token_resource_groups.empty ())
@@ -388,19 +388,16 @@ void LocalAdmissionController::mainLoop()
388
388
try
389
389
{
390
390
while (current_tick >= cur_tick_end)
391
- {
392
- updateRUConsumptionSpeed ();
393
- cur_tick_beg = cur_tick_end;
394
391
cur_tick_end += tick_interval;
395
- }
396
392
393
+ updateRUConsumptionSpeed ();
397
394
if (const auto gac_req_opt = buildGACRequest (/* is_final_report=*/ false ); gac_req_opt.has_value ())
398
395
{
399
396
std::lock_guard lock (gac_requests_mu);
400
397
gac_requests.push_back (gac_req_opt.value ());
401
398
gac_requests_cv.notify_all ();
402
399
}
403
- clearCPUTimeWithoutLock (current_tick);
400
+ clearCPUTime (current_tick);
404
401
checkDegradeMode ();
405
402
}
406
403
catch (...)
@@ -437,13 +434,17 @@ std::optional<resource_manager::TokenBucketsRequest> LocalAdmissionController::b
437
434
else
438
435
{
439
436
std::unordered_set<std::pair<KeyspaceID, std::string>, LACPairHash> local_keyspace_low_token_resource_groups;
437
+ std::unordered_map<std::pair<KeyspaceID, std::string>, ResourceGroupPtr, LACPairHash>
438
+ local_keyspace_resource_groups;
440
439
{
441
440
std::lock_guard lock (mu);
442
441
local_keyspace_low_token_resource_groups = keyspace_low_token_resource_groups;
443
442
keyspace_low_token_resource_groups.clear ();
443
+
444
+ local_keyspace_resource_groups = keyspace_resource_groups;
444
445
}
445
446
446
- for (const auto & ele : keyspace_resource_groups )
447
+ for (const auto & ele : local_keyspace_resource_groups )
447
448
{
448
449
const bool need_fetch_token = local_keyspace_low_token_resource_groups.contains (ele.first );
449
450
const bool need_report = ele.second ->shouldReportRUConsumption (current_tick);
@@ -644,7 +645,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
644
645
645
646
const String err_msg = fmt::format (" handle acquire token resp failed: rg: {}(keyspace={})" , name, keyspace_id);
646
647
// It's possible for one_resp.granted_r_u_tokens() to be empty
647
- // when the acquire_token_req is only for report RU consumption.
648
+ // when the acquire_token_req is only for report RU consumption or GAC got error(like nan token) .
648
649
if (one_resp.granted_r_u_tokens ().empty ())
649
650
{
650
651
resource_group->endRequest ();
@@ -653,6 +654,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
653
654
654
655
if unlikely (one_resp.granted_r_u_tokens ().size () != 1 )
655
656
{
657
+ resource_group->endRequest ();
656
658
LOG_ERROR (
657
659
log,
658
660
" {} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}" ,
@@ -665,13 +667,15 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
665
667
const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens ()[0 ];
666
668
if unlikely (granted_token_bucket.type () != resource_manager::RequestUnitType::RU)
667
669
{
670
+ resource_group->endRequest ();
668
671
LOG_ERROR (log, " {} unexpected request type, one_resp: {}" , err_msg, one_resp.ShortDebugString ());
669
672
continue ;
670
673
}
671
674
672
675
const auto trickle_ms = granted_token_bucket.trickle_time_ms ();
673
676
if unlikely (trickle_ms < 0 )
674
677
{
678
+ resource_group->endRequest ();
675
679
LOG_ERROR (
676
680
log,
677
681
" {} unexpected trickle_ms: {} one_resp: {}" ,
@@ -686,6 +690,7 @@ std::vector<std::pair<KeyspaceID, std::string>> LocalAdmissionController::handle
686
690
double added_tokens = granted_token_bucket.granted_tokens ().tokens ();
687
691
if unlikely (!std::isfinite (added_tokens) || added_tokens < 0.0 )
688
692
{
693
+ resource_group->endRequest ();
689
694
LOG_ERROR (
690
695
log,
691
696
" {} invalid added_tokens: {} one_resp: {}" ,
@@ -858,7 +863,7 @@ bool LocalAdmissionController::handleDeleteEvent(
858
863
std::lock_guard lock (mu);
859
864
erase_num = deleteResourceGroupWithoutLock (keyspace_id, name);
860
865
}
861
- LOG_DEBUG (log, " delete resource group {}(keyspace={}), erase_num: {}" , name, keyspace_id, erase_num);
866
+ LOG_INFO (log, " delete resource group {}(keyspace={}), erase_num: {}" , name, keyspace_id, erase_num);
862
867
return true ;
863
868
}
864
869
@@ -896,7 +901,7 @@ bool LocalAdmissionController::handlePutEvent(
896
901
updateMaxRUPerSecAfterDeleteWithoutLock (rg->user_ru_per_sec );
897
902
}
898
903
}
899
- LOG_DEBUG (log, " modify resource group {}(keyspace={}) to: {}" , name, keyspace_id, group_pb.ShortDebugString ());
904
+ LOG_INFO (log, " modify resource group {}(keyspace={}) to: {}" , name, keyspace_id, group_pb.ShortDebugString ());
900
905
return true ;
901
906
}
902
907
0 commit comments