Skip to content

Commit 99478c8

Browse files
authored
ddl: fix rollback reorganize partition left intermediate state (#51631) (#53469)
close #51090
1 parent d4ec23a commit 99478c8

File tree

7 files changed

+127
-14
lines changed

7 files changed

+127
-14
lines changed

pkg/ddl/partition.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,18 +1961,20 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
19611961
job.State = model.JobStateCancelled
19621962
return ver, err
19631963
}
1964-
if partInfo.DDLType != model.PartitionTypeNone {
1964+
// ALTER TABLE ... PARTITION BY
1965+
if partInfo.Type != model.PartitionTypeNone {
19651966
// Also remove anything with the new table id
1966-
physicalTableIDs = append(physicalTableIDs, tblInfo.Partition.NewTableID)
1967+
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
19671968
// Reset if it was normal table before
1968-
if tblInfo.Partition.Type == model.PartitionTypeNone {
1969+
if tblInfo.Partition.Type == model.PartitionTypeNone ||
1970+
tblInfo.Partition.DDLType == model.PartitionTypeNone {
19691971
tblInfo.Partition = nil
19701972
} else {
1971-
tblInfo.Partition.NewTableID = 0
1972-
tblInfo.Partition.DDLExpr = ""
1973-
tblInfo.Partition.DDLColumns = nil
1974-
tblInfo.Partition.DDLType = model.PartitionTypeNone
1973+
tblInfo.Partition.ClearReorgIntermediateInfo()
19751974
}
1975+
} else {
1976+
// REMOVE PARTITIONING
1977+
tblInfo.Partition.ClearReorgIntermediateInfo()
19761978
}
19771979

19781980
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
@@ -3017,10 +3019,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
30173019
tblInfo.Partition = nil
30183020
} else {
30193021
// ALTER TABLE ... PARTITION BY
3020-
tblInfo.Partition.DDLType = model.PartitionTypeNone
3021-
tblInfo.Partition.DDLExpr = ""
3022-
tblInfo.Partition.DDLColumns = nil
3023-
tblInfo.Partition.NewTableID = 0
3022+
tblInfo.Partition.ClearReorgIntermediateInfo()
30243023
}
30253024
err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
30263025
if err != nil {

pkg/ddl/rollingback.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,19 @@ func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model
339339
for _, pd := range addingDefinitions {
340340
partNames = append(partNames, pd.Name.L)
341341
}
342-
job.Args = []interface{}{partNames}
342+
if job.Type == model.ActionReorganizePartition ||
343+
job.Type == model.ActionAlterTablePartitioning ||
344+
job.Type == model.ActionRemovePartitioning {
345+
partInfo := &model.PartitionInfo{}
346+
var pNames []string
347+
err = job.DecodeArgs(&pNames, &partInfo)
348+
if err != nil {
349+
return ver, err
350+
}
351+
job.Args = []any{partNames, partInfo}
352+
} else {
353+
job.Args = []any{partNames}
354+
}
343355
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
344356
if err != nil {
345357
return ver, errors.Trace(err)

pkg/parser/model/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ go_test(
3030
],
3131
embed = [":model"],
3232
flaky = True,
33-
shard_count = 20,
33+
shard_count = 21,
3434
deps = [
3535
"//pkg/parser/charset",
3636
"//pkg/parser/mysql",

pkg/parser/model/model.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,14 @@ func (pi *PartitionInfo) HasTruncatingPartitionID(pid int64) bool {
13081308
return false
13091309
}
13101310

1311+
// ClearReorgIntermediateInfo remove intermediate information used during reorganize partition.
1312+
func (pi *PartitionInfo) ClearReorgIntermediateInfo() {
1313+
pi.DDLType = PartitionTypeNone
1314+
pi.DDLExpr = ""
1315+
pi.DDLColumns = nil
1316+
pi.NewTableID = 0
1317+
}
1318+
13111319
// PartitionState is the state of the partition.
13121320
type PartitionState struct {
13131321
ID int64 `json:"id"`

pkg/parser/model/model_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,3 +818,16 @@ func TestTTLJobInterval(t *testing.T) {
818818
require.NoError(t, err)
819819
require.Equal(t, time.Hour*200, interval)
820820
}
821+
822+
func TestClearReorgIntermediateInfo(t *testing.T) {
823+
ptInfo := &PartitionInfo{}
824+
ptInfo.DDLType = PartitionTypeHash
825+
ptInfo.DDLExpr = "Test DDL Expr"
826+
ptInfo.NewTableID = 1111
827+
828+
ptInfo.ClearReorgIntermediateInfo()
829+
require.Equal(t, PartitionTypeNone, ptInfo.DDLType)
830+
require.Equal(t, "", ptInfo.DDLExpr)
831+
require.Equal(t, true, ptInfo.DDLColumns == nil)
832+
require.Equal(t, int64(0), ptInfo.NewTableID)
833+
}

pkg/table/tables/test/partition/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ go_test(
88
"partition_test.go",
99
],
1010
flaky = True,
11-
shard_count = 18,
11+
shard_count = 19,
1212
deps = [
1313
"//pkg/ddl",
1414
"//pkg/domain",

pkg/table/tables/test/partition/partition_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3057,3 +3057,84 @@ func TestPointGetKeyPartitioning(t *testing.T) {
30573057
tk.MustExec(`INSERT INTO t VALUES ('Aa', 'Ab', 'Ac'), ('Ba', 'Bb', 'Bc')`)
30583058
tk.MustQuery(`SELECT * FROM t WHERE b = 'Ab'`).Check(testkit.Rows("Aa Ab Ac"))
30593059
}
3060+
3061+
// Issue TiDB #51090.
3062+
func TestAlterTablePartitionRollback(t *testing.T) {
3063+
store := testkit.CreateMockStore(t)
3064+
3065+
tk := testkit.NewTestKit(t, store)
3066+
tk2 := testkit.NewTestKit(t, store)
3067+
tk3 := testkit.NewTestKit(t, store)
3068+
tk4 := testkit.NewTestKit(t, store)
3069+
tk5 := testkit.NewTestKit(t, store)
3070+
tk.MustExec(`use test;`)
3071+
tk2.MustExec(`use test;`)
3072+
tk3.MustExec(`use test;`)
3073+
tk4.MustExec(`use test;`)
3074+
tk5.MustExec(`use test;`)
3075+
tk.MustExec(`create table t(a int);`)
3076+
tk.MustExec(`insert into t values(1), (2), (3);`)
3077+
3078+
alterChan := make(chan error)
3079+
alterPartition := func() {
3080+
err := tk4.ExecToErr(`alter table t partition by hash(a) partitions 3;`)
3081+
alterChan <- err
3082+
}
3083+
waitFor := func(s string) {
3084+
for {
3085+
select {
3086+
case alterErr := <-alterChan:
3087+
require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr)
3088+
default:
3089+
// Alter still running
3090+
}
3091+
res := tk5.MustQuery(`admin show ddl jobs where db_name = 'test' and table_name = 't' and job_type = 'alter table partition by'`).Rows()
3092+
if len(res) > 0 && res[0][4] == s {
3093+
logutil.BgLogger().Info("Got state", zap.String("State", s))
3094+
break
3095+
}
3096+
gotime.Sleep(10 * gotime.Millisecond)
3097+
}
3098+
dom := domain.GetDomain(tk5.Session())
3099+
// Make sure the table schema is the new schema.
3100+
require.NoError(t, dom.Reload())
3101+
}
3102+
3103+
testFunc := func(states []string) {
3104+
for i, s := range states {
3105+
if i%2 == 0 {
3106+
tk2.MustExec(`begin;`)
3107+
tk2.MustExec(`select 1 from t;`)
3108+
if i > 0 {
3109+
tk3.MustExec(`commit;`)
3110+
}
3111+
} else {
3112+
tk3.MustExec(`begin;`)
3113+
tk3.MustExec(`select 1 from t;`)
3114+
tk2.MustExec(`commit;`)
3115+
}
3116+
if i == 0 {
3117+
go alterPartition()
3118+
}
3119+
waitFor(s)
3120+
if i == len(states)-1 {
3121+
break
3122+
}
3123+
}
3124+
res := tk.MustQuery(`admin show ddl jobs where table_name = 't' and job_type = 'alter table partition by'`).Rows()
3125+
tk.MustExec(fmt.Sprintf("admin cancel ddl jobs %v", res[0][0]))
3126+
tk2.MustExec(`commit;`)
3127+
tk3.MustExec(`commit;`)
3128+
require.ErrorContains(t, <-alterChan, "[ddl:8214]Cancelled DDL job")
3129+
tk.MustQuery(`show create table t;`).Check(testkit.Rows(
3130+
"t CREATE TABLE `t` (\n" +
3131+
" `a` int(11) DEFAULT NULL\n" +
3132+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
3133+
tk.MustQuery(`select a from t order by a;`).Check(testkit.Rows("1", "2", "3"))
3134+
}
3135+
3136+
states := []string{"delete only", "write only", "write reorganization", "delete reorganization"}
3137+
for i := range states {
3138+
testFunc(states[:i+1])
3139+
}
3140+
}

0 commit comments

Comments
 (0)