Skip to content

Commit ca395fa

Browse files
authored
lightning: add precheck that PD and TiDB must belongs to same cluster (#57709)
close #57704
1 parent b2e829c commit ca395fa

File tree

14 files changed

+234
-16
lines changed

14 files changed

+234
-16
lines changed

errors.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,11 @@ error = '''
526526
check local storage resource error
527527
'''
528528

529+
["Lightning:PreCheck:ErrCheckPDTiDBSameCluster"]
530+
error = '''
531+
check PD and TiDB in the same cluster error
532+
'''
533+
529534
["Lightning:PreCheck:ErrCheckTableEmpty"]
530535
error = '''
531536
check table empty error

lightning/pkg/importer/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ go_library(
6666
"//pkg/table/tables",
6767
"//pkg/tablecodec",
6868
"//pkg/types",
69+
"//pkg/util",
6970
"//pkg/util/cdcutil",
7071
"//pkg/util/codec",
7172
"//pkg/util/collate",

lightning/pkg/importer/check_info.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,3 +159,10 @@ func (rc *Controller) checkCDCPiTR(ctx context.Context) error {
159159
}
160160
return rc.doPreCheckOnItem(ctx, precheck.CheckTargetUsingCDCPITR)
161161
}
162+
163+
func (rc *Controller) checkPDTiDBFromSameCluster(ctx context.Context) error {
164+
if rc.cfg.TikvImporter.Backend == config.BackendTiDB {
165+
return nil
166+
}
167+
return rc.doPreCheckOnItem(ctx, precheck.CheckPDTiDBFromSameCluster)
168+
}

lightning/pkg/importer/check_info_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ func TestCheckCSVHeader(t *testing.T) {
413413
preInfoGetter,
414414
nil,
415415
nil,
416+
nil,
416417
)
417418
preInfoGetter.dbInfosCache = rc.dbInfos
418419
err = rc.checkCSVHeader(ctx)
@@ -467,6 +468,7 @@ func TestCheckTableEmpty(t *testing.T) {
467468
preInfoGetter,
468469
nil,
469470
nil,
471+
nil,
470472
)
471473

472474
rc := &Controller{
@@ -625,6 +627,7 @@ func TestLocalResource(t *testing.T) {
625627
preInfoGetter,
626628
nil,
627629
nil,
630+
nil,
628631
)
629632
rc := &Controller{
630633
cfg: cfg,

lightning/pkg/importer/import.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func NewImportControllerWithPauser(
479479
}
480480

481481
preCheckBuilder := NewPrecheckItemBuilder(
482-
cfg, p.DBMetas, preInfoGetter, cpdb, pdHTTPCli,
482+
cfg, p.DBMetas, preInfoGetter, cpdb, pdHTTPCli, db,
483483
)
484484

485485
rc := &Controller{
@@ -1908,6 +1908,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
19081908
if err := rc.checkClusterRegion(ctx); err != nil {
19091909
return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs()
19101910
}
1911+
if err := rc.checkPDTiDBFromSameCluster(ctx); err != nil {
1912+
return common.ErrCheckPDTiDBFromSameCluster.Wrap(err).GenWithStackByArgs()
1913+
}
19111914
}
19121915
// even if checkpoint exists, we still need to make sure CDC/PiTR task is not running.
19131916
if err := rc.checkCDCPiTR(ctx); err != nil {

lightning/pkg/importer/import_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func TestPreCheckFailed(t *testing.T) {
221221
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
222222
}
223223
cpdb := panicCheckpointDB{}
224-
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
224+
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil, db)
225225
ctl := &Controller{
226226
cfg: cfg,
227227
saveCpCh: make(chan saveCp),

lightning/pkg/importer/precheck.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package importer
1616

1717
import (
1818
"context"
19+
"database/sql"
1920

2021
"github.com/pingcap/errors"
2122
ropts "github.com/pingcap/tidb/lightning/pkg/importer/opts"
@@ -42,6 +43,7 @@ type PrecheckItemBuilder struct {
4243
preInfoGetter PreImportInfoGetter
4344
checkpointsDB checkpoints.DB
4445
pdAddrsGetter func(context.Context) []string
46+
targetDB *sql.DB
4547
}
4648

4749
// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
@@ -91,7 +93,7 @@ func NewPrecheckItemBuilderFromConfig(
9193
if err != nil {
9294
return nil, errors.Trace(err)
9395
}
94-
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdHTTPCli), gerr
96+
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdHTTPCli, targetDB), gerr
9597
}
9698

9799
// NewPrecheckItemBuilder creates a new PrecheckItemBuilder
@@ -101,6 +103,7 @@ func NewPrecheckItemBuilder(
101103
preInfoGetter PreImportInfoGetter,
102104
checkpointsDB checkpoints.DB,
103105
pdHTTPCli pdhttp.Client,
106+
targetDB *sql.DB,
104107
) *PrecheckItemBuilder {
105108
pdAddrsGetter := func(context.Context) []string {
106109
return []string{cfg.TiDB.PdAddr}
@@ -125,6 +128,7 @@ func NewPrecheckItemBuilder(
125128
preInfoGetter: preInfoGetter,
126129
checkpointsDB: checkpointsDB,
127130
pdAddrsGetter: pdAddrsGetter,
131+
targetDB: targetDB,
128132
}
129133
}
130134

@@ -157,6 +161,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p
157161
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
158162
case precheck.CheckTargetUsingCDCPITR:
159163
return NewCDCPITRCheckItem(b.cfg, b.pdAddrsGetter), nil
164+
case precheck.CheckPDTiDBFromSameCluster:
165+
return NewPDTiDBFromSameClusterCheckItem(b.targetDB, b.pdAddrsGetter), nil
160166
default:
161167
return nil, errors.Errorf("unsupported check item: %v", checkID)
162168
}

lightning/pkg/importer/precheck_impl.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ package importer
1717
import (
1818
"cmp"
1919
"context"
20+
"database/sql"
2021
"fmt"
22+
"net/url"
2123
"path/filepath"
2224
"reflect"
2325
"slices"
@@ -43,6 +45,7 @@ import (
4345
"github.com/pingcap/tidb/pkg/parser/mysql"
4446
"github.com/pingcap/tidb/pkg/table"
4547
"github.com/pingcap/tidb/pkg/types"
48+
"github.com/pingcap/tidb/pkg/util"
4649
"github.com/pingcap/tidb/pkg/util/cdcutil"
4750
"github.com/pingcap/tidb/pkg/util/engine"
4851
"github.com/pingcap/tidb/pkg/util/set"
@@ -1433,3 +1436,74 @@ func hasDefault(col *model.ColumnInfo) bool {
14331436
return col.DefaultIsExpr || col.DefaultValue != nil || !mysql.HasNotNullFlag(col.GetFlag()) ||
14341437
col.IsGenerated() || mysql.HasAutoIncrementFlag(col.GetFlag())
14351438
}
1439+
1440+
// pdTiDBFromSameClusterCheckItem provides two sources of PD addresses and use
1441+
// util.CheckIfSameCluster to check if they are from the same cluster.
1442+
//
1443+
// The first source stands for PD leader's all etcd client URL addresses in most
1444+
// time, the second source stands for all PD nodes' first etcd client URL
1445+
// addresses.
1446+
//
1447+
// If we can't reach PD leader, the first source will be replaced by the PD
1448+
// address set in lightning's task configuration, or in TiDB's configuration.
1449+
// Then it may have false alert if PD has multiple endpoints and above
1450+
// configuration uses one of them, while etcd information uses another one, and
1451+
// there are no common addresses passed to util.CheckIfSameCluster.
1452+
type pdTiDBFromSameClusterCheckItem struct {
1453+
db *sql.DB
1454+
pdAddrsGetter func(context.Context) []string
1455+
}
1456+
1457+
// NewPDTiDBFromSameClusterCheckItem creates a new pdTiDBFromSameClusterCheckItem.
1458+
func NewPDTiDBFromSameClusterCheckItem(
1459+
db *sql.DB,
1460+
pdAddrsGetter func(context.Context) []string,
1461+
) precheck.Checker {
1462+
return &pdTiDBFromSameClusterCheckItem{
1463+
db: db,
1464+
pdAddrsGetter: pdAddrsGetter,
1465+
}
1466+
}
1467+
1468+
func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.CheckResult, error) {
1469+
theResult := &precheck.CheckResult{
1470+
Item: i.GetCheckItemID(),
1471+
Severity: precheck.Critical,
1472+
Passed: true,
1473+
Message: "PD and TiDB in configuration are from the same cluster",
1474+
}
1475+
1476+
pdLeaderAddrsGetter := func(ctx context.Context) ([]string, error) {
1477+
addrs := i.pdAddrsGetter(ctx)
1478+
for idx, addrURL := range addrs {
1479+
u, err2 := url.Parse(addrURL)
1480+
if err2 != nil {
1481+
return nil, errors.Trace(err2)
1482+
}
1483+
addrs[idx] = u.Host
1484+
}
1485+
return addrs, nil
1486+
}
1487+
1488+
sameCluster, pdAddrs, pdAddrsFromTiDB, err := util.CheckIfSameCluster(
1489+
ctx, pdLeaderAddrsGetter, util.GetPDsAddrWithoutScheme(i.db),
1490+
)
1491+
if err != nil {
1492+
return nil, errors.Trace(err)
1493+
}
1494+
if sameCluster {
1495+
return theResult, nil
1496+
}
1497+
1498+
theResult.Passed = false
1499+
theResult.Message = fmt.Sprintf(
1500+
"PD and TiDB in configuration are not from the same cluster, "+
1501+
"PD addresses read from PD are: %v, PD addresses read from TiDB are %v",
1502+
pdAddrs, pdAddrsFromTiDB,
1503+
)
1504+
return theResult, nil
1505+
}
1506+
1507+
func (*pdTiDBFromSameClusterCheckItem) GetCheckItemID() precheck.CheckItemID {
1508+
return precheck.CheckPDTiDBFromSameCluster
1509+
}

lightning/pkg/importer/precheck_impl_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"testing"
2121

22+
"github.com/DATA-DOG/go-sqlmock"
2223
"github.com/docker/go-units"
2324
"github.com/pingcap/tidb/br/pkg/storage"
2425
"github.com/pingcap/tidb/br/pkg/streamhelper"
@@ -683,3 +684,49 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
683684
s.Require().True(result.Passed)
684685
s.Require().Equal("TiDB Lightning is not using local backend, skip this check", result.Message)
685686
}
687+
688+
func (s *precheckImplSuite) TestPDTiDBFromSameCluster() {
689+
ctx := context.Background()
690+
db, mock, err := sqlmock.New()
691+
s.Require().NoError(err)
692+
pdAddrGetter := func(ctx context.Context) []string {
693+
return []string{"https://1.2.3.4:2379", "http://127.0.0.1:2379"}
694+
}
695+
696+
// check wrong host and port
697+
mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`).
698+
WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}).
699+
AddRow("1.2.3.4:2380").AddRow("10.20.30.40:2379"),
700+
)
701+
702+
checker := NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter)
703+
result, err := checker.Check(ctx)
704+
s.Require().NoError(err)
705+
s.Require().False(result.Passed)
706+
s.Require().Equal(
707+
"PD and TiDB in configuration are not from the same cluster, "+
708+
"PD addresses read from PD are: [1.2.3.4:2379 127.0.0.1:2379], "+
709+
"PD addresses read from TiDB are [1.2.3.4:2380 10.20.30.40:2379]",
710+
result.Message)
711+
712+
// check partial match is enough
713+
mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`).
714+
WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}).
715+
AddRow("1.2.3.4:2379"),
716+
)
717+
checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter)
718+
result, err = checker.Check(ctx)
719+
s.Require().NoError(err)
720+
s.Require().True(result.Passed)
721+
722+
mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`).
723+
WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}).
724+
AddRow("2.3.4.5:2379").AddRow("3.4.5.6:2379").AddRow("1.2.3.4:2379"),
725+
)
726+
checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter)
727+
result, err = checker.Check(ctx)
728+
s.Require().NoError(err)
729+
s.Require().True(result.Passed)
730+
731+
s.Require().NoError(mock.ExpectationsWereMet())
732+
}

lightning/pkg/importer/precheck_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {
3333

3434
preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
3535
require.NoError(t, err)
36-
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
36+
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil, nil)
3737
for _, checkItemID := range []precheck.CheckItemID{
3838
precheck.CheckLargeDataFile,
3939
precheck.CheckSourcePermission,

0 commit comments

Comments
 (0)