Skip to content

Commit b23937e

Browse files
authored
resource_control: fetch cpu quota metrics from store instead of prometheus (#49176) (#49256)
close #49174
1 parent 13a9c37 commit b23937e

File tree

3 files changed

+226
-81
lines changed

3 files changed

+226
-81
lines changed

pkg/executor/internal/calibrateresource/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ go_library(
1616
"//pkg/sessionctx",
1717
"//pkg/sessionctx/variable",
1818
"//pkg/sessiontxn/staleread",
19+
"//pkg/util",
1920
"//pkg/util/chunk",
2021
"//pkg/util/mathutil",
2122
"//pkg/util/sqlexec",

pkg/executor/internal/calibrateresource/calibrate_resource.go

Lines changed: 178 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,17 @@
1515
package calibrateresource
1616

1717
import (
18+
"bufio"
1819
"context"
20+
"encoding/base64"
1921
"fmt"
22+
"io"
2023
"math"
24+
"net/http"
25+
"runtime"
2126
"sort"
27+
"strconv"
28+
"strings"
2229
"time"
2330

2431
"github.com/docker/go-units"
@@ -34,6 +41,7 @@ import (
3441
"github.com/pingcap/tidb/pkg/sessionctx"
3542
"github.com/pingcap/tidb/pkg/sessionctx/variable"
3643
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
44+
"github.com/pingcap/tidb/pkg/util"
3745
"github.com/pingcap/tidb/pkg/util/chunk"
3846
"github.com/pingcap/tidb/pkg/util/mathutil"
3947
"github.com/pingcap/tidb/pkg/util/sqlexec"
@@ -82,6 +90,15 @@ var (
8290
}
8391
)
8492

93+
const (
94+
// serverTypeTiDB is tidb's instance type name
95+
serverTypeTiDB = "tidb"
96+
// serverTypeTiKV is tikv's instance type name
97+
serverTypeTiKV = "tikv"
98+
// serverTypeTiFlash is tiflash's instance type name
99+
serverTypeTiFlash = "tiflash"
100+
)
101+
85102
// the resource cost rate of a specified workload per 1 tikv cpu.
86103
type baseResourceCost struct {
87104
// represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu
@@ -236,43 +253,55 @@ func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error {
236253
return nil
237254
}
238255
e.done = true
239-
240-
exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
256+
if !variable.EnableResourceControl.Load() {
257+
return infoschema.ErrResourceGroupSupportDisabled
258+
}
241259
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
242260
if len(e.OptionList) > 0 {
243-
return e.dynamicCalibrate(ctx, req, exec)
261+
return e.dynamicCalibrate(ctx, req)
244262
}
245-
return e.staticCalibrate(ctx, req, exec)
263+
return e.staticCalibrate(req)
246264
}
247265

248266
var (
249267
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
250268
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
251269
)
252270

253-
func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
271+
func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error {
272+
exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
254273
startTs, endTs, err := e.parseCalibrateDuration(ctx)
255274
if err != nil {
256275
return err
257276
}
258-
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, startTs, endTs)
259-
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, startTs, endTs)
277+
clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx())
278+
if err != nil {
279+
return err
280+
}
281+
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, clusterInfo, startTs, endTs)
282+
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, clusterInfo, startTs, endTs)
260283
if err1 != nil && err2 != nil {
261284
return err1
262285
}
286+
263287
req.AppendUint64(0, uint64(tidbQuota+tiflashQuota))
264288
return nil
265289
}
266290

267-
func (e *Executor) getTiDBQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) {
291+
func (e *Executor) getTiDBQuota(
292+
ctx context.Context,
293+
exec sqlexec.RestrictedSQLExecutor,
294+
serverInfos []infoschema.ServerInfo,
295+
startTs, endTs time.Time,
296+
) (float64, error) {
268297
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
269298
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
270299

271-
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
300+
totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos)
272301
if err != nil {
273302
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
274303
}
275-
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
304+
totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos)
276305
if err != nil {
277306
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
278307
}
@@ -369,12 +398,17 @@ func setupQuotas(quotas []float64) (float64, error) {
369398
return sum / float64(upperBound-lowerBound), nil
370399
}
371400

372-
func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) {
401+
func (e *Executor) getTiFlashQuota(
402+
ctx context.Context,
403+
exec sqlexec.RestrictedSQLExecutor,
404+
serverInfos []infoschema.ServerInfo,
405+
startTs, endTs time.Time,
406+
) (float64, error) {
373407
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
374408
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
375409

376410
quotas := make([]float64, 0)
377-
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec)
411+
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(serverInfos)
378412
if err != nil {
379413
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
380414
}
@@ -408,25 +442,26 @@ func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedS
408442
return setupQuotas(quotas)
409443
}
410444

411-
func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
412-
if !variable.EnableResourceControl.Load() {
413-
return infoschema.ErrResourceGroupSupportDisabled
414-
}
445+
func (e *Executor) staticCalibrate(req *chunk.Chunk) error {
415446
resourceGroupCtl := domain.GetDomain(e.Ctx()).ResourceGroupsController()
416447
// first fetch the ru settings config.
417448
if resourceGroupCtl == nil {
418449
return errors.New("resource group controller is not initialized")
419450
}
451+
clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx())
452+
if err != nil {
453+
return err
454+
}
420455
ruCfg := resourceGroupCtl.GetConfig()
421456
if e.WorkloadType == ast.TPCH10 {
422-
return staticCalibrateTpch10(ctx, req, exec, ruCfg)
457+
return staticCalibrateTpch10(req, clusterInfo, ruCfg)
423458
}
424459

425-
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
460+
totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo)
426461
if err != nil {
427462
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
428463
}
429-
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
464+
totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo)
430465
if err != nil {
431466
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
432467
}
@@ -440,8 +475,8 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s
440475
return errors.Errorf("unknown workload '%T'", e.WorkloadType)
441476
}
442477

443-
if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
444-
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio
478+
if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
479+
totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio
445480
}
446481
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
447482
float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms
@@ -453,14 +488,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s
453488
return nil
454489
}
455490

456-
func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor, ruCfg *resourceControlClient.RUConfig) error {
491+
func staticCalibrateTpch10(req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error {
457492
// TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored.
458493
// cpu usage: 105494.666484 / 20 / 20 = 263.74
459494
// read bytes: 401799161689.0 / 20 / 20 = 1004497904.22
460495
const cpuTimePerCPUPerSec float64 = 263.74
461496
const readBytesPerCPUPerSec float64 = 1004497904.22
462497
ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec
463-
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec)
498+
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(clusterInfo)
464499
if err != nil {
465500
return err
466501
}
@@ -469,19 +504,39 @@ func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.R
469504
return nil
470505
}
471506

472-
func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
473-
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
474-
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
507+
func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
508+
cpuQuota := float64(runtime.GOMAXPROCS(0))
509+
failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) {
510+
if val != nil {
511+
cpuQuota = float64(val.(int))
512+
}
513+
})
514+
instanceNum := count(clusterInfo, serverTypeTiDB)
515+
return cpuQuota * float64(instanceNum), nil
475516
}
476517

477-
func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
478-
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
479-
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
518+
func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
519+
instanceNum := count(clusterInfo, serverTypeTiKV)
520+
if instanceNum == 0 {
521+
return 0.0, errors.New("no server with type 'tikv' is found")
522+
}
523+
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota")
524+
if err != nil {
525+
return 0.0, err
526+
}
527+
return cpuQuota * float64(instanceNum), nil
480528
}
481529

482-
func getTiFlashLogicalCores(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
483-
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tiflash_cpu_quota GROUP BY time ORDER BY time desc limit 1"
484-
return getNumberFromMetrics(ctx, exec, query, "tiflash_cpu_quota")
530+
func getTiFlashLogicalCores(clusterInfo []infoschema.ServerInfo) (float64, error) {
531+
instanceNum := count(clusterInfo, serverTypeTiFlash)
532+
if instanceNum == 0 {
533+
return 0.0, nil
534+
}
535+
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiFlash, "tiflash_proxy_tikv_server_cpu_cores_quota")
536+
if err != nil {
537+
return 0.0, err
538+
}
539+
return cpuQuota * float64(instanceNum), nil
485540
}
486541

487542
func getTiFlashRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
@@ -569,3 +624,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql
569624
}
570625
return &timeSeriesValues{idx: 0, vals: ret}, nil
571626
}
627+
628+
func count(clusterInfo []infoschema.ServerInfo, ty string) int {
629+
num := 0
630+
for _, e := range clusterInfo {
631+
if e.ServerType == ty {
632+
num++
633+
}
634+
}
635+
return num
636+
}
637+
638+
func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) {
639+
var cpuQuota float64
640+
err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error {
641+
if resp.StatusCode != http.StatusOK {
642+
return errors.Errorf("request %s failed: %s", addr, resp.Status)
643+
}
644+
scanner := bufio.NewScanner(resp.Body)
645+
for scanner.Scan() {
646+
line := scanner.Text()
647+
if !strings.HasPrefix(line, metricName) {
648+
continue
649+
}
650+
// the metrics format is like following:
651+
// tikv_server_cpu_cores_quota 8
652+
quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64)
653+
if err == nil {
654+
cpuQuota = quota
655+
}
656+
return errors.Trace(err)
657+
}
658+
return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr)
659+
})
660+
return cpuQuota, err
661+
}
662+
663+
func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error {
664+
var firstErr error
665+
for _, srv := range serversInfo {
666+
if srv.ServerType != serverType {
667+
continue
668+
}
669+
if len(srv.StatusAddr) == 0 {
670+
continue
671+
}
672+
url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr)
673+
req, err := http.NewRequest(http.MethodGet, url, nil)
674+
if err != nil {
675+
return err
676+
}
677+
var resp *http.Response
678+
failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) {
679+
if val != nil {
680+
data, _ := base64.StdEncoding.DecodeString(val.(string))
681+
resp = &http.Response{
682+
StatusCode: http.StatusOK,
683+
Body: noopCloserWrapper{
684+
Reader: strings.NewReader(string(data)),
685+
},
686+
}
687+
}
688+
})
689+
if resp == nil {
690+
var err1 error
691+
// ignore false positive go line, can't use defer here because it's in a loop.
692+
//nolint:bodyclose
693+
resp, err1 = util.InternalHTTPClient().Do(req)
694+
if err1 != nil {
695+
if firstErr == nil {
696+
firstErr = err1
697+
}
698+
continue
699+
}
700+
}
701+
err = onResp(srv.Address, resp)
702+
resp.Body.Close()
703+
return err
704+
}
705+
if firstErr == nil {
706+
firstErr = errors.Errorf("no server with type '%s' is found", serverType)
707+
}
708+
return firstErr
709+
}
710+
711+
type noopCloserWrapper struct {
712+
io.Reader
713+
}
714+
715+
func (noopCloserWrapper) Close() error {
716+
return nil
717+
}

0 commit comments

Comments
 (0)