Skip to content

Commit 00f1ba5

Browse files
committed
Merge commit '5ec4f36c855574b64be98e4c9f6b4485ceb4143d' into wenxuan/fix-scalar-func-init
* commit '5ec4f36c855574b64be98e4c9f6b4485ceb4143d': (48 commits) dxf: use correct store in add-index, checksum by copr in import-into, and enable DXF service (pingcap#62224) infoschema: Add new fields in tiflash system table (pingcap#62296) lightning: fix panic on checksum manager close when checksum is off (pingcap#62300) *: Add owners cfg file for pkg/metrics module (pingcap#62292) autoid: reserve a table ID range for downstream fork (pingcap#62157) planner: Fix expression rewriting and method signature mismatch in plan cache (pingcap#58506) planner: use prop based noCopPushDown mechanism to replace aggregation field. (pingcap#62249) fix: close issue 59457 by trim compare first (pingcap#61915) planner: add skew risk ratio for range pred (pingcap#62035) meta: unify definition of system or memory DB name (pingcap#62247) ddl: add retry for updateSelfVersion (pingcap#62190) *: upgrade client go to add config valid function (pingcap#62246) executor: fix the inappropriate RequiredRows set by `TopNExec` (pingcap#62154) planner: RegardNULLAsPoint should be true as default (pingcap#62194) planner: apply predicate simplification before extract condition (pingcap#62211) test: fix failed test caused by new version of `mc` (pingcap#61356) planner: deprecate the logical interface CanPushToCop and its implementation canPushToCopImpl. (pingcap#62235) fix: shut down test without error (pingcap#61921) planner: lift the canPushToCop check of logical join/window/selection (pingcap#62206) backend/local: add rate limiter for split region and ingest data (pingcap#61555) ...
2 parents 0174e10 + 5ec4f36 commit 00f1ba5

File tree

412 files changed

+10160
-4304
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

412 files changed

+10160
-4304
lines changed

DEPS.bzl

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7272,27 +7272,27 @@ def go_deps():
72727272
build_tags = ["nextgen"],
72737273
build_file_proto_mode = "disable_global",
72747274
importpath = "github.com/tikv/client-go/v2",
7275-
sha256 = "5ee9e7df77d4f356c0aaed1aa8c1d67627d3afefb1e1cfc553cd47bb50a9fb1a",
7276-
strip_prefix = "github.com/tikv/client-go/[email protected].20250627074109-b7e019d31519",
7275+
sha256 = "a911629d5e7c8421e2a69d9f47eb1ad6cb286ed0c13a5a8d201291d4b1739fd9",
7276+
strip_prefix = "github.com/tikv/client-go/[email protected].20250707065624-97382455050a",
72777277
urls = [
7278-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250627074109-b7e019d31519.zip",
7279-
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250627074109-b7e019d31519.zip",
7280-
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250627074109-b7e019d31519.zip",
7281-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250627074109-b7e019d31519.zip",
7278+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
7279+
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
7280+
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
7281+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
72827282
],
72837283
)
72847284
go_repository(
72857285
name = "com_github_tikv_pd_client",
72867286
build_tags = ["nextgen"],
72877287
build_file_proto_mode = "disable_global",
72887288
importpath = "github.com/tikv/pd/client",
7289-
sha256 = "a2f49c06aa0ecd911ad983a6aa54a9b958575bb215bcb61d0c1641e9182d7996",
7290-
strip_prefix = "github.com/tikv/pd/[email protected]20250701094812-6002a2eaf02b",
7289+
sha256 = "f5e502897005e01f77da34f6a687a584bf11bad405a2d6ff94e1fe0225925bd1",
7290+
strip_prefix = "github.com/tikv/pd/[email protected]20250703091733-dfd345b89500",
72917291
urls = [
7292-
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250701094812-6002a2eaf02b.zip",
7293-
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250701094812-6002a2eaf02b.zip",
7294-
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250701094812-6002a2eaf02b.zip",
7295-
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250701094812-6002a2eaf02b.zip",
7292+
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250703091733-dfd345b89500.zip",
7293+
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250703091733-dfd345b89500.zip",
7294+
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250703091733-dfd345b89500.zip",
7295+
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20250703091733-dfd345b89500.zip",
72967296
],
72977297
)
72987298
go_repository(

OWNERS_ALIASES

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ aliases:
119119
- henrybw
120120
- wddevries
121121
- King-Dylan
122+
sig-approvers-metrics: # approvers for metrics module
123+
- XuHuaiyu
124+
- zimulala
125+
- yibin87
126+
- nolouch
122127
sig-community-reviewers:
123128
- CabinfeverB
124129
- ChenPeng2013

br/pkg/backup/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"//pkg/distsql",
3030
"//pkg/kv",
3131
"//pkg/meta",
32+
"//pkg/meta/metadef",
3233
"//pkg/meta/model",
3334
"//pkg/statistics/handle",
3435
"//pkg/statistics/util",

br/pkg/backup/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ import (
3434
"github.com/pingcap/tidb/pkg/distsql"
3535
"github.com/pingcap/tidb/pkg/kv"
3636
"github.com/pingcap/tidb/pkg/meta"
37+
"github.com/pingcap/tidb/pkg/meta/metadef"
3738
"github.com/pingcap/tidb/pkg/meta/model"
3839
"github.com/pingcap/tidb/pkg/tablecodec"
39-
"github.com/pingcap/tidb/pkg/util"
4040
filter "github.com/pingcap/tidb/pkg/util/table-filter"
4141
"github.com/tikv/client-go/v2/oracle"
4242
"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -758,7 +758,7 @@ func BuildBackupRangeAndInitSchema(
758758

759759
for _, dbInfo := range dbs {
760760
// skip system databases
761-
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
761+
if !tableFilter.MatchSchema(dbInfo.Name.O) || metadef.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
762762
continue
763763
}
764764

@@ -832,7 +832,7 @@ func BuildBackupSchemas(
832832

833833
for _, dbInfo := range dbs {
834834
// skip system databases
835-
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
835+
if !tableFilter.MatchSchema(dbInfo.Name.O) || metadef.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
836836
continue
837837
}
838838

br/pkg/restore/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ go_library(
2525
"//pkg/domain",
2626
"//pkg/kv",
2727
"//pkg/meta",
28+
"//pkg/meta/metadef",
2829
"//pkg/meta/model",
2930
"//pkg/parser/ast",
3031
"//pkg/util",

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ go_library(
6767
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
6868
"@com_github_pingcap_kvproto//pkg/metapb",
6969
"@com_github_pingcap_log//:log",
70+
"@com_github_prometheus_client_golang//prometheus",
7071
"@com_github_tikv_client_go_v2//config",
7172
"@com_github_tikv_client_go_v2//kv",
7273
"@com_github_tikv_client_go_v2//util",

br/pkg/restore/log_client/client.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -887,8 +887,16 @@ func (rc *LogClient) RestoreKVFiles(
887887
ctx = opentracing.ContextWithSpan(ctx, span1)
888888
}
889889

890-
var applyWg sync.WaitGroup
891-
eg, ectx := errgroup.WithContext(ctx)
890+
var (
891+
applyWg sync.WaitGroup
892+
eg, ectx = errgroup.WithContext(ctx)
893+
894+
skipped = metrics.KVApplyTasksEvents.WithLabelValues("skipped")
895+
submitted = metrics.KVApplyTasksEvents.WithLabelValues("submitted")
896+
started = metrics.KVApplyTasksEvents.WithLabelValues("started")
897+
finished = metrics.KVApplyTasksEvents.WithLabelValues("finished")
898+
memApplied = metrics.KVLogFileEmittedMemory.WithLabelValues("2-applied")
899+
)
892900
applyFunc := func(files []*LogDataFileInfo, kvCount int64, size uint64) {
893901
if len(files) == 0 {
894902
return
@@ -897,16 +905,23 @@ func (rc *LogClient) RestoreKVFiles(
897905
// because the tableID of files is the same.
898906
rule, ok := rules[files[0].TableId]
899907
if !ok {
908+
skipped.Add(float64(len(files)))
900909
onProgress(kvCount)
901910
summary.CollectInt("FileSkip", len(files))
902911
log.Debug("skip file due to table id not matched", zap.Int64("table-id", files[0].TableId))
903912
skipFile += len(files)
904913
} else {
914+
submitted.Add(float64(len(files)))
905915
applyWg.Add(1)
906916
rc.logRestoreManager.workerPool.ApplyOnErrorGroup(eg, func() (err error) {
917+
started.Add(float64(len(files)))
907918
fileStart := time.Now()
908919
defer applyWg.Done()
909920
defer func() {
921+
for _, file := range files {
922+
memApplied.Add(float64(file.Size()))
923+
}
924+
finished.Add(float64(len(files)))
910925
onProgress(kvCount)
911926
updateStats(uint64(kvCount), size)
912927
summary.CollectInt("File", len(files))

br/pkg/restore/log_client/import.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,20 @@ func (importer *LogFileImporter) ImportKVFiles(
137137
log.Debug("rewrite file keys",
138138
logutil.Key("startKey", startKey), logutil.Key("endKey", endKey))
139139

140-
var numRegions int64
140+
var (
141+
numRegions int64
142+
143+
listener = RangeCtlMetricListener{
144+
RequestRegion: metrics.KVApplyRunOverRegionsEvents.WithLabelValues("request-region"),
145+
RetryRegion: metrics.KVApplyRunOverRegionsEvents.WithLabelValues("retry-region"),
146+
RetryRange: metrics.KVApplyRunOverRegionsEvents.WithLabelValues("retry-range"),
147+
RegionSuccess: metrics.KVApplyRunOverRegionsEvents.WithLabelValues("region-success"),
148+
}
149+
)
141150
// This RetryState will retry 45 time, about 10 min.
142151
rs := utils.InitialRetryState(45, 100*time.Millisecond, 15*time.Second)
143152
ctl := CreateRangeController(startKey, endKey, importer.metaClient, &rs)
153+
ctl.SetEventListener(&listener)
144154
err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
145155
atomic.AddInt64(&numRegions, 1)
146156
subfiles, errFilter := filterFilesByRegion(files, ranges, r)

br/pkg/restore/log_client/import_retry.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/pingcap/tidb/br/pkg/restore/split"
1818
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
1919
"github.com/pingcap/tidb/br/pkg/utils"
20+
"github.com/prometheus/client_golang/prometheus"
2021
"github.com/tikv/client-go/v2/kv"
2122
"go.uber.org/multierr"
2223
"google.golang.org/grpc/codes"
@@ -25,6 +26,23 @@ import (
2526

2627
type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult
2728

29+
// RangeCtlEventListener defines the interface for handling events during range operations
30+
type RangeCtlEventListener interface {
31+
OnRequestRegion(ctx context.Context, region *split.RegionInfo)
32+
OnRetryRegion(ctx context.Context, region *split.RegionInfo, err error)
33+
OnRetryRange(ctx context.Context, err error)
34+
OnRegionSuccess(ctx context.Context, region *split.RegionInfo)
35+
}
36+
37+
// RangeCtlNopListener is a no-op implementation of EventListener
38+
type RangeCtlNopListener struct{}
39+
40+
func (n *RangeCtlNopListener) OnRequestRegion(ctx context.Context, region *split.RegionInfo) {}
41+
func (n *RangeCtlNopListener) OnRetryRegion(ctx context.Context, region *split.RegionInfo, err error) {
42+
}
43+
func (n *RangeCtlNopListener) OnRetryRange(ctx context.Context, err error) {}
44+
func (n *RangeCtlNopListener) OnRegionSuccess(ctx context.Context, region *split.RegionInfo) {}
45+
2846
// RangeController manages the execution of operations over a range of regions.
2947
// It provides functionality to scan regions within a specified key range and
3048
// apply a given function to each region, handling errors and retries automatically.
@@ -33,8 +51,36 @@ type RangeController struct {
3351
end []byte
3452
metaClient split.SplitClient
3553

36-
errors error
37-
rs *utils.RetryState
54+
errors error
55+
rs *utils.RetryState
56+
listener RangeCtlEventListener
57+
}
58+
59+
type RangeCtlMetricListener struct {
60+
RequestRegion prometheus.Counter
61+
RetryRegion prometheus.Counter
62+
RetryRange prometheus.Counter
63+
RegionSuccess prometheus.Counter
64+
}
65+
66+
// OnRequestRegion implements EventListener interface
67+
func (m *RangeCtlMetricListener) OnRequestRegion(ctx context.Context, region *split.RegionInfo) {
68+
m.RequestRegion.Inc()
69+
}
70+
71+
// OnRetryRegion implements EventListener interface
72+
func (m *RangeCtlMetricListener) OnRetryRegion(ctx context.Context, region *split.RegionInfo, err error) {
73+
m.RetryRegion.Inc()
74+
}
75+
76+
// OnRetryRange implements EventListener interface
77+
func (m *RangeCtlMetricListener) OnRetryRange(ctx context.Context, err error) {
78+
m.RetryRange.Inc()
79+
}
80+
81+
// OnRegionSuccess implements EventListener interface
82+
func (m *RangeCtlMetricListener) OnRegionSuccess(ctx context.Context, region *split.RegionInfo) {
83+
m.RegionSuccess.Inc()
3884
}
3985

4086
// CreateRangeController creates a controller that cloud be used to scan regions in a range and
@@ -53,9 +99,15 @@ func CreateRangeController(start, end []byte, metaClient split.SplitClient, retr
5399
end: end,
54100
metaClient: metaClient,
55101
rs: retryStatus,
102+
listener: &RangeCtlNopListener{},
56103
}
57104
}
58105

106+
// SetEventListener sets the event listener for the range controller
107+
func (o *RangeController) SetEventListener(listener RangeCtlEventListener) {
108+
o.listener = listener
109+
}
110+
59111
func (o *RangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) {
60112
o.errors = multierr.Append(o.errors, errors.Annotatef(&result, "execute over region %v failed", region.Region))
61113
// TODO: Maybe handle some of region errors like `epoch not match`?
@@ -141,8 +193,9 @@ func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) er
141193
if errScanRegion != nil {
142194
return errors.Trace(errScanRegion)
143195
}
144-
145196
for _, region := range regionInfos {
197+
o.listener.OnRequestRegion(adjustedCtx, region)
198+
146199
cont, err := o.applyFuncToRegion(adjustedCtx, f, region)
147200
if err != nil {
148201
return err
@@ -170,17 +223,21 @@ func (o *RangeController) applyFuncToRegion(ctx context.Context, f RegionFunc, r
170223
case StrategyFromThisRegion:
171224
logutil.CL(ctx).Warn("retry for region", logutil.Region(region.Region), logutil.ShortError(&result))
172225
if !o.handleRegionError(ctx, result, region) {
226+
o.listener.OnRetryRange(ctx, &result)
173227
return false, o.ApplyFuncToRange(ctx, f)
174228
}
229+
o.listener.OnRetryRegion(ctx, region, &result)
175230
return o.applyFuncToRegion(ctx, f, region)
176231
case StrategyFromStart:
177232
logutil.CL(ctx).Warn("retry for execution over regions", logutil.ShortError(&result))
233+
o.listener.OnRetryRange(ctx, &result)
178234
// TODO: make a backoffer considering more about the error info,
179235
// instead of ingore the result and retry.
180236
time.Sleep(o.rs.ExponentialBackoff())
181237
return false, o.ApplyFuncToRange(ctx, f)
182238
}
183239
}
240+
o.listener.OnRegionSuccess(ctx, region)
184241
return true, nil
185242
}
186243

br/pkg/restore/log_client/log_split_strategy.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,24 @@ package logclient
44

55
import (
66
"context"
7+
"time"
78

89
"github.com/pingcap/errors"
910
"github.com/pingcap/log"
1011
"github.com/pingcap/tidb/br/pkg/checkpoint"
1112
"github.com/pingcap/tidb/br/pkg/restore/split"
1213
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
1314
"github.com/pingcap/tidb/br/pkg/summary"
15+
"github.com/pingcap/tidb/pkg/metrics"
1416
"go.uber.org/zap"
1517
)
1618

1719
type LogSplitStrategy struct {
1820
*split.BaseSplitStrategy
1921
checkpointSkipMap *LogFilesSkipMap
2022
checkpointFileProgressFn func(uint64, uint64)
23+
24+
lastMemUsageUpdate time.Time
2125
}
2226

2327
var _ split.SplitStrategy[*LogDataFileInfo] = &LogSplitStrategy{}
@@ -84,6 +88,8 @@ func (ls *LogSplitStrategy) Accumulate(file *LogDataFileInfo) {
8488
},
8589
})
8690
}
91+
92+
ls.maybeUpdateMemUsage()
8793
}
8894

8995
func (ls *LogSplitStrategy) ShouldSplit() bool {
@@ -107,3 +113,20 @@ func (ls *LogSplitStrategy) ShouldSkip(file *LogDataFileInfo) bool {
107113
}
108114
return false
109115
}
116+
117+
func (ls *LogSplitStrategy) maybeUpdateMemUsage() {
118+
if time.Since(ls.lastMemUsageUpdate) < 30*time.Second {
119+
return
120+
}
121+
122+
ls.lastMemUsageUpdate = time.Now()
123+
memUsed := 0
124+
for _, hlp := range ls.TableSplitter {
125+
hlp.Traverse(func(v split.Valued) bool {
126+
memUsed += v.MemSize()
127+
return true
128+
})
129+
}
130+
131+
metrics.KVSplitHelperMemUsage.Set(float64(memUsed))
132+
}

0 commit comments

Comments
 (0)