Skip to content

Commit 3c54e26

Browse files
tangentati-chi-bot
authored andcommitted
This is an automated cherry-pick of #63099
Signed-off-by: ti-chi-bot <[email protected]>
1 parent ea82535 commit 3c54e26

File tree

4 files changed

+112
-13
lines changed

4 files changed

+112
-13
lines changed

br/pkg/lightning/backend/local/rate_limiter_param.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,29 @@ var (
4646
// InitializeRateLimiterParam initializes the rate limiter params.
4747
func InitializeRateLimiterParam(m *meta.Meta, logger *zap.Logger) error {
4848
err := initializeVariables(
49-
m.GetIngestMaxBatchSplitRanges,
50-
defaultMaxBatchSplitRanges,
51-
&CurrentMaxBatchSplitRanges,
49+
m.GetIngestMaxBatchSplitRanges, m.SetIngestMaxBatchSplitRanges,
50+
defaultMaxBatchSplitRanges, &CurrentMaxBatchSplitRanges,
5251
logger, "maxBatchSplitRanges")
5352
if err != nil {
5453
return err
5554
}
5655
err = initializeVariables(
57-
m.GetIngestMaxSplitRangesPerSec,
58-
defaultSplitRangesPerSec,
59-
&CurrentMaxSplitRangesPerSec,
56+
m.GetIngestMaxSplitRangesPerSec, m.SetIngestMaxSplitRangesPerSec,
57+
defaultSplitRangesPerSec, &CurrentMaxSplitRangesPerSec,
6058
logger, "maxSplitRangesPerSec")
6159
if err != nil {
6260
return err
6361
}
6462
err = initializeVariables(
65-
m.GetIngestMaxInflight,
66-
defaultMaxIngestInflight,
67-
&CurrentMaxIngestInflight,
63+
m.GetIngestMaxInflight, m.SetIngestMaxInflight,
64+
defaultMaxIngestInflight, &CurrentMaxIngestInflight,
6865
logger, "maxIngestInflight")
6966
if err != nil {
7067
return err
7168
}
7269
err = initializeVariables(
73-
m.GetIngestMaxPerSec,
74-
defaultMaxIngestPerSec,
75-
&CurrentMaxIngestPerSec,
70+
m.GetIngestMaxPerSec, m.SetIngestMaxPerSec,
71+
defaultMaxIngestPerSec, &CurrentMaxIngestPerSec,
7672
logger, "maxIngestPerSec")
7773
if err != nil {
7874
return err
@@ -82,6 +78,7 @@ func InitializeRateLimiterParam(m *meta.Meta, logger *zap.Logger) error {
8278

8379
func initializeVariables[T comparable](
8480
metaGetter func() (v T, isNull bool, err error),
81+
metaSetter func(v T) error,
8582
defaultVal T,
8683
globalVar *atomic.Pointer[T],
8784
logger *zap.Logger,
@@ -92,11 +89,17 @@ func initializeVariables[T comparable](
9289
return errors.Annotatef(err, "failed to read %s value from meta store", varName)
9390
}
9491
var zero T
95-
if isNull || val == zero {
92+
if isNull {
93+
err = metaSetter(defaultVal)
94+
if err != nil {
95+
return errors.Annotatef(err, "failed to set %s value to meta store", varName)
96+
}
9697
val = defaultVal
9798
logger.Info("meta kv not found in meta store, initialized to default and persisted",
9899
zap.String("key", varName),
99100
zap.Any("value", defaultVal))
101+
} else if val == zero {
102+
val = defaultVal
100103
} else {
101104
logger.Info("loaded value from meta store",
102105
zap.String("key", varName),

pkg/server/handler/tests/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@ go_test(
99
"main_test.go",
1010
],
1111
flaky = True,
12+
<<<<<<< HEAD
1213
race = "on",
1314
shard_count = 38,
15+
=======
16+
shard_count = 41,
17+
>>>>>>> 63614029658 (server/handler: register missing http handlers related to ingest param (#63099))
1418
deps = [
1519
"//pkg/config",
1620
"//pkg/ddl",

pkg/server/handler/tests/http_handler_serial_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,3 +684,81 @@ func TestTTL(t *testing.T) {
684684
require.Nil(t, obj)
685685
require.EqualError(t, err, "http status: 400 Bad Request, table test_ttl.t2 not exists")
686686
}
687+
<<<<<<< HEAD
688+
=======
689+
690+
func TestGC(t *testing.T) {
691+
ts := createBasicHTTPHandlerTestSuite()
692+
ts.startServer(t)
693+
defer ts.stopServer(t)
694+
695+
var data url.Values
696+
resp, err := ts.FormStatus("/txn-gc-states", data)
697+
require.NoError(t, err)
698+
require.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
699+
700+
resp, err = ts.FetchStatus("/txn-gc-states")
701+
require.NoError(t, err)
702+
defer func() { require.NoError(t, resp.Body.Close()) }()
703+
require.Equal(t, http.StatusOK, resp.StatusCode)
704+
705+
// Verify the resp body.
706+
decoder := json.NewDecoder(resp.Body)
707+
var state gc.GCState
708+
err = decoder.Decode(&state)
709+
require.NoError(t, err)
710+
711+
var empty gc.GCState
712+
require.NotEqual(t, empty, state)
713+
}
714+
715+
func TestIngestParam(t *testing.T) {
716+
ts := createBasicHTTPHandlerTestSuite()
717+
ts.startServer(t)
718+
defer ts.stopServer(t)
719+
720+
testCases := []struct {
721+
url string
722+
defaultVal any
723+
modVal any
724+
expectedVal any
725+
}{
726+
{"/ingest/max-batch-split-ranges", float64(2048), 1000, float64(1000)},
727+
{"/ingest/max-split-ranges-per-sec", float64(0), 2000, float64(2000)},
728+
{"/ingest/max-ingest-inflight", float64(0), 1000, float64(1000)},
729+
{"/ingest/max-ingest-per-sec", float64(0), 2000, float64(2000)},
730+
}
731+
732+
for _, tc := range testCases {
733+
t.Run(tc.url, func(t *testing.T) {
734+
resp, err := ts.FetchStatus(tc.url)
735+
require.NoError(t, err)
736+
defer func() { require.NoError(t, resp.Body.Close()) }()
737+
require.Equal(t, http.StatusOK, resp.StatusCode)
738+
decoder := json.NewDecoder(resp.Body)
739+
var payload struct {
740+
Value float64 `json:"value"`
741+
IsNull bool `json:"is_null"`
742+
}
743+
err = decoder.Decode(&payload)
744+
require.NoError(t, err)
745+
require.Equal(t, tc.defaultVal, payload.Value)
746+
747+
resp, err = ts.PostStatus(tc.url, "", bytes.NewBuffer([]byte(fmt.Sprintf(`{"value": %v}`, tc.modVal))))
748+
require.NoError(t, err)
749+
require.NotNil(t, resp)
750+
defer func() { require.NoError(t, resp.Body.Close()) }()
751+
require.Equal(t, http.StatusOK, resp.StatusCode)
752+
753+
resp, err = ts.FetchStatus(tc.url)
754+
require.NoError(t, err)
755+
defer func() { require.NoError(t, resp.Body.Close()) }()
756+
require.Equal(t, http.StatusOK, resp.StatusCode)
757+
decoder = json.NewDecoder(resp.Body)
758+
err = decoder.Decode(&payload)
759+
require.NoError(t, err)
760+
require.Equal(t, tc.expectedVal, payload.Value)
761+
})
762+
}
763+
}
764+
>>>>>>> 63614029658 (server/handler: register missing http handlers related to ingest param (#63099))

pkg/server/http_status.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,21 @@ func (s *Server) startHTTPServer() {
257257
// HTTP path for upgrade operations.
258258
router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")
259259

260+
<<<<<<< HEAD
260261
if s.cfg.Store == "tikv" {
262+
=======
263+
// HTTP path for ingest configurations
264+
router.Handle("/ingest/max-batch-split-ranges", tikvhandler.NewIngestConcurrencyHandler(
265+
tikvHandlerTool, tikvhandler.IngestParamMaxBatchSplitRanges)).Name("IngestMaxBatchSplitRanges")
266+
router.Handle("/ingest/max-split-ranges-per-sec", tikvhandler.NewIngestConcurrencyHandler(
267+
tikvHandlerTool, tikvhandler.IngestParamMaxSplitRangesPerSec)).Name("IngestMaxSplitRangesPerSec")
268+
router.Handle("/ingest/max-ingest-inflight", tikvhandler.NewIngestConcurrencyHandler(
269+
tikvHandlerTool, tikvhandler.IngestParamMaxInflight)).Name("IngestMaxInflight")
270+
router.Handle("/ingest/max-ingest-per-sec", tikvhandler.NewIngestConcurrencyHandler(
271+
tikvHandlerTool, tikvhandler.IngestParamMaxPerSecond)).Name("IngestMaxPerSec")
272+
273+
if s.cfg.Store == config.StoreTypeTiKV {
274+
>>>>>>> 63614029658 (server/handler: register missing http handlers related to ingest param (#63099))
261275
// HTTP path for tikv.
262276
router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions))
263277
router.Handle("/tables/{db}/{table}/ranges", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRanges))

0 commit comments

Comments
 (0)