Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type SplitClient interface {
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// ScanRegions gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*RegionInfo, error)
// GetPlacementRule loads a placement rule from PD.
GetPlacementRule(ctx context.Context, groupID, ruleID string) (*pdhttp.Rule, error)
// SetPlacementRule insert or update a placement rule to PD.
Expand Down Expand Up @@ -741,14 +741,14 @@ func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetO
return c.client.GetOperator(ctx, regionID)
}

func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
func (c *pdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*RegionInfo, error) {
failpoint.Inject("no-leader-error", func(_ failpoint.Value) {
logutil.CL(ctx).Debug("failpoint no-leader-error injected.")
failpoint.Return(nil, status.Error(codes.Unavailable, "not leader"))
})

//nolint:staticcheck
regions, err := c.client.ScanRegions(ctx, key, endKey, limit, opt.WithAllowFollowerHandle())
regions, err := c.client.ScanRegions(ctx, key, endKey, limit, opts...)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/split/mock_pd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (c *TestClient) GetOperator(context.Context, uint64) (*pdpb.GetOperatorResp
}, nil
}

func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) {
func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*RegionInfo, error) {
if c.InjectErr && c.InjectTimes > 0 {
c.InjectTimes -= 1
return nil, status.Error(codes.Unavailable, "not leader")
Expand Down Expand Up @@ -646,6 +646,7 @@ func (f *FakeSplitClient) ScanRegions(
ctx context.Context,
startKey, endKey []byte,
limit int,
_ ...opt.GetRegionOption,
) ([]*RegionInfo, error) {
result := make([]*RegionInfo, 0)
count := 0
Expand Down
14 changes: 12 additions & 2 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/tikv/pd/client/opt"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -225,7 +226,12 @@ func PaginateScanRegion(
scanStartKey := startKey
for {
var batch []*RegionInfo
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
} else {
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit, opt.WithAllowFollowerHandle())
}

if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
Expand Down Expand Up @@ -306,7 +312,11 @@ func ScanRegionsWithRetry(
// because it's not easy to check multierr equals normal error.
// see https://github.com/pingcap/tidb/issues/33419.
_ = utils.WithRetry(ctx, func() error {
regions, err = client.ScanRegions(ctx, startKey, endKey, limit)
if err != nil {
regions, err = client.ScanRegions(ctx, startKey, endKey, limit)
} else {
regions, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
}
if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
redact.Key(startKey), err.Error())
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_library(
"//pkg/parser/mysql",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/vardef",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle",
"//pkg/tablecodec",
"//pkg/types",
Expand Down
12 changes: 12 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -808,6 +810,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
defer mgr.Close()
defer cfg.CloseCheckpointMetaManager()

if err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
enableFollowerHandleRegion, err := se.GetGlobalSysVar(vardef.PDEnableFollowerHandleRegion)
if err != nil {
return err
}
return mgr.SetFollowerHandle(variable.TiDBOptOn(enableFollowerHandleRegion))
}); err != nil {
return errors.Trace(err)
}

defer printRestoreMetrics()

var restoreError error
Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_incremental_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ echo "full backup start..."
run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --log-file $LOG

# when we backup, we should close domain in one shot session.
# so we can check the log count of `one shot domain closed` to be 2.
# so we can check the log count of `one shot domain closed` to be 1.
# we will call UseOneShotSession once to get the value global variable.
one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs)
one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs)
Expand Down
3 changes: 2 additions & 1 deletion pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/opt"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -167,7 +168,7 @@ func (c *testSplitClient) SplitWaitAndScatter(ctx context.Context, region *split
return newRegions, err
}

func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*split.RegionInfo, error) {
func (c *testSplitClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down