Skip to content

Commit f07d030

Browse files
authored
*: Fix for TRUNCATE PARTITION and Global Index (#57724)
close #57721
1 parent 793e77c commit f07d030

20 files changed

+1119
-315
lines changed

pkg/ddl/partition.go

Lines changed: 224 additions & 256 deletions
Large diffs are not rendered by default.

pkg/ddl/placement_policy_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,6 +1956,7 @@ func TestTruncateTablePartitionWithPlacement(t *testing.T) {
19561956
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
19571957
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PLACEMENT POLICY=`p3` */,\n" +
19581958
" PARTITION `p3` VALUES LESS THAN (100000))"))
1959+
dom.Reload()
19591960
checkExistTableBundlesInPD(t, dom, "test", "tp")
19601961
checkWaitingGCPartitionBundlesInPD(t, dom, checkOldPartitions)
19611962

pkg/ddl/reorg.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -932,8 +932,8 @@ func getReorgInfo(ctx *ReorgContext, jobCtx *jobContext, rh *reorgHandler, job *
932932
zap.String("startKey", hex.EncodeToString(start)),
933933
zap.String("endKey", hex.EncodeToString(end)))
934934

935-
failpoint.Inject("errorUpdateReorgHandle", func() (*reorgInfo, error) {
936-
return &info, errors.New("occur an error when update reorg handle")
935+
failpoint.Inject("errorUpdateReorgHandle", func() {
936+
failpoint.Return(&info, errors.New("occur an error when update reorg handle"))
937937
})
938938
err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0])
939939
if err != nil {

pkg/ddl/rollingback.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,41 @@ func rollingbackExchangeTablePartition(jobCtx *jobContext, job *model.Job) (ver
316316
return ver, errors.Trace(err)
317317
}
318318

319+
func rollingbackTruncateTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
320+
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
321+
if err != nil {
322+
return ver, errors.Trace(err)
323+
}
324+
325+
return convertTruncateTablePartitionJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob, tblInfo)
326+
}
327+
328+
func convertTruncateTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
329+
if !job.IsRollbackable() {
330+
// Only Original state and StateWrite can be rolled back, otherwise new partitions
331+
// may have been used and new data would get lost.
332+
// So we must continue to roll forward!
333+
job.State = model.JobStateRunning
334+
return ver, nil
335+
}
336+
pi := tblInfo.Partition
337+
if len(pi.NewPartitionIDs) != 0 || pi.DDLAction != model.ActionNone || pi.DDLState != model.StateNone {
338+
// Rollback the changes, note that no new partitions has been used yet!
339+
// so only metadata rollback and we can cancel the DDL
340+
tblInfo.Partition.NewPartitionIDs = nil
341+
tblInfo.Partition.DDLAction = model.ActionNone
342+
tblInfo.Partition.DDLState = model.StateNone
343+
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
344+
if err != nil {
345+
return ver, errors.Trace(err)
346+
}
347+
return ver, nil
348+
}
349+
// No change yet, just cancel the job.
350+
job.State = model.JobStateCancelled
351+
return ver, errors.Trace(otherwiseErr)
352+
}
353+
319354
func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
320355
addingDefinitions := tblInfo.Partition.AddingDefinitions
321356
partNames := make([]string, 0, len(addingDefinitions))
@@ -590,8 +625,10 @@ func convertJob2RollbackJob(w *worker, jobCtx *jobContext, job *model.Job) (ver
590625
ver, err = rollingbackTruncateTable(jobCtx, job)
591626
case model.ActionModifyColumn:
592627
ver, err = rollingbackModifyColumn(jobCtx, job)
593-
case model.ActionDropForeignKey, model.ActionTruncateTablePartition:
628+
case model.ActionDropForeignKey:
594629
ver, err = cancelOnlyNotHandledJob(job, model.StatePublic)
630+
case model.ActionTruncateTablePartition:
631+
ver, err = rollingbackTruncateTablePartition(jobCtx, job)
595632
case model.ActionRebaseAutoID, model.ActionShardRowID, model.ActionAddForeignKey,
596633
model.ActionRenameTable, model.ActionRenameTables,
597634
model.ActionModifyTableCharsetAndCollate,

pkg/ddl/tests/partition/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_test(
55
timeout = "short",
66
srcs = [
77
"db_partition_test.go",
8+
"error_injection_test.go",
89
"main_test.go",
910
"multi_domain_test.go",
1011
"placement_test.go",

pkg/ddl/tests/partition/db_partition_test.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,6 +1429,15 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
14291429
time.Sleep(10 * time.Millisecond)
14301430
}
14311431
}
1432+
waitFor(4, "write only")
1433+
tkTmp := testkit.NewTestKit(t, store)
1434+
tkTmp.MustExec(`begin`)
1435+
tkTmp.MustExec("use test")
1436+
tkTmp.MustQuery(`select count(*) from test_global`).Check(testkit.Rows("5"))
1437+
tk2.MustExec(`rollback`)
1438+
tk2.MustExec(`begin`)
1439+
tk2.MustExec(`insert into test_global values (5,5,5)`)
1440+
tkTmp.MustExec(`rollback`)
14321441
waitFor(4, "delete only")
14331442
tk3 := testkit.NewTestKit(t, store)
14341443
tk3.MustExec(`begin`)
@@ -1437,16 +1446,21 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
14371446
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
14381447
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
14391448
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
1440-
// Here it will fail with
1441-
// the partition is not in public.
14421449
err := tk3.ExecToErr(`insert into test_global values (15,15,15)`)
14431450
require.Error(t, err)
1444-
require.ErrorContains(t, err, "the partition is in not in public")
1451+
require.ErrorContains(t, err, "[kv:1062]Duplicate entry '15' for key 'test_global.idx_b'")
14451452
tk2.MustExec(`commit`)
1453+
waitFor(4, "delete reorganization")
1454+
tk2.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
1455+
tk2.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
1456+
err = tk2.ExecToErr(`insert into test_global values (15,15,15)`)
1457+
require.NoError(t, err)
1458+
tk2.MustExec(`begin`)
14461459
tk3.MustExec(`commit`)
1460+
tk.MustExec(`commit`)
14471461
<-syncChan
14481462
result := tk.MustQuery("select * from test_global;")
1449-
result.Sort().Check(testkit.Rows(`1 1 1`, `2 2 2`, `5 5 5`))
1463+
result.Sort().Check(testkit.Rows(`1 1 1`, `15 15 15`, `2 2 2`, `5 5 5`))
14501464

14511465
tt = external.GetTableByName(t, tk, "test", "test_global")
14521466
idxInfo := tt.Meta().FindIndexByName("idx_b")
@@ -1487,12 +1501,12 @@ func TestGlobalIndexUpdateInTruncatePartition(t *testing.T) {
14871501
tk1 := testkit.NewTestKit(t, store)
14881502
tk1.MustExec("use test")
14891503
err := tk1.ExecToErr("update test_global set a = 2 where a = 11")
1490-
assert.NotNil(t, err)
1504+
assert.NoError(t, err)
14911505
}
14921506
})
14931507

14941508
tk.MustExec("alter table test_global truncate partition p1")
1495-
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("11 11 11", "12 12 12"))
1509+
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("2 11 11", "12 12 12"))
14961510
}
14971511

14981512
func TestGlobalIndexUpdateInTruncatePartition4Hash(t *testing.T) {
@@ -1515,7 +1529,7 @@ func TestGlobalIndexUpdateInTruncatePartition4Hash(t *testing.T) {
15151529
tk1 := testkit.NewTestKit(t, store)
15161530
tk1.MustExec("use test")
15171531
err = tk1.ExecToErr("update test_global set a = 1 where a = 12")
1518-
assert.NotNil(t, err)
1532+
assert.NoError(t, err)
15191533
}
15201534
})
15211535

@@ -1577,7 +1591,7 @@ func TestGlobalIndexInsertInTruncatePartition(t *testing.T) {
15771591
tk1 := testkit.NewTestKit(t, store)
15781592
tk1.MustExec("use test")
15791593
err = tk1.ExecToErr("insert into test_global values(2, 2, 2)")
1580-
assert.NotNil(t, err)
1594+
assert.NoError(t, err)
15811595
}
15821596
})
15831597

@@ -3168,6 +3182,8 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
31683182
tk2.MustExec(`COMMIT`)
31693183

31703184
/*
3185+
// Currently there is an duplicate entry issue, so it will rollback in WriteReorganization
3186+
// instead of continuing.
31713187
waitFor(4, "t", "delete reorganization")
31723188
tk2.MustExec(`BEGIN`)
31733189
tk2.MustExec(`insert into t values (null, 24)`)
@@ -3655,3 +3671,26 @@ func checkGlobalAndPK(t *testing.T, tk *testkit.TestKit, name string, indexes in
36553671
require.True(t, idxInfo.Primary)
36563672
}
36573673
}
3674+
3675+
func TestTruncateNumberOfPhases(t *testing.T) {
3676+
store := testkit.CreateMockStore(t)
3677+
tk := testkit.NewTestKit(t, store)
3678+
tk.MustExec("use test")
3679+
tk.MustExec(`create table t (a int primary key , b varchar(255)) partition by hash(a) partitions 3`)
3680+
ctx := tk.Session()
3681+
dom := domain.GetDomain(ctx)
3682+
dom.Reload()
3683+
schemaVersion := dom.InfoSchema().SchemaMetaVersion()
3684+
tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`)
3685+
tk.MustExec(`alter table t truncate partition p1`)
3686+
dom.Reload()
3687+
// Without global index, truncate partition could be a single state change
3688+
require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion)
3689+
tk.MustExec(`drop table t`)
3690+
tk.MustExec(`create table t (a int primary key , b varchar(255), unique key (b) global) partition by hash(a) partitions 3`)
3691+
schemaVersion = dom.InfoSchema().SchemaMetaVersion()
3692+
tk.MustExec(`insert into t values (1,1),(2,2),(3,3)`)
3693+
tk.MustExec(`alter table t truncate partition p1`)
3694+
dom.Reload()
3695+
require.Equal(t, int64(4), dom.InfoSchema().SchemaMetaVersion()-schemaVersion)
3696+
}

0 commit comments

Comments
 (0)