Skip to content

Commit 33a4052

Browse files
authored
br: Add pre-check of duplicate table in the downstream (#55044) (#59587)
close #55087
1 parent 48e8060 commit 33a4052

File tree

9 files changed

+86
-1
lines changed

9 files changed

+86
-1
lines changed

br/pkg/errors/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ var (
6868
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
6969
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
7070
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))
71+
ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))
7172

7273
// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
7374
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))

br/pkg/glue/glue.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ import (
1313
pd "github.com/tikv/pd/client"
1414
)
1515

16+
type GlueClient int
17+
18+
const (
19+
ClientCLP GlueClient = iota
20+
ClientSql
21+
)
22+
1623
// Glue is an abstraction of TiDB function calls used in BR.
1724
type Glue interface {
1825
GetDomain(store kv.Storage) (*domain.Domain, error)
@@ -36,6 +43,9 @@ type Glue interface {
3643
// we can close domain as soon as possible.
3744
// and we must reuse the exists session and don't close it in SQL backup job.
3845
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error
46+
47+
// GetClient returns the client type of the glue
48+
GetClient() GlueClient
3949
}
4050

4151
// Session is an abstraction of the session.Session interface.

br/pkg/gluetidb/glue.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
157157
return nil
158158
}
159159

160+
func (Glue) GetClient() glue.GlueClient {
161+
return glue.ClientCLP
162+
}
163+
160164
// GetSessionCtx implements glue.Glue
161165
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
162166
return gs.se
@@ -366,3 +370,7 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func
366370
}
367371
return fn(glueSession)
368372
}
373+
374+
func (m *MockGlue) GetClient() glue.GlueClient {
375+
return glue.ClientCLP
376+
}

br/pkg/gluetikv/glue.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ func (Glue) GetVersion() string {
7373
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
7474
return nil
7575
}
76+
77+
func (Glue) GetClient() glue.GlueClient {
78+
return glue.ClientCLP
79+
}

br/pkg/task/restore.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
147147
_ = flags.MarkHidden(FlagPDConcurrency)
148148
_ = flags.MarkHidden(FlagBatchFlushInterval)
149149
_ = flags.MarkHidden(FlagDdlBatchSize)
150+
_ = flags.MarkHidden(flagUseFSR)
150151
}
151152

152153
// ParseFromFlags parses the config from the flag set.
@@ -1089,7 +1090,7 @@ func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.
10891090
}
10901091
}
10911092
if !allUnique {
1092-
return errors.Errorf("Target tables already existed: %s", message)
1093+
return errors.Annotate(berrors.ErrTablesAlreadyExisted, message)
10931094
}
10941095
return nil
10951096
}

errors.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,11 @@ error = '''
281281
failed to write and ingest
282282
'''
283283

284+
["BR:Restore:ErrTablesAlreadyExisted"]
285+
error = '''
286+
tables already existed in restored cluster
287+
'''
288+
284289
["BR:Restore:ErrUnsupportedSysTable"]
285290
error = '''
286291
the system table isn't supported for restoring yet

pkg/executor/brie.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,10 @@ func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Sess
710710
return fn(glueSession)
711711
}
712712

713+
func (*tidbGlue) GetClient() glue.GlueClient {
714+
return glue.ClientSql
715+
}
716+
713717
type tidbGlueSession struct {
714718
// the session context of the brie task's subtask, such as `CREATE TABLE`.
715719
se sessionctx.Context

tests/realtikvtest/brietest/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_test(
2020
"//pkg/config",
2121
"//pkg/executor",
2222
"//pkg/parser/mysql",
23+
"//pkg/session",
2324
"//pkg/sessionctx/binloginfo",
2425
"//pkg/store/mockstore/mockcopr",
2526
"//pkg/testkit",

tests/realtikvtest/brietest/brie_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package brietest
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"os"
2021
"strings"
@@ -24,6 +25,7 @@ import (
2425
"github.com/pingcap/failpoint"
2526
"github.com/pingcap/log"
2627
"github.com/pingcap/tidb/pkg/executor"
28+
"github.com/pingcap/tidb/pkg/session"
2729
"github.com/pingcap/tidb/pkg/testkit"
2830
"github.com/stretchr/testify/require"
2931
"go.uber.org/zap/zapcore"
@@ -60,6 +62,7 @@ func TestShowBackupQuery(t *testing.T) {
6062
restoreQuery := fmt.Sprintf("RESTORE TABLE `test`.`foo` FROM 'local://%s'", sqlTmp)
6163
tk.MustQuery(restoreQuery)
6264
res = tk.MustQuery("show br job query 2;")
65+
tk.MustExec("drop table foo;")
6366
res.CheckContain(restoreQuery)
6467
}
6568

@@ -68,6 +71,7 @@ func TestShowBackupQueryRedact(t *testing.T) {
6871

6972
executor.ResetGlobalBRIEQueueForTest()
7073
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return")
74+
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie")
7175
ch := make(chan any)
7276
go func() {
7377
tk := testkit.NewTestKit(t, tk.Session().GetStore())
@@ -102,6 +106,7 @@ func TestCancel(t *testing.T) {
102106
executor.ResetGlobalBRIEQueueForTest()
103107
tk.MustExec("use test;")
104108
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return")
109+
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie")
105110

106111
req := require.New(t)
107112
ch := make(chan struct{})
@@ -126,3 +131,49 @@ func TestCancel(t *testing.T) {
126131
req.FailNow("the backup job doesn't be canceled")
127132
}
128133
}
134+
135+
func TestExistedTables(t *testing.T) {
136+
tk := initTestKit(t)
137+
tmp := makeTempDirForBackup(t)
138+
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
139+
executor.ResetGlobalBRIEQueueForTest()
140+
tk.MustExec("use test;")
141+
for i := 0; i < 10; i++ {
142+
tableName := fmt.Sprintf("foo%d", i)
143+
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
144+
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
145+
}
146+
147+
done := make(chan struct{})
148+
go func() {
149+
defer close(done)
150+
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s'", sqlTmp)
151+
_ = tk.MustQuery(backupQuery)
152+
}()
153+
select {
154+
case <-time.After(20 * time.Second):
155+
t.Fatal("Backup operation exceeded")
156+
case <-done:
157+
}
158+
159+
done = make(chan struct{})
160+
go func() {
161+
defer close(done)
162+
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s'", sqlTmp)
163+
res, err := tk.Exec(restoreQuery)
164+
require.NoError(t, err)
165+
166+
_, err = session.ResultSetToStringSlice(context.Background(), tk.Session(), res)
167+
require.ErrorContains(t, err, "table already exists")
168+
}()
169+
select {
170+
case <-time.After(20 * time.Second):
171+
t.Fatal("Restore operation exceeded")
172+
case <-done:
173+
}
174+
175+
for i := 0; i < 10; i++ {
176+
tableName := fmt.Sprintf("foo%d", i)
177+
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
178+
}
179+
}

0 commit comments

Comments
 (0)