Skip to content

Commit acdac74

Browse files
authored
lightning: fix pd http request using old address (pingcap#45680) (pingcap#45726)
close pingcap#43436
1 parent b003558 commit acdac74

File tree

9 files changed

+67
-30
lines changed

9 files changed

+67
-30
lines changed

br/pkg/lightning/backend/local/local.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,15 +267,15 @@ func (*encodingBuilder) MakeEmptyRows() encode.Rows {
267267
type targetInfoGetter struct {
268268
tls *common.TLS
269269
targetDB *sql.DB
270-
pdAddr string
270+
pdCli pd.Client
271271
}
272272

273273
// NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.
274-
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdAddr string) backend.TargetInfoGetter {
274+
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter {
275275
return &targetInfoGetter{
276276
tls: tls,
277277
targetDB: db,
278-
pdAddr: pdAddr,
278+
pdCli: pdCli,
279279
}
280280
}
281281

@@ -296,10 +296,10 @@ func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *back
296296
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
297297
return err
298298
}
299-
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
299+
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinPDVersion, localMaxPDVersion); err != nil {
300300
return err
301301
}
302-
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
302+
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdCli.GetLeaderAddr(), localMinTiKVVersion, localMaxTiKVVersion); err != nil {
303303
return err
304304
}
305305

br/pkg/lightning/importer/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ go_test(
164164
"@com_github_stretchr_testify//require",
165165
"@com_github_stretchr_testify//suite",
166166
"@com_github_tikv_client_go_v2//config",
167+
"@com_github_tikv_client_go_v2//testutils",
168+
"@com_github_tikv_pd_client//:client",
167169
"@com_github_xitongsys_parquet_go//writer",
168170
"@com_github_xitongsys_parquet_go_source//buffer",
169171
"@io_etcd_go_etcd_client_v3//:client",

br/pkg/lightning/importer/checksum_helper.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/pingcap/tidb/br/pkg/lightning/metric"
2727
"github.com/pingcap/tidb/br/pkg/pdutil"
2828
"github.com/pingcap/tidb/kv"
29-
pd "github.com/tikv/pd/client"
3029
"go.uber.org/zap"
3130
)
3231

@@ -37,21 +36,14 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
3736
return nil, nil
3837
}
3938

40-
pdAddr := rc.cfg.TiDB.PdAddr
41-
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, pdAddr)
39+
pdVersion, err := pdutil.FetchPDVersion(ctx, rc.tls, rc.pdCli.GetLeaderAddr())
4240
if err != nil {
4341
return nil, errors.Trace(err)
4442
}
4543

4644
// for v4.0.0 or upper, we can use the gc ttl api
4745
var manager local.ChecksumManager
4846
if pdVersion.Major >= 4 && !rc.cfg.PostRestore.ChecksumViaSQL {
49-
tlsOpt := rc.tls.ToPDSecurityOption()
50-
pdCli, err := pd.NewClientWithContext(ctx, []string{pdAddr}, tlsOpt)
51-
if err != nil {
52-
return nil, errors.Trace(err)
53-
}
54-
5547
backoffWeight, err := common.GetBackoffWeightFromDB(ctx, rc.db)
5648
// only set backoff weight when it's smaller than default value
5749
if err == nil && backoffWeight >= local.DefaultBackoffWeight {
@@ -60,7 +52,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
6052
log.FromContext(ctx).Info("set tidb_backoff_weight to default", zap.Int("backoff_weight", local.DefaultBackoffWeight))
6153
backoffWeight = local.DefaultBackoffWeight
6254
}
63-
manager = local.NewTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
55+
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight)
6456
} else {
6557
manager = local.NewTiDBChecksumExecutor(rc.db)
6658
}

br/pkg/lightning/importer/get_pre_info.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/pingcap/tidb/types"
5151
"github.com/pingcap/tidb/util/dbterror"
5252
"github.com/pingcap/tidb/util/mock"
53+
pd "github.com/tikv/pd/client"
5354
"go.uber.org/zap"
5455
"golang.org/x/exp/maps"
5556
)
@@ -123,12 +124,14 @@ type TargetInfoGetterImpl struct {
123124
db *sql.DB
124125
tls *common.TLS
125126
backend backend.TargetInfoGetter
127+
pdCli pd.Client
126128
}
127129

128130
// NewTargetInfoGetterImpl creates a TargetInfoGetterImpl object.
129131
func NewTargetInfoGetterImpl(
130132
cfg *config.Config,
131133
targetDB *sql.DB,
134+
pdCli pd.Client,
132135
) (*TargetInfoGetterImpl, error) {
133136
tls, err := cfg.ToTLS()
134137
if err != nil {
@@ -139,7 +142,10 @@ func NewTargetInfoGetterImpl(
139142
case config.BackendTiDB:
140143
backendTargetInfoGetter = tidb.NewTargetInfoGetter(targetDB)
141144
case config.BackendLocal:
142-
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, cfg.TiDB.PdAddr)
145+
if pdCli == nil {
146+
return nil, common.ErrUnknown.GenWithStack("pd client is required when using local backend")
147+
}
148+
backendTargetInfoGetter = local.NewTargetInfoGetter(tls, targetDB, pdCli)
143149
default:
144150
return nil, common.ErrUnknownBackend.GenWithStackByArgs(cfg.TikvImporter.Backend)
145151
}
@@ -148,6 +154,7 @@ func NewTargetInfoGetterImpl(
148154
tls: tls,
149155
db: targetDB,
150156
backend: backendTargetInfoGetter,
157+
pdCli: pdCli,
151158
}, nil
152159
}
153160

@@ -229,7 +236,7 @@ func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Contex
229236
// It uses the PD interface through TLS to get the information.
230237
func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) {
231238
result := new(pdtypes.ReplicationConfig)
232-
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdReplicate, &result); err != nil {
239+
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdReplicate, &result); err != nil {
233240
return nil, errors.Trace(err)
234241
}
235242
return result, nil
@@ -240,7 +247,7 @@ func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtyp
240247
// It uses the PD interface through TLS to get the information.
241248
func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) {
242249
result := new(pdtypes.StoresInfo)
243-
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil {
250+
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdStores, result); err != nil {
244251
return nil, errors.Trace(err)
245252
}
246253
return result, nil
@@ -251,7 +258,7 @@ func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.Sto
251258
// It uses the PD interface through TLS to get the information.
252259
func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) {
253260
result := new(pdtypes.RegionsInfo)
254-
if err := g.tls.WithHost(g.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
261+
if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
255262
return nil, errors.Trace(err)
256263
}
257264
return result, nil

br/pkg/lightning/importer/get_pre_info_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,10 @@ func TestGetPreInfoIsTableEmpty(t *testing.T) {
757757
require.NoError(t, err)
758758
lnConfig := config.NewConfig()
759759
lnConfig.TikvImporter.Backend = config.BackendLocal
760-
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db)
760+
_, err = NewTargetInfoGetterImpl(lnConfig, db, nil)
761+
require.ErrorContains(t, err, "pd client is required when using local backend")
762+
lnConfig.TikvImporter.Backend = config.BackendTiDB
763+
targetGetter, err := NewTargetInfoGetterImpl(lnConfig, db, nil)
761764
require.NoError(t, err)
762765
require.Equal(t, lnConfig, targetGetter.cfg)
763766

br/pkg/lightning/importer/import.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ type Controller struct {
203203
engineMgr backend.EngineManager
204204
backend backend.Backend
205205
db *sql.DB
206+
pdCli pd.Client
206207

207208
alterTableLock sync.Mutex
208209
sysVars map[string]string
@@ -332,6 +333,7 @@ func NewImportControllerWithPauser(
332333

333334
var encodingBuilder encode.EncodingBuilder
334335
var backendObj backend.Backend
336+
var pdCli pd.Client
335337
switch cfg.TikvImporter.Backend {
336338
case config.BackendTiDB:
337339
encodingBuilder = tidb.NewEncodingBuilder()
@@ -347,9 +349,13 @@ func NewImportControllerWithPauser(
347349
if maxOpenFiles < 0 {
348350
maxOpenFiles = math.MaxInt32
349351
}
352+
pdCli, err = pd.NewClientWithContext(ctx, []string{cfg.TiDB.PdAddr}, tls.ToPDSecurityOption())
353+
if err != nil {
354+
return nil, errors.Trace(err)
355+
}
350356

351357
if cfg.TikvImporter.DuplicateResolution != config.DupeResAlgNone {
352-
if err := tikv.CheckTiKVVersion(ctx, tls, cfg.TiDB.PdAddr, minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
358+
if err := tikv.CheckTiKVVersion(ctx, tls, pdCli.GetLeaderAddr(), minTiKVVersionForDuplicateResolution, maxTiKVVersionForDuplicateResolution); err != nil {
353359
if berrors.Is(err, berrors.ErrVersionMismatch) {
354360
log.FromContext(ctx).Warn("TiKV version doesn't support duplicate resolution. The resolution algorithm will fall back to 'none'", zap.Error(err))
355361
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
@@ -399,7 +405,7 @@ func NewImportControllerWithPauser(
399405

400406
var wrapper backend.TargetInfoGetter
401407
if cfg.TikvImporter.Backend == config.BackendLocal {
402-
wrapper = local.NewTargetInfoGetter(tls, db, cfg.TiDB.PdAddr)
408+
wrapper = local.NewTargetInfoGetter(tls, db, pdCli)
403409
} else {
404410
wrapper = tidb.NewTargetInfoGetter(db)
405411
}
@@ -409,6 +415,7 @@ func NewImportControllerWithPauser(
409415
db: db,
410416
tls: tls,
411417
backend: wrapper,
418+
pdCli: pdCli,
412419
}
413420
preInfoGetter, err := NewPreImportInfoGetter(
414421
cfg,
@@ -438,6 +445,7 @@ func NewImportControllerWithPauser(
438445
pauser: p.Pauser,
439446
engineMgr: backend.MakeEngineManager(backendObj),
440447
backend: backendObj,
448+
pdCli: pdCli,
441449
db: db,
442450
sysVars: common.DefaultImportantVariables,
443451
tls: tls,
@@ -473,6 +481,9 @@ func NewImportControllerWithPauser(
473481
func (rc *Controller) Close() {
474482
rc.backend.Close()
475483
_ = rc.db.Close()
484+
if rc.pdCli != nil {
485+
rc.pdCli.Close()
486+
}
476487
}
477488

478489
// Run starts the restore task.
@@ -1925,7 +1936,7 @@ func (rc *Controller) fullCompact(ctx context.Context) error {
19251936
}
19261937

19271938
func (rc *Controller) doCompact(ctx context.Context, level int32) error {
1928-
tls := rc.tls.WithHost(rc.cfg.TiDB.PdAddr)
1939+
tls := rc.tls.WithHost(rc.pdCli.GetLeaderAddr())
19291940
return tikv.ForAllStores(
19301941
ctx,
19311942
tls,

br/pkg/lightning/importer/precheck.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
ropts "github.com/pingcap/tidb/br/pkg/lightning/importer/opts"
1010
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
1111
"github.com/pingcap/tidb/br/pkg/lightning/precheck"
12+
pd "github.com/tikv/pd/client"
1213
)
1314

1415
type precheckContextKey string
@@ -29,7 +30,8 @@ type PrecheckItemBuilder struct {
2930
}
3031

3132
// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
32-
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
33+
// pdCli **must not** be nil for local backend
34+
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
3335
var gerr error
3436
builderCfg := new(ropts.PrecheckItemBuilderConfig)
3537
for _, o := range opts {
@@ -39,7 +41,7 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, o
3941
if err != nil {
4042
return nil, errors.Trace(err)
4143
}
42-
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB)
44+
targetInfoGetter, err := NewTargetInfoGetterImpl(cfg, targetDB, pdCli)
4345
if err != nil {
4446
return nil, errors.Trace(err)
4547
}

br/pkg/lightning/importer/table_import_test.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ import (
7070
filter "github.com/pingcap/tidb/util/table-filter"
7171
"github.com/stretchr/testify/require"
7272
"github.com/stretchr/testify/suite"
73+
"github.com/tikv/client-go/v2/testutils"
74+
pd "github.com/tikv/pd/client"
7375
)
7476

7577
const (
@@ -1162,6 +1164,8 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
11621164
require.NoError(s.T(), err)
11631165
mockStore, err := storage.NewLocalStorage(dir)
11641166
require.NoError(s.T(), err)
1167+
_, _, pdClient, err := testutils.NewMockTiKV("", nil)
1168+
require.NoError(s.T(), err)
11651169
for _, ca := range cases {
11661170
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
11671171
var err error
@@ -1178,9 +1182,11 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
11781182

11791183
url := strings.TrimPrefix(server.URL, "https://")
11801184
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
1185+
pdCli := &mockPDClient{Client: pdClient, leaderAddr: url}
11811186
targetInfoGetter := &TargetInfoGetterImpl{
1182-
cfg: cfg,
1183-
tls: tls,
1187+
cfg: cfg,
1188+
tls: tls,
1189+
pdCli: pdCli,
11841190
}
11851191
preInfoGetter := &PreImportInfoGetterImpl{
11861192
cfg: cfg,
@@ -1195,6 +1201,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
11951201
checkTemplate: template,
11961202
preInfoGetter: preInfoGetter,
11971203
precheckItemBuilder: theCheckBuilder,
1204+
pdCli: pdCli,
11981205
}
11991206
var sourceSize int64
12001207
err = rc.store.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
@@ -1231,6 +1238,15 @@ func (mockTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(ta
12311238
return err
12321239
}
12331240

1241+
type mockPDClient struct {
1242+
pd.Client
1243+
leaderAddr string
1244+
}
1245+
1246+
func (m *mockPDClient) GetLeaderAddr() string {
1247+
return m.leaderAddr
1248+
}
1249+
12341250
func (s *tableRestoreSuite) TestCheckClusterRegion() {
12351251
type testCase struct {
12361252
stores pdtypes.StoresInfo
@@ -1246,6 +1262,8 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
12461262
}
12471263
return regions
12481264
}
1265+
_, _, pdClient, err := testutils.NewMockTiKV("", nil)
1266+
require.NoError(s.T(), err)
12491267

12501268
testCases := []testCase{
12511269
{
@@ -1321,10 +1339,12 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
13211339

13221340
url := strings.TrimPrefix(server.URL, "https://")
13231341
cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
1342+
pdCli := &mockPDClient{Client: pdClient, leaderAddr: url}
13241343

13251344
targetInfoGetter := &TargetInfoGetterImpl{
1326-
cfg: cfg,
1327-
tls: tls,
1345+
cfg: cfg,
1346+
tls: tls,
1347+
pdCli: pdCli,
13281348
}
13291349
dbMetas := []*mydump.MDDatabaseMeta{}
13301350
preInfoGetter := &PreImportInfoGetterImpl{
@@ -1341,6 +1361,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
13411361
preInfoGetter: preInfoGetter,
13421362
dbInfos: make(map[string]*checkpoints.TidbDBInfo),
13431363
precheckItemBuilder: theCheckBuilder,
1364+
pdCli: pdCli,
13441365
}
13451366

13461367
preInfoGetter.dbInfosCache = rc.dbInfos

executor/importer/table_import.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ func prepareSortDir(e *LoadDataController, jobID int64) (string, error) {
5252
tidbCfg := tidb.GetGlobalConfig()
5353
sortPathSuffix := "import-" + strconv.Itoa(int(tidbCfg.Port))
5454
sortPath := filepath.Join(tidbCfg.TempDir, sortPathSuffix, strconv.FormatInt(jobID, 10))
55-
5655
if info, err := os.Stat(sortPath); err != nil {
5756
if !os.IsNotExist(err) {
5857
e.logger.Error("stat sort dir failed", zap.String("path", sortPath), zap.Error(err))

0 commit comments

Comments
 (0)