Skip to content

Commit ba8e845

Browse files
authored
ddl: fix concurrent column type changes(with changing data) that cause schema and data inconsistencies (#31051) (#31070)
close #31048
1 parent abb012f commit ba8e845

File tree

2 files changed

+59
-4
lines changed

2 files changed

+59
-4
lines changed

ddl/column.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
845845
if job.IsRollingback() {
846846
// For those column-type-change jobs which don't reorg the data.
847847
if !needChangeColumnData(oldCol, jobParam.newCol) {
848-
return rollbackModifyColumnJob(t, tblInfo, job, oldCol, jobParam.modifyColumnTp)
848+
return rollbackModifyColumnJob(t, tblInfo, job, jobParam.newCol, oldCol, jobParam.modifyColumnTp)
849849
}
850850
// For those column-type-change jobs which reorg the data.
851851
return rollbackModifyColumnJobWithData(t, tblInfo, job, oldCol, jobParam)
@@ -1459,6 +1459,10 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind
14591459
func (w *worker) doModifyColumn(
14601460
d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
14611461
newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) {
1462+
if oldCol.ID != newCol.ID {
1463+
job.State = model.JobStateRollingback
1464+
return ver, errKeyColumnDoesNotExits.GenWithStack("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", oldCol.Name, newCol.ID)
1465+
}
14621466
// Column from null to not null.
14631467
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
14641468
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)
@@ -1767,9 +1771,9 @@ func checkAddColumnTooManyColumns(colNum int) error {
17671771
}
17681772

17691773
// rollbackModifyColumnJob rollbacks the job when an error occurs.
1770-
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
1774+
func rollbackModifyColumnJob(t *meta.Meta, tblInfo *model.TableInfo, job *model.Job, newCol, oldCol *model.ColumnInfo, modifyColumnTp byte) (ver int64, _ error) {
17711775
var err error
1772-
if modifyColumnTp == mysql.TypeNull {
1776+
if oldCol.ID == newCol.ID && modifyColumnTp == mysql.TypeNull {
17731777
// field NotNullFlag flag reset.
17741778
tblInfo.Columns[oldCol.Offset].Flag = oldCol.Flag &^ mysql.NotNullFlag
17751779
// field PreventNullInsertFlag flag reset.

ddl/db_change_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,54 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
10591059
s.testControlParallelExecSQL(c, sql, sql, f)
10601060
}
10611061

1062+
func (s *testStateChangeSuite) TestParallelAlterModifyColumnWithData(c *C) {
1063+
sql := "ALTER TABLE t MODIFY COLUMN c int;"
1064+
f := func(c *C, err1, err2 error) {
1065+
c.Assert(err1, IsNil)
1066+
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
1067+
rs, err := s.se.Execute(context.Background(), "select * from t")
1068+
c.Assert(err, IsNil)
1069+
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
1070+
c.Assert(err, IsNil)
1071+
c.Assert(sRows[0][2], Equals, "3")
1072+
c.Assert(rs[0].Close(), IsNil)
1073+
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
1074+
c.Assert(err, IsNil)
1075+
rs, err = s.se.Execute(context.Background(), "select * from t")
1076+
c.Assert(err, IsNil)
1077+
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
1078+
c.Assert(err, IsNil)
1079+
c.Assert(sRows[1][2], Equals, "33")
1080+
c.Assert(rs[0].Close(), IsNil)
1081+
}
1082+
s.testControlParallelExecSQL(c, sql, sql, f)
1083+
}
1084+
1085+
func (s *testStateChangeSuite) TestParallelAlterModifyColumnToNotNullWithData(c *C) {
1086+
sql := "ALTER TABLE t MODIFY COLUMN c int not null;"
1087+
f := func(c *C, err1, err2 error) {
1088+
c.Assert(err1, IsNil)
1089+
c.Assert(err2.Error(), Equals, "[ddl:1072]column c id 3 does not exist, this column may have been updated by other DDL ran in parallel")
1090+
rs, err := s.se.Execute(context.Background(), "select * from t")
1091+
c.Assert(err, IsNil)
1092+
sRows, err := session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
1093+
c.Assert(err, IsNil)
1094+
c.Assert(sRows[0][2], Equals, "3")
1095+
c.Assert(rs[0].Close(), IsNil)
1096+
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, null, 44, 55)")
1097+
c.Assert(err, NotNil)
1098+
_, err = s.se.Execute(context.Background(), "insert into t values(11, 22, 33.3, 44, 55)")
1099+
c.Assert(err, IsNil)
1100+
rs, err = s.se.Execute(context.Background(), "select * from t")
1101+
c.Assert(err, IsNil)
1102+
sRows, err = session.ResultSetToStringSlice(context.Background(), s.se, rs[0])
1103+
c.Assert(err, IsNil)
1104+
c.Assert(sRows[1][2], Equals, "33")
1105+
c.Assert(rs[0].Close(), IsNil)
1106+
}
1107+
s.testControlParallelExecSQL(c, sql, sql, f)
1108+
}
1109+
10621110
func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColumn(c *C) {
10631111
sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);"
10641112
sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;"
@@ -1334,12 +1382,15 @@ func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (sess
13341382
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
13351383
_, err := s.se.Execute(context.Background(), "use test_db_state")
13361384
c.Assert(err, IsNil)
1337-
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
1385+
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c double default null, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
13381386
c.Assert(err, IsNil)
13391387
if len(s.preSQL) != 0 {
13401388
_, err := s.se.Execute(context.Background(), s.preSQL)
13411389
c.Assert(err, IsNil)
13421390
}
1391+
_, err = s.se.Execute(context.Background(), "insert into t values(1, 2, 3.1234, 4, 5)")
1392+
c.Assert(err, IsNil)
1393+
13431394
defer func() {
13441395
_, err := s.se.Execute(context.Background(), "drop table t")
13451396
c.Assert(err, IsNil)

0 commit comments

Comments
 (0)