Skip to content

Commit 4b2c47d

Browse files
authored
scheduler: Speed ​​up region scanning before table splitting (#12222)
close #12221
1 parent b144e40 commit 4b2c47d

File tree

2 files changed

+34
-50
lines changed

2 files changed

+34
-50
lines changed

cdc/scheduler/internal/v3/keyspan/mock.go

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ package keyspan
1515

1616
import (
1717
"bytes"
18+
"unsafe"
1819

20+
"github.com/pingcap/kvproto/pkg/metapb"
1921
"github.com/pingcap/tiflow/cdc/model"
2022
"github.com/pingcap/tiflow/cdc/processor/tablepb"
2123
"github.com/pingcap/tiflow/pkg/config"
@@ -26,12 +28,10 @@ import (
2628
// RegionCache is a simplified interface of tikv.RegionCache.
2729
// It is useful to restrict RegionCache usage and mocking in tests.
2830
type RegionCache interface {
29-
// ListRegionIDsInKeyRange lists ids of regions in [startKey,endKey].
30-
ListRegionIDsInKeyRange(
31+
// LoadRegionsInKeyRange loads regions in [startKey,endKey].
32+
LoadRegionsInKeyRange(
3133
bo *tikv.Backoffer, startKey, endKey []byte,
32-
) (regionIDs []uint64, err error)
33-
// LocateRegionByID searches for the region with ID.
34-
LocateRegionByID(bo *tikv.Backoffer, regionID uint64) (*tikv.KeyLocation, error)
34+
) (regions []*tikv.Region, err error)
3535
}
3636

3737
// mockCache mocks tikv.RegionCache.
@@ -42,34 +42,29 @@ type mockCache struct {
4242
// NewMockRegionCache returns a new MockCache.
4343
func NewMockRegionCache() *mockCache { return &mockCache{regions: spanz.NewBtreeMap[uint64]()} }
4444

45-
// ListRegionIDsInKeyRange lists ids of regions in [startKey,endKey].
46-
func (m *mockCache) ListRegionIDsInKeyRange(
45+
func (m *mockCache) LoadRegionsInKeyRange(
4746
bo *tikv.Backoffer, startKey, endKey []byte,
48-
) (regionIDs []uint64, err error) {
47+
) (regions []*tikv.Region, err error) {
4948
m.regions.Ascend(func(loc tablepb.Span, id uint64) bool {
5049
if bytes.Compare(loc.StartKey, endKey) >= 0 ||
5150
bytes.Compare(loc.EndKey, startKey) <= 0 {
5251
return true
5352
}
54-
regionIDs = append(regionIDs, id)
55-
return true
56-
})
57-
return
58-
}
59-
60-
// LocateRegionByID searches for the region with ID.
61-
func (m *mockCache) LocateRegionByID(
62-
bo *tikv.Backoffer, regionID uint64,
63-
) (loc *tikv.KeyLocation, err error) {
64-
m.regions.Ascend(func(span tablepb.Span, id uint64) bool {
65-
if id != regionID {
66-
return true
67-
}
68-
loc = &tikv.KeyLocation{
69-
StartKey: span.StartKey,
70-
EndKey: span.EndKey,
53+
region := &tikv.Region{}
54+
meta := &metapb.Region{
55+
Id: id,
56+
StartKey: loc.StartKey,
57+
EndKey: loc.EndKey,
7158
}
72-
return false
59+
60+
// region.meta is not exported, so we use unsafe to set it for testing.
61+
regionPtr := (*struct {
62+
meta *metapb.Region
63+
})(unsafe.Pointer(region))
64+
regionPtr.meta = meta
65+
66+
regions = append(regions, region)
67+
return true
7368
})
7469
return
7570
}

cdc/scheduler/internal/v3/keyspan/splitter_region_count.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ package keyspan
1616
import (
1717
"bytes"
1818
"context"
19+
"encoding/hex"
1920
"math"
21+
"time"
2022

2123
"github.com/pingcap/log"
2224
"github.com/pingcap/tiflow/cdc/model"
@@ -44,8 +46,9 @@ func newRegionCountSplitter(
4446
func (m *regionCountSplitter) split(
4547
ctx context.Context, span tablepb.Span, captureNum int,
4648
) []tablepb.Span {
49+
startTimestamp := time.Now()
4750
bo := tikv.NewBackoffer(ctx, 500)
48-
regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
51+
regions, err := m.regionCache.LoadRegionsInKeyRange(bo, span.StartKey, span.EndKey)
4952
if err != nil {
5053
log.Warn("schedulerv3: list regions failed, skip split span",
5154
zap.String("namespace", m.changefeedID.Namespace),
@@ -72,38 +75,23 @@ func (m *regionCountSplitter) split(
7275
spans := make([]tablepb.Span, 0, stepper.SpanCount())
7376
start, end := 0, stepper.Step()
7477
for {
75-
startRegion, err := m.regionCache.LocateRegionByID(bo, regions[start])
76-
if err != nil {
77-
log.Warn("schedulerv3: get regions failed, skip split span",
78-
zap.String("namespace", m.changefeedID.Namespace),
79-
zap.String("changefeed", m.changefeedID.ID),
80-
zap.String("span", span.String()),
81-
zap.Error(err))
82-
return []tablepb.Span{span}
83-
}
84-
endRegion, err := m.regionCache.LocateRegionByID(bo, regions[end-1])
85-
if err != nil {
86-
log.Warn("schedulerv3: get regions failed, skip split span",
87-
zap.String("namespace", m.changefeedID.Namespace),
88-
zap.String("changefeed", m.changefeedID.ID),
89-
zap.String("span", span.String()),
90-
zap.Error(err))
91-
return []tablepb.Span{span}
92-
}
78+
startKey := regions[start].StartKey()
79+
endKey := regions[end-1].EndKey()
9380
if len(spans) > 0 &&
94-
bytes.Compare(spans[len(spans)-1].EndKey, startRegion.StartKey) > 0 {
81+
bytes.Compare(spans[len(spans)-1].EndKey, startKey) > 0 {
9582
log.Warn("schedulerv3: list region out of order detected",
9683
zap.String("namespace", m.changefeedID.Namespace),
9784
zap.String("changefeed", m.changefeedID.ID),
9885
zap.String("span", span.String()),
9986
zap.Stringer("lastSpan", &spans[len(spans)-1]),
100-
zap.Stringer("region", startRegion))
87+
zap.String("startKey", hex.EncodeToString(startKey)),
88+
zap.String("endKey", hex.EncodeToString(endKey)))
10189
return []tablepb.Span{span}
10290
}
10391
spans = append(spans, tablepb.Span{
10492
TableID: span.TableID,
105-
StartKey: startRegion.StartKey,
106-
EndKey: endRegion.EndKey,
93+
StartKey: startKey,
94+
EndKey: endKey,
10795
})
10896

10997
if end == len(regions) {
@@ -128,7 +116,8 @@ func (m *regionCountSplitter) split(
128116
zap.Int("totalCaptures", captureNum),
129117
zap.Int("regionCount", len(regions)),
130118
zap.Int("regionThreshold", m.regionThreshold),
131-
zap.Int("spanRegionLimit", spanRegionLimit))
119+
zap.Int("spanRegionLimit", spanRegionLimit),
120+
zap.Duration("time", time.Since(startTimestamp)))
132121
return spans
133122
}
134123

0 commit comments

Comments
 (0)