
Commit 0f26142

Rishabh Mittal authored and GitHub Enterprise committed
Merge pull request pingcap#2 from airbnb/rish_6.5.4
Cherry-picking lightning 6.5.4
2 parents 6e5334d + 360a34b commit 0f26142

27 files changed: +276 -107 lines changed

Makefile

Lines changed: 1 addition & 1 deletion

@@ -272,7 +272,7 @@ build_lightning_for_web:
 	CGO_ENABLED=1 $(GOBUILD) -tags dev $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o $(LIGHTNING_BIN) br/cmd/tidb-lightning/main.go

 build_lightning:
-	CGO_ENABLED=1 $(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o $(LIGHTNING_BIN) br/cmd/tidb-lightning/main.go
+	GOOS=linux $(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o $(LIGHTNING_BIN) br/cmd/tidb-lightning/main.go

 build_lightning-ctl:
 	CGO_ENABLED=1 $(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o $(LIGHTNING_CTL_BIN) br/cmd/tidb-lightning-ctl/main.go

br/pkg/lightning/backend/local/engine.go

Lines changed: 1 addition & 1 deletion

@@ -1242,7 +1242,7 @@ func newSSTWriter(path string) (*sstable.Writer, error) {
 		TablePropertyCollectors: []func() pebble.TablePropertyCollector{
 			newRangePropertiesCollector,
 		},
-		BlockSize: 16 * 1024,
+		BlockSize: 128 * 1024,
 	})
 	return writer, nil
 }
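
Note: the companion hunk in local.go below makes the same 16 KiB to 128 KiB move for the engine's pebble level options, so the engine DB and the standalone SST writer end up with matching data-block sizes. Larger blocks generally mean a smaller block index and less per-block overhead, which suits Lightning's bulk, mostly sequential import writes; the trade-off is that point reads must decode a bigger block. A condensed sketch assembled from the two hunks (variable names are illustrative, not a verbatim excerpt):

// Engine-level pebble options (local.go, openEngineDB): 128 KiB data blocks.
opt.Levels = []pebble.LevelOptions{
	{
		TargetFileSize: 16 * units.GiB,
		BlockSize:      128 * units.KiB,
	},
}

// Standalone SST writer options (engine.go, newSSTWriter): the same 128 KiB.
writerOpts := sstable.WriterOptions{
	TablePropertyCollectors: []func() pebble.TablePropertyCollector{
		newRangePropertiesCollector,
	},
	BlockSize: 128 * 1024,
}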

br/pkg/lightning/backend/local/local.go

Lines changed: 15 additions & 14 deletions

@@ -768,6 +768,7 @@ func (local *local) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*pebble.D
 	opt.Levels = []pebble.LevelOptions{
 		{
 			TargetFileSize: 16 * units.GiB,
+			BlockSize:      128 * units.KiB,
 		},
 	}

@@ -1657,6 +1658,20 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi

 	failpoint.Inject("ReadyForImportEngine", func() {})

+	needSplit := len(ranges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
+	for i := 0; i < maxRetryTimes; i++ {
+		err = local.SplitAndScatterRegionInBatches(ctx, ranges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
+		if err == nil || common.IsContextCanceledError(err) {
+			break
+		}
+
+		log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engineUUID),
+			log.ShortError(err), zap.Int("retry", i))
+	}
+	if err != nil {
+		log.FromContext(ctx).Error("split & scatter ranges failed", zap.Stringer("uuid", engineUUID), log.ShortError(err))
+		return err
+	}
 	for {
 		unfinishedRanges := lf.unfinishedRanges(ranges)
 		if len(unfinishedRanges) == 0 {
@@ -1666,24 +1681,10 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi

 		// if all the kv can fit in one region, skip split regions. TiDB will split one region for
 		// the table when table is created.
-		needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
 		// split region by given ranges
 		failpoint.Inject("failToSplit", func(_ failpoint.Value) {
 			needSplit = true
 		})
-		for i := 0; i < maxRetryTimes; i++ {
-			err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
-			if err == nil || common.IsContextCanceledError(err) {
-				break
-			}
-
-			log.FromContext(ctx).Warn("split and scatter failed in retry", zap.Stringer("uuid", engineUUID),
-				log.ShortError(err), zap.Int("retry", i))
-		}
-		if err != nil {
-			log.FromContext(ctx).Error("split & scatter ranges failed", zap.Stringer("uuid", engineUUID), log.ShortError(err))
-			return err
-		}

 		// start to write to kv and ingest
 		err = local.writeAndIngestByRanges(ctx, lf, unfinishedRanges, regionSplitSize, regionSplitKeys)
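
Note: taken together, the two ImportEngine hunks hoist the split-and-scatter phase out of the unfinished-ranges loop so it runs once per engine instead of on every pass. A condensed sketch of the resulting control flow, with logging and some checks trimmed (an illustration of the ordering, not a verbatim excerpt):

// Split/scatter runs once per engine, with its own retry loop,
// before any data is written.
needSplit := len(ranges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
for i := 0; i < maxRetryTimes; i++ {
	err = local.SplitAndScatterRegionInBatches(ctx, ranges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
	if err == nil || common.IsContextCanceledError(err) {
		break
	}
}
if err != nil {
	return err
}

// The write/ingest loop now only retries writing the ranges that are
// still unfinished; it no longer re-splits and re-scatters on every pass.
for {
	unfinishedRanges := lf.unfinishedRanges(ranges)
	if len(unfinishedRanges) == 0 {
		break
	}
	if err := local.writeAndIngestByRanges(ctx, lf, unfinishedRanges, regionSplitSize, regionSplitKeys); err != nil {
		return err
	}
}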

br/pkg/lightning/backend/local/localhelper.go

Lines changed: 51 additions & 38 deletions

@@ -98,11 +98,7 @@ func (local *local) SplitAndScatterRegionByRanges(
 	if len(ranges) == 0 {
 		return nil
 	}
-
-	db, err := local.g.GetDB()
-	if err != nil {
-		return errors.Trace(err)
-	}
+	var err error

 	minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
 	maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)
@@ -111,13 +107,16 @@ func (local *local) SplitAndScatterRegionByRanges(
 	var retryKeys [][]byte
 	waitTime := splitRegionBaseBackOffTime
 	skippedKeys := 0
+	var skipped_scatter bool
+	skipped_scatter = false
 	for i := 0; i < splitRetryTimes; i++ {
 		log.FromContext(ctx).Info("split and scatter region",
 			logutil.Key("minKey", minKey),
 			logutil.Key("maxKey", maxKey),
 			zap.Int("retry", i),
 		)
 		err = nil
+		// wait time
 		if i > 0 {
 			select {
 			case <-time.After(waitTime):
@@ -130,6 +129,7 @@ func (local *local) SplitAndScatterRegionByRanges(
 			}
 		}
 		var regions []*split.RegionInfo
+		//scan all regions
 		regions, err = split.PaginateScanRegion(ctx, local.splitCli, minKey, maxKey, 128)
 		log.FromContext(ctx).Info("paginate scan regions", zap.Int("count", len(regions)),
 			logutil.Key("start", minKey), logutil.Key("end", maxKey))
@@ -147,6 +147,25 @@ func (local *local) SplitAndScatterRegionByRanges(
 			break
 		}

+		if !skipped_scatter {
+			log.FromContext(ctx).Info("scattering initial scan region", zap.Int("regions", len(regions)))
+			for _, region := range regions {
+				local.ScatterRegions(ctx, region)
+			}
+			scatterCount, err := local.waitForScatterRegions(ctx, regions)
+			if scatterCount == len(regions) {
+				log.FromContext(ctx).Info("waiting for scattering initial regions done",
+					zap.Int("regions", len(regions)))
+			} else {
+				log.FromContext(ctx).Warn("waiting for scattering initial regions timeout",
+					zap.Int("skipped_keys", skippedKeys),
+					zap.Int("scatterCount", scatterCount),
+					zap.Int("regions", len(regions)),
+					zap.Error(err))
+			}
+			skipped_scatter = true
+		}
+
 		needSplitRanges := make([]Range, 0, len(ranges))
 		startKey := make([]byte, 0)
 		endKey := make([]byte, 0)
@@ -172,16 +191,6 @@ func (local *local) SplitAndScatterRegionByRanges(
 			return nil
 		}

-		var tableRegionStats map[uint64]int64
-		if tableInfo != nil {
-			tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
-			if err != nil {
-				log.FromContext(ctx).Warn("fetch table region size statistics failed",
-					zap.String("table", tableInfo.Name), zap.Error(err))
-				tableRegionStats, err = make(map[uint64]int64), nil
-			}
-		}
-
 		regionMap := make(map[uint64]*split.RegionInfo)
 		for _, region := range regions {
 			regionMap[region.Region.GetId()] = region
@@ -291,15 +300,6 @@ func (local *local) SplitAndScatterRegionByRanges(
 	}
 sendLoop:
 	for regionID, keys := range splitKeyMap {
-		// if region not in tableRegionStats, that means this region is newly split, so
-		// we can skip split it again.
-		regionSize, ok := tableRegionStats[regionID]
-		if !ok {
-			log.FromContext(ctx).Warn("region stats not found", zap.Uint64("region", regionID))
-		}
-		if len(keys) == 1 && regionSize < regionSplitSize {
-			skippedKeys++
-		}
 		select {
 		case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
 		case <-ctx.Done():
@@ -335,10 +335,9 @@ func (local *local) SplitAndScatterRegionByRanges(
 	scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
 	if scatterCount == len(scatterRegions) {
 		log.FromContext(ctx).Info("waiting for scattering regions done",
-			zap.Int("skipped_keys", skippedKeys),
 			zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
 	} else {
-		log.FromContext(ctx).Info("waiting for scattering regions timeout",
+		log.FromContext(ctx).Warn("waiting for scattering regions timeout",
			zap.Int("skipped_keys", skippedKeys),
			zap.Int("scatterCount", scatterCount),
			zap.Int("regions", len(scatterRegions)),
@@ -380,6 +379,28 @@ func fetchTableRegionSizeStats(ctx context.Context, db *sql.DB, tableID int64) (
 	return stats, errors.Trace(err)
 }

+func (local *local) ScatterRegions(ctx context.Context, regionInfo *split.RegionInfo) error {
+	var err error
+	waitTime := time.Second
+	for i := 0; i < 10; i++ {
+		myArray := []*split.RegionInfo{regionInfo}
+		if err = local.splitCli.ScatterRegions(ctx, myArray); err != nil {
+			select {
+			case <-time.After(waitTime):
+			case <-ctx.Done():
+				log.FromContext(ctx).Warn("scatter region failed", zap.Error(ctx.Err()), zap.Int("retry", i))
+				return ctx.Err()
+			}
+		} else {
+			break
+		}
+	}
+	if err != nil {
+		log.FromContext(ctx).Warn("scatter region failed", zap.Error(err))
+	}
+	return err
+}
+
 func (local *local) BatchSplitRegions(
 	ctx context.Context,
 	region *split.RegionInfo,
@@ -400,7 +421,7 @@ func (local *local) BatchSplitRegions(
 	for _, region := range scatterRegions {
 		// Wait for a while until the regions successfully splits.
 		local.waitForSplit(ctx, region.Region.Id)
-		if err = local.splitCli.ScatterRegion(ctx, region); err != nil {
+		if err = local.ScatterRegions(ctx, region); err != nil {
 			failedErr = err
 			retryRegions = append(retryRegions, region)
 		}
@@ -410,7 +431,7 @@ func (local *local) BatchSplitRegions(
 		}
 		// the scatter operation likely fails because region replicate not finish yet
 		// pack them to one log to avoid printing a lot warn logs.
-		log.FromContext(ctx).Warn("scatter region failed", zap.Int("regionCount", len(newRegions)),
+		log.FromContext(ctx).Warn("scatter region failed", zap.Int("regionCount", len(scatterRegions)),
			zap.Int("failedCount", len(retryRegions)), zap.Error(failedErr), zap.Int("retry", i))
 		scatterRegions = retryRegions
 		retryRegions = make([]*split.RegionInfo, 0)
@@ -509,7 +530,7 @@ func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionI
 	default:
 		log.FromContext(ctx).Debug("scatter-region operator status is abnormal, will scatter region again",
 			logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus()))
-		return false, local.splitCli.ScatterRegion(ctx, regionInfo)
+		return false, local.ScatterRegions(ctx, regionInfo)
 	}
 }

@@ -618,18 +639,10 @@ type storeWriteLimiter struct {
 }

 func newStoreWriteLimiter(limit int) *storeWriteLimiter {
-	var burst int
-	// Allow burst of at most 20% of the limit.
-	if limit <= math.MaxInt-limit/5 {
-		burst = limit + limit/5
-	} else {
-		// If overflowed, set burst to math.MaxInt.
-		burst = math.MaxInt
-	}
 	return &storeWriteLimiter{
 		limiters: make(map[uint64]*rate.Limiter),
 		limit:    limit,
-		burst:    burst,
+		burst:    limit,
 	}
 }
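
Note: the new ScatterRegions helper wraps a single PD scatter call in a bounded retry loop that waits a second between attempts but aborts as soon as the context is cancelled. A self-contained illustration of that retry pattern, with made-up names rather than the lightning types:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithContext retries fn up to attempts times, sleeping wait between
// attempts, and returns early if the context is cancelled while waiting.
func retryWithContext(ctx context.Context, attempts int, wait time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		select {
		case <-time.After(wait):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	calls := 0
	err := retryWithContext(ctx, 10, 100*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("scatter not ready yet")
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}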

br/pkg/lightning/config/config.go

Lines changed: 3 additions & 0 deletions

@@ -185,6 +185,9 @@ type Lightning struct {

 	MaxError           MaxError `toml:"max-error" json:"max-error"`
 	TaskInfoSchemaName string   `toml:"task-info-schema-name" json:"task-info-schema-name"`
+
+	MaxDeliverBytes uint64 `toml:"max-deliver-bytes" json:"max-deliver-bytes"`
+	MaxDeliverRows  uint64 `toml:"max-deliver-rows" json:"max-deliver-rows"`
 }

 type PostOpLevel int
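
Note: judging from the struct they are added to, the two new fields would be read from max-deliver-bytes and max-deliver-rows keys in the [lightning] section of the config file; neither their defaults nor the code that consumes them appears in this commit. A minimal, self-contained sketch of how the toml tags map onto the struct, using github.com/BurntSushi/toml purely for illustration (Lightning's real config loader may differ):

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Illustrative subset of the Lightning section with the two new knobs.
type lightningSection struct {
	TaskInfoSchemaName string `toml:"task-info-schema-name"`
	MaxDeliverBytes    uint64 `toml:"max-deliver-bytes"`
	MaxDeliverRows     uint64 `toml:"max-deliver-rows"`
}

type fileConfig struct {
	Lightning lightningSection `toml:"lightning"`
}

func main() {
	const data = `
[lightning]
task-info-schema-name = "lightning_task_info"
max-deliver-bytes = 1073741824   # hypothetical value, 1 GiB
max-deliver-rows = 1000000       # hypothetical value
`
	var cfg fileConfig
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg.Lightning)
}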

br/pkg/lightning/restore/check_info_test.go

Lines changed: 3 additions & 0 deletions

@@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
 		dbMetas,
 		preInfoGetter,
 		nil,
+		nil,
 	)
 	preInfoGetter.dbInfosCache = rc.dbInfos
 	err = rc.checkCSVHeader(ctx)
@@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
 		dbMetas,
 		preInfoGetter,
 		nil,
+		nil,
 	)

 	rc := &Controller{
@@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
 		nil,
 		preInfoGetter,
 		nil,
+		nil,
 	)
 	rc := &Controller{
 		cfg: cfg,

br/pkg/lightning/restore/precheck.go

Lines changed: 28 additions & 11 deletions

@@ -52,13 +52,21 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte
 }

 type PrecheckItemBuilder struct {
-	cfg           *config.Config
-	dbMetas       []*mydump.MDDatabaseMeta
-	preInfoGetter PreRestoreInfoGetter
-	checkpointsDB checkpoints.DB
+	cfg                *config.Config
+	dbMetas            []*mydump.MDDatabaseMeta
+	preInfoGetter      PreRestoreInfoGetter
+	checkpointsDB      checkpoints.DB
+	pdLeaderAddrGetter func() string
 }

-func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
+// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
+// pdCli **must not** be nil for local backend
+func NewPrecheckItemBuilderFromConfig(
+	ctx context.Context,
+	cfg *config.Config,
+	pdCli pd.Client,
+	opts ...ropts.PrecheckItemBuilderOption,
+) (*PrecheckItemBuilder, error) {
 	var gerr error
 	builderCfg := new(ropts.PrecheckItemBuilderConfig)
 	for _, o := range opts {
@@ -98,20 +106,29 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
+	return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
 }

 func NewPrecheckItemBuilder(
 	cfg *config.Config,
 	dbMetas []*mydump.MDDatabaseMeta,
 	preInfoGetter PreRestoreInfoGetter,
 	checkpointsDB checkpoints.DB,
+	pdCli pd.Client,
 ) *PrecheckItemBuilder {
+	leaderAddrGetter := func() string {
+		return cfg.TiDB.PdAddr
+	}
+	// in tests we may not have a pdCli
+	if pdCli != nil {
+		leaderAddrGetter = pdCli.GetLeaderAddr
+	}
 	return &PrecheckItemBuilder{
-		cfg:           cfg,
-		dbMetas:       dbMetas,
-		preInfoGetter: preInfoGetter,
-		checkpointsDB: checkpointsDB,
+		cfg:                cfg,
+		dbMetas:            dbMetas,
+		preInfoGetter:      preInfoGetter,
+		checkpointsDB:      checkpointsDB,
+		pdLeaderAddrGetter: leaderAddrGetter,
 	}
 }

@@ -142,7 +159,7 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
 	case CheckLocalTempKVDir:
 		return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
 	case CheckTargetUsingCDCPITR:
-		return NewCDCPITRCheckItem(b.cfg), nil
+		return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
 	default:
 		return nil, errors.Errorf("unsupported check item: %v", checkID)
 	}
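
Note: the getter-with-fallback wiring above lets tests construct a PrecheckItemBuilder with a nil pd.Client (which is what the extra nil arguments in check_info_test.go pass), while the real path prefers the live PD leader address. A self-contained illustration of that pattern with made-up names, not the lightning API:

package main

import "fmt"

// leaderSource is a stand-in for the part of pd.Client used here.
type leaderSource interface {
	GetLeaderAddr() string
}

type fakePD struct{ addr string }

func (f fakePD) GetLeaderAddr() string { return f.addr }

// newLeaderAddrGetter prefers the live PD client when one is supplied and
// falls back to the statically configured address otherwise (e.g. in tests).
func newLeaderAddrGetter(configuredAddr string, pd leaderSource) func() string {
	getter := func() string { return configuredAddr }
	if pd != nil {
		getter = pd.GetLeaderAddr
	}
	return getter
}

func main() {
	fmt.Println(newLeaderAddrGetter("10.0.0.1:2379", nil)())                 // configured address
	fmt.Println(newLeaderAddrGetter("10.0.0.1:2379", fakePD{"pd-0:2379"})()) // live leader address
}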
