Skip to content

Commit 972e642

Browse files
authored
server/handler: register missing http handlers related to ingest param (#63099) (#63121)
ref #61553
1 parent ea82535 commit 972e642

File tree

5 files changed

+80
-16
lines changed

5 files changed

+80
-16
lines changed

br/pkg/lightning/backend/local/rate_limiter_param.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,29 @@ var (
4646
// InitializeRateLimiterParam initializes the rate limiter params.
4747
func InitializeRateLimiterParam(m *meta.Meta, logger *zap.Logger) error {
4848
err := initializeVariables(
49-
m.GetIngestMaxBatchSplitRanges,
50-
defaultMaxBatchSplitRanges,
51-
&CurrentMaxBatchSplitRanges,
49+
m.GetIngestMaxBatchSplitRanges, m.SetIngestMaxBatchSplitRanges,
50+
defaultMaxBatchSplitRanges, &CurrentMaxBatchSplitRanges,
5251
logger, "maxBatchSplitRanges")
5352
if err != nil {
5453
return err
5554
}
5655
err = initializeVariables(
57-
m.GetIngestMaxSplitRangesPerSec,
58-
defaultSplitRangesPerSec,
59-
&CurrentMaxSplitRangesPerSec,
56+
m.GetIngestMaxSplitRangesPerSec, m.SetIngestMaxSplitRangesPerSec,
57+
defaultSplitRangesPerSec, &CurrentMaxSplitRangesPerSec,
6058
logger, "maxSplitRangesPerSec")
6159
if err != nil {
6260
return err
6361
}
6462
err = initializeVariables(
65-
m.GetIngestMaxInflight,
66-
defaultMaxIngestInflight,
67-
&CurrentMaxIngestInflight,
63+
m.GetIngestMaxInflight, m.SetIngestMaxInflight,
64+
defaultMaxIngestInflight, &CurrentMaxIngestInflight,
6865
logger, "maxIngestInflight")
6966
if err != nil {
7067
return err
7168
}
7269
err = initializeVariables(
73-
m.GetIngestMaxPerSec,
74-
defaultMaxIngestPerSec,
75-
&CurrentMaxIngestPerSec,
70+
m.GetIngestMaxPerSec, m.SetIngestMaxPerSec,
71+
defaultMaxIngestPerSec, &CurrentMaxIngestPerSec,
7672
logger, "maxIngestPerSec")
7773
if err != nil {
7874
return err
@@ -82,6 +78,7 @@ func InitializeRateLimiterParam(m *meta.Meta, logger *zap.Logger) error {
8278

8379
func initializeVariables[T comparable](
8480
metaGetter func() (v T, isNull bool, err error),
81+
metaSetter func(v T) error,
8582
defaultVal T,
8683
globalVar *atomic.Pointer[T],
8784
logger *zap.Logger,
@@ -92,11 +89,17 @@ func initializeVariables[T comparable](
9289
return errors.Annotatef(err, "failed to read %s value from meta store", varName)
9390
}
9491
var zero T
95-
if isNull || val == zero {
92+
if isNull {
93+
err = metaSetter(defaultVal)
94+
if err != nil {
95+
return errors.Annotatef(err, "failed to set %s value to meta store", varName)
96+
}
9697
val = defaultVal
9798
logger.Info("meta kv not found in meta store, initialized to default and persisted",
9899
zap.String("key", varName),
99100
zap.Any("value", defaultVal))
101+
} else if val == zero {
102+
val = defaultVal
100103
} else {
101104
logger.Info("loaded value from meta store",
102105
zap.String("key", varName),

pkg/server/handler/tests/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ go_test(
1010
],
1111
flaky = True,
1212
race = "on",
13-
shard_count = 38,
13+
shard_count = 39,
1414
deps = [
1515
"//pkg/config",
1616
"//pkg/ddl",

pkg/server/handler/tests/http_handler_serial_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,3 +684,53 @@ func TestTTL(t *testing.T) {
684684
require.Nil(t, obj)
685685
require.EqualError(t, err, "http status: 400 Bad Request, table test_ttl.t2 not exists")
686686
}
687+
688+
func TestIngestParam(t *testing.T) {
689+
ts := createBasicHTTPHandlerTestSuite()
690+
ts.startServer(t)
691+
defer ts.stopServer(t)
692+
693+
testCases := []struct {
694+
url string
695+
defaultVal any
696+
modVal any
697+
expectedVal any
698+
}{
699+
{"/ingest/max-batch-split-ranges", float64(2048), 1000, float64(1000)},
700+
{"/ingest/max-split-ranges-per-sec", float64(0), 2000, float64(2000)},
701+
{"/ingest/max-ingest-inflight", float64(0), 1000, float64(1000)},
702+
{"/ingest/max-ingest-per-sec", float64(0), 2000, float64(2000)},
703+
}
704+
705+
for _, tc := range testCases {
706+
t.Run(tc.url, func(t *testing.T) {
707+
resp, err := ts.FetchStatus(tc.url)
708+
require.NoError(t, err)
709+
defer func() { require.NoError(t, resp.Body.Close()) }()
710+
require.Equal(t, http.StatusOK, resp.StatusCode)
711+
decoder := json.NewDecoder(resp.Body)
712+
var payload struct {
713+
Value float64 `json:"value"`
714+
IsNull bool `json:"is_null"`
715+
}
716+
err = decoder.Decode(&payload)
717+
require.NoError(t, err)
718+
require.Equal(t, tc.defaultVal, payload.Value)
719+
720+
resp, err = ts.PostStatus(tc.url, "", bytes.NewBuffer([]byte(fmt.Sprintf(`{"value": %v}`, tc.modVal))))
721+
require.NoError(t, err)
722+
require.NotNil(t, resp)
723+
defer func() { require.NoError(t, resp.Body.Close()) }()
724+
require.Equal(t, http.StatusOK, resp.StatusCode)
725+
726+
resp, err = ts.FetchStatus(tc.url)
727+
require.NoError(t, err)
728+
defer func() { require.NoError(t, resp.Body.Close()) }()
729+
require.Equal(t, http.StatusOK, resp.StatusCode)
730+
decoder = json.NewDecoder(resp.Body)
731+
err = decoder.Decode(&payload)
732+
require.NoError(t, err)
733+
require.Equal(t, tc.expectedVal, payload.Value)
734+
})
735+
}
736+
}

pkg/server/handler/tikvhandler/tikv_handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2114,11 +2114,12 @@ func (h IngestConcurrencyHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
21142114
default:
21152115
handler.WriteError(w, errors.Errorf("unsupported ingest parameter: %s", h.param))
21162116
}
2117+
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
21172118
switch req.Method {
21182119
case http.MethodGet:
21192120
var respValue float64
21202121
var respIsNull bool
2121-
err := kv.RunInNewTxn(context.Background(), h.Store.(kv.Storage), false, func(_ context.Context, txn kv.Transaction) error {
2122+
err := kv.RunInNewTxn(ctx, h.Store.(kv.Storage), false, func(_ context.Context, txn kv.Transaction) error {
21222123
m := meta.NewMeta(txn)
21232124
var getErr error
21242125
respValue, respIsNull, getErr = getter(m)
@@ -2148,7 +2149,7 @@ func (h IngestConcurrencyHandler) ServeHTTP(w http.ResponseWriter, req *http.Req
21482149
handler.WriteError(w, errors.New("value must be >= 0"))
21492150
return
21502151
}
2151-
err := kv.RunInNewTxn(context.Background(), h.Store.(kv.Storage), true, func(_ context.Context, txn kv.Transaction) error {
2152+
err := kv.RunInNewTxn(ctx, h.Store.(kv.Storage), true, func(_ context.Context, txn kv.Transaction) error {
21522153
m := meta.NewMeta(txn)
21532154
return setter(m, newValue)
21542155
})

pkg/server/http_status.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,16 @@ func (s *Server) startHTTPServer() {
257257
// HTTP path for upgrade operations.
258258
router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")
259259

260+
// HTTP path for ingest configurations
261+
router.Handle("/ingest/max-batch-split-ranges", tikvhandler.NewIngestConcurrencyHandler(
262+
tikvHandlerTool, tikvhandler.IngestParamMaxBatchSplitRanges)).Name("IngestMaxBatchSplitRanges")
263+
router.Handle("/ingest/max-split-ranges-per-sec", tikvhandler.NewIngestConcurrencyHandler(
264+
tikvHandlerTool, tikvhandler.IngestParamMaxSplitRangesPerSec)).Name("IngestMaxSplitRangesPerSec")
265+
router.Handle("/ingest/max-ingest-inflight", tikvhandler.NewIngestConcurrencyHandler(
266+
tikvHandlerTool, tikvhandler.IngestParamMaxInflight)).Name("IngestMaxInflight")
267+
router.Handle("/ingest/max-ingest-per-sec", tikvhandler.NewIngestConcurrencyHandler(
268+
tikvHandlerTool, tikvhandler.IngestParamMaxPerSecond)).Name("IngestMaxPerSec")
269+
260270
if s.cfg.Store == "tikv" {
261271
// HTTP path for tikv.
262272
router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions))

0 commit comments

Comments
 (0)