Skip to content

Commit 4175bb0

Browse files
tangentati-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#53720
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 42669ad commit 4175bb0

File tree

8 files changed

+1420
-8
lines changed

8 files changed

+1420
-8
lines changed

ddl/ddl.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/pingcap/errors"
3636
"github.com/pingcap/failpoint"
3737
"github.com/pingcap/kvproto/pkg/kvrpcpb"
38+
<<<<<<< HEAD:ddl/ddl.go
3839
"github.com/pingcap/tidb/config"
3940
"github.com/pingcap/tidb/ddl/ingest"
4041
"github.com/pingcap/tidb/ddl/syncer"
@@ -64,6 +65,40 @@ import (
6465
"github.com/pingcap/tidb/util/logutil"
6566
"github.com/pingcap/tidb/util/mathutil"
6667
"github.com/pingcap/tidb/util/sqlexec"
68+
=======
69+
"github.com/pingcap/tidb/pkg/config"
70+
"github.com/pingcap/tidb/pkg/ddl/ingest"
71+
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
72+
"github.com/pingcap/tidb/pkg/ddl/logutil"
73+
"github.com/pingcap/tidb/pkg/ddl/syncer"
74+
"github.com/pingcap/tidb/pkg/ddl/util"
75+
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
76+
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
77+
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
78+
"github.com/pingcap/tidb/pkg/domain/infosync"
79+
"github.com/pingcap/tidb/pkg/infoschema"
80+
"github.com/pingcap/tidb/pkg/kv"
81+
"github.com/pingcap/tidb/pkg/meta"
82+
"github.com/pingcap/tidb/pkg/meta/autoid"
83+
"github.com/pingcap/tidb/pkg/metrics"
84+
"github.com/pingcap/tidb/pkg/owner"
85+
"github.com/pingcap/tidb/pkg/parser/ast"
86+
"github.com/pingcap/tidb/pkg/parser/model"
87+
"github.com/pingcap/tidb/pkg/parser/mysql"
88+
"github.com/pingcap/tidb/pkg/parser/terror"
89+
"github.com/pingcap/tidb/pkg/sessionctx"
90+
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
91+
"github.com/pingcap/tidb/pkg/sessionctx/variable"
92+
"github.com/pingcap/tidb/pkg/statistics/handle"
93+
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
94+
"github.com/pingcap/tidb/pkg/table"
95+
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
96+
tidbutil "github.com/pingcap/tidb/pkg/util"
97+
"github.com/pingcap/tidb/pkg/util/dbterror"
98+
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
99+
"github.com/pingcap/tidb/pkg/util/gcutil"
100+
"github.com/pingcap/tidb/pkg/util/generic"
101+
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/ddl.go
67102
"github.com/tikv/client-go/v2/tikvrpc"
68103
clientv3 "go.etcd.io/etcd/client/v3"
69104
atomicutil "go.uber.org/atomic"
@@ -361,11 +396,15 @@ type ddlCtx struct {
361396
runningJobs *runningJobs
362397

363398
// reorgCtx is used for reorganization.
399+
<<<<<<< HEAD:ddl/ddl.go
364400
reorgCtx struct {
365401
sync.RWMutex
366402
// reorgCtxMap maps job ID to reorg context.
367403
reorgCtxMap map[int64]*reorgCtx
368404
}
405+
=======
406+
reorgCtx reorgContexts
407+
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/ddl.go
369408

370409
jobCtx struct {
371410
sync.RWMutex

ddl/ddl_api_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ import (
2929
"github.com/pingcap/tidb/testkit"
3030
"github.com/pingcap/tidb/util/chunk"
3131
"github.com/stretchr/testify/require"
32+
<<<<<<< HEAD:ddl/ddl_api_test.go
3233
"golang.org/x/exp/slices"
34+
=======
35+
"golang.org/x/sync/errgroup"
36+
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/ddl_api_test.go
3337
)
3438

3539
func TestGetDDLJobs(t *testing.T) {
@@ -218,6 +222,46 @@ func enQueueDDLJobs(t *testing.T, sess session.Session, txn kv.Transaction, jobT
218222
}
219223
}
220224

225+
func TestCreateViewConcurrently(t *testing.T) {
226+
store := testkit.CreateMockStore(t)
227+
tk := testkit.NewTestKit(t, store)
228+
tk.MustExec("use test")
229+
230+
tk.MustExec("create table t (a int);")
231+
tk.MustExec("create view v as select * from t;")
232+
var (
233+
counterErr error
234+
counter int
235+
)
236+
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) {
237+
counter++
238+
if counter > 1 {
239+
counterErr = fmt.Errorf("create view job should not run concurrently")
240+
return
241+
}
242+
})
243+
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) {
244+
if job.Type == model.ActionCreateView {
245+
counter--
246+
}
247+
})
248+
var eg errgroup.Group
249+
for i := 0; i < 5; i++ {
250+
eg.Go(func() error {
251+
newTk := testkit.NewTestKit(t, store)
252+
_, err := newTk.Exec("use test")
253+
if err != nil {
254+
return err
255+
}
256+
_, err = newTk.Exec("create or replace view v as select * from t;")
257+
return err
258+
})
259+
}
260+
err := eg.Wait()
261+
require.NoError(t, err)
262+
require.NoError(t, counterErr)
263+
}
264+
221265
func TestCreateDropCreateTable(t *testing.T) {
222266
store, dom := testkit.CreateMockStoreAndDomain(t)
223267
tk := testkit.NewTestKit(t, store)

ddl/placement_policy.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func getPlacementPolicyByName(d *ddlCtx, t *meta.Meta, policyName model.CIStr) (
118118
}
119119

120120
is := d.infoCache.GetLatest()
121-
if is.SchemaMetaVersion() == currVer {
121+
if is != nil && is.SchemaMetaVersion() == currVer {
122122
// Use cached policy.
123123
policy, ok := is.PolicyByName(policyName)
124124
if ok {
@@ -319,8 +319,15 @@ func checkPlacementPolicyNotInUse(d *ddlCtx, t *meta.Meta, policy *model.PolicyI
319319
return err
320320
}
321321
is := d.infoCache.GetLatest()
322+
<<<<<<< HEAD:ddl/placement_policy.go
322323
if is.SchemaMetaVersion() == currVer {
323324
return CheckPlacementPolicyNotInUseFromInfoSchema(is, policy)
325+
=======
326+
if is != nil && is.SchemaMetaVersion() == currVer {
327+
err = CheckPlacementPolicyNotInUseFromInfoSchema(is, policy)
328+
} else {
329+
err = CheckPlacementPolicyNotInUseFromMeta(t, policy)
330+
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/placement_policy.go
324331
}
325332

326333
return CheckPlacementPolicyNotInUseFromMeta(t, policy)

ddl/schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model
7676
return err
7777
}
7878
is := d.infoCache.GetLatest()
79-
if is.SchemaMetaVersion() == currVer {
79+
if is != nil && is.SchemaMetaVersion() == currVer {
8080
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo)
8181
}
8282
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)

ddl/table.go

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -273,14 +273,19 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
273273
schemaID := job.SchemaID
274274
tbInfo := &model.TableInfo{}
275275
var orReplace bool
276-
var oldTbInfoID int64
277-
if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil {
276+
var _placeholder int64 // oldTblInfoID
277+
if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil {
278278
// Invalid arguments, cancel this job.
279279
job.State = model.JobStateCancelled
280280
return ver, errors.Trace(err)
281281
}
282282
tbInfo.State = model.StateNone
283-
err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L)
283+
284+
oldTableID, err := findTableIDByName(d, t, schemaID, tbInfo.Name.L)
285+
if infoschema.ErrTableNotExists.Equal(err) {
286+
err = nil
287+
}
288+
failpoint.InjectCall("onDDLCreateView", job)
284289
if err != nil {
285290
if infoschema.ErrDatabaseNotExists.Equal(err) {
286291
job.State = model.JobStateCancelled
@@ -303,13 +308,18 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
303308
// none -> public
304309
tbInfo.State = model.StatePublic
305310
tbInfo.UpdateTS = t.StartTS
311+
<<<<<<< HEAD:ddl/table.go
306312
if oldTbInfoID > 0 && orReplace {
307313
err = t.DropTableOrView(schemaID, oldTbInfoID)
314+
=======
315+
if oldTableID > 0 && orReplace {
316+
err = t.DropTableOrView(schemaID, job.SchemaName, oldTableID, tbInfo.Name.L)
317+
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/table.go
308318
if err != nil {
309319
job.State = model.JobStateCancelled
310320
return ver, errors.Trace(err)
311321
}
312-
err = t.GetAutoIDAccessors(schemaID, oldTbInfoID).Del()
322+
err = t.GetAutoIDAccessors(schemaID, oldTableID).Del()
313323
if err != nil {
314324
job.State = model.JobStateCancelled
315325
return ver, errors.Trace(err)
@@ -1454,13 +1464,51 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri
14541464
return err
14551465
}
14561466
is := d.infoCache.GetLatest()
1457-
if is.SchemaMetaVersion() == currVer {
1467+
if is != nil && is.SchemaMetaVersion() == currVer {
14581468
return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
14591469
}
14601470

14611471
return checkTableNotExistsFromStore(t, schemaID, tableName)
14621472
}
14631473

1474+
<<<<<<< HEAD:ddl/table.go
1475+
=======
1476+
func checkTableNotExistsByName(d *ddlCtx, t *meta.Meta, schemaID int64, schemaName, tableName string) error {
1477+
// Try to use memory schema info to check first.
1478+
currVer, err := t.GetSchemaVersion()
1479+
if err != nil {
1480+
return err
1481+
}
1482+
is := d.infoCache.GetLatest()
1483+
if is != nil && is.SchemaMetaVersion() == currVer {
1484+
return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
1485+
}
1486+
return t.CheckTableNameNotExists(t.TableNameKey(schemaName, tableName))
1487+
}
1488+
1489+
func checkConstraintNamesNotExists(t *meta.Meta, schemaID int64, constraints []*model.ConstraintInfo) error {
1490+
if len(constraints) == 0 {
1491+
return nil
1492+
}
1493+
tbInfos, err := t.ListTables(schemaID)
1494+
if err != nil {
1495+
return err
1496+
}
1497+
1498+
for _, tb := range tbInfos {
1499+
for _, constraint := range constraints {
1500+
if constraint.State != model.StateWriteOnly {
1501+
if constraintInfo := tb.FindConstraintInfoByName(constraint.Name.L); constraintInfo != nil {
1502+
return infoschema.ErrCheckConstraintDupName.GenWithStackByArgs(constraint.Name.L)
1503+
}
1504+
}
1505+
}
1506+
}
1507+
1508+
return nil
1509+
}
1510+
1511+
>>>>>>> 44c9096efbc (ddl: get latest old table ID before replace view (#53720)):pkg/ddl/table.go
14641512
func checkTableIDNotExists(t *meta.Meta, schemaID, tableID int64) error {
14651513
tbl, err := t.GetTable(schemaID, tableID)
14661514
if err != nil {
@@ -1507,6 +1555,48 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string
15071555
return nil
15081556
}
15091557

1558+
func findTableIDByName(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) {
1559+
// Try to use memory schema info to check first.
1560+
currVer, err := t.GetSchemaVersion()
1561+
if err != nil {
1562+
return 0, err
1563+
}
1564+
is := d.infoCache.GetLatest()
1565+
if is != nil && is.SchemaMetaVersion() == currVer {
1566+
return findTableIDFromInfoSchema(is, schemaID, tableName)
1567+
}
1568+
1569+
return findTableIDFromStore(t, schemaID, tableName)
1570+
}
1571+
1572+
func findTableIDFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) (int64, error) {
1573+
schema, ok := is.SchemaByID(schemaID)
1574+
if !ok {
1575+
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
1576+
}
1577+
tbl, err := is.TableByName(schema.Name, model.NewCIStr(tableName))
1578+
if err != nil {
1579+
return 0, err
1580+
}
1581+
return tbl.Meta().ID, nil
1582+
}
1583+
1584+
func findTableIDFromStore(t *meta.Meta, schemaID int64, tableName string) (int64, error) {
1585+
tbls, err := t.ListSimpleTables(schemaID)
1586+
if err != nil {
1587+
if meta.ErrDBNotExists.Equal(err) {
1588+
return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
1589+
}
1590+
return 0, errors.Trace(err)
1591+
}
1592+
for _, tbl := range tbls {
1593+
if tbl.Name.L == tableName {
1594+
return tbl.ID, nil
1595+
}
1596+
}
1597+
return 0, infoschema.ErrTableNotExists.FastGenByArgs(tableName)
1598+
}
1599+
15101600
// updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information
15111601
func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
15121602
ver int64, err error) {

ddl/table_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,8 @@ func TestCreateView(t *testing.T) {
308308
}
309309
ctx.SetValue(sessionctx.QueryString, "skip")
310310
err = d.DoDDLJob(ctx, job)
311-
require.Error(t, err)
311+
// The non-existing table id in job args will not be considered anymore.
312+
require.NoError(t, err)
312313
}
313314

314315
func checkTableCacheTest(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {

0 commit comments

Comments
 (0)