Commit 2125737

*: Reorganize partition one extra state (#56974)
ref #45133, close #57523
1 parent 138386c commit 2125737
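
In short: reorganizing a table's partitions now passes through one extra schema state. After StateDeleteReorganization the job enters StatePublic, where all replaced (old) indexes are set to StateDeleteOnly, so a DeleteRange cleanup that starts right after the DDL cannot race with late writes from clients still on the previous schema version; the old partitions and indexes are only dropped in the final step. Once that point is reached, the job can no longer be rolled back.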

File tree

8 files changed
+251 -141 lines changed

pkg/ddl/partition.go

Lines changed: 68 additions & 41 deletions
@@ -3017,36 +3017,29 @@ func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
 	return idx.Global
 }
 
-func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, []model.PartitionDefinition, []model.PartitionDefinition, error) {
+func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePartitionArgs) (*model.TableInfo, []string, *model.PartitionInfo, error) {
 	schemaID := job.SchemaID
 	tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
 	if err != nil {
-		return nil, nil, nil, nil, nil, errors.Trace(err)
+		return nil, nil, nil, errors.Trace(err)
 	}
 	partNames, partInfo := args.PartNames, args.PartInfo
-	var addingDefs, droppingDefs []model.PartitionDefinition
-	if tblInfo.Partition != nil {
-		addingDefs = tblInfo.Partition.AddingDefinitions
-		droppingDefs = tblInfo.Partition.DroppingDefinitions
-		tblInfo.Partition.NewTableID = partInfo.NewTableID
-		tblInfo.Partition.DDLType = partInfo.Type
-		tblInfo.Partition.DDLExpr = partInfo.Expr
-		tblInfo.Partition.DDLColumns = partInfo.Columns
-	} else {
-		tblInfo.Partition = getPartitionInfoTypeNone()
-		tblInfo.Partition.NewTableID = partInfo.NewTableID
-		tblInfo.Partition.Definitions[0].ID = tblInfo.ID
-		tblInfo.Partition.DDLType = partInfo.Type
-		tblInfo.Partition.DDLExpr = partInfo.Expr
-		tblInfo.Partition.DDLColumns = partInfo.Columns
-	}
-	if len(addingDefs) == 0 {
-		addingDefs = []model.PartitionDefinition{}
-	}
-	if len(droppingDefs) == 0 {
-		droppingDefs = []model.PartitionDefinition{}
+	if job.SchemaState == model.StateNone {
+		if tblInfo.Partition != nil {
+			tblInfo.Partition.NewTableID = partInfo.NewTableID
+			tblInfo.Partition.DDLType = partInfo.Type
+			tblInfo.Partition.DDLExpr = partInfo.Expr
+			tblInfo.Partition.DDLColumns = partInfo.Columns
+		} else {
+			tblInfo.Partition = getPartitionInfoTypeNone()
+			tblInfo.Partition.NewTableID = partInfo.NewTableID
+			tblInfo.Partition.Definitions[0].ID = tblInfo.ID
+			tblInfo.Partition.DDLType = partInfo.Type
+			tblInfo.Partition.DDLExpr = partInfo.Expr
+			tblInfo.Partition.DDLColumns = partInfo.Columns
+		}
 	}
-	return tblInfo, partNames, partInfo, droppingDefs, addingDefs, nil
+	return tblInfo, partNames, partInfo, nil
 }
 
 // onReorganizePartition reorganized the partitioning of a table including its indexes.
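
The signature change above drops the two []model.PartitionDefinition return values: the adding/dropping definitions now stay on tblInfo.Partition, and the DDL metadata is only initialized once, in StateNone. A minimal standalone sketch of that pattern, with hypothetical types rather than TiDB's model package:

// A simplified sketch, not TiDB code: reorg metadata lives on the table's
// PartitionInfo, so later job states re-read it from there instead of
// threading extra return values through every call.
package main

import "fmt"

type PartitionDefinition struct{ ID int64 }

type PartitionInfo struct {
	NewTableID          int64
	AddingDefinitions   []PartitionDefinition
	DroppingDefinitions []PartitionDefinition
}

type TableInfo struct {
	ID        int64
	Partition *PartitionInfo
}

// initReorgInfo mirrors the new StateNone-only branch: later states skip it.
func initReorgInfo(tbl *TableInfo, stateNone bool, newTableID int64) {
	if !stateNone {
		return // tbl.Partition already carries everything the state needs
	}
	if tbl.Partition == nil {
		tbl.Partition = &PartitionInfo{}
	}
	tbl.Partition.NewTableID = newTableID
}

func main() {
	tbl := &TableInfo{ID: 1}
	initReorgInfo(tbl, true, 42)  // first state: initialize
	initReorgInfo(tbl, false, 99) // later states: no-op
	fmt.Println(tbl.Partition.NewTableID) // prints 42
}
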
@@ -3068,8 +3061,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
 //
 // job.SchemaState goes through the following SchemaState(s):
 // StateNone -> StateDeleteOnly -> StateWriteOnly -> StateWriteReorganization
-// -> StateDeleteOrganization -> StatePublic
+// -> StateDeleteOrganization -> StatePublic -> Done
 // There are more details embedded in the implementation, but the high level changes are:
+//
 // StateNone -> StateDeleteOnly:
 //
 // Various checks and validations.
@@ -3095,13 +3089,20 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
 // and if new unique indexes are added, it also updates them with the rest of data from
 // the non-touched partitions.
 // For indexes that are to be replaced with new ones (old/new global index),
-// mark the old indexes as StateDeleteReorganization and new ones as StatePublic
+// mark the old indexes as StateWriteOnly and new ones as StatePublic
 // Finally make the table visible with the new partition definitions.
 // I.e. in this state clients will read from the old set of partitions,
-// and will read the new set of partitions in StateDeleteReorganization.
+// and next state will read the new set of partitions in StateDeleteReorganization.
 //
 // StateDeleteOrganization -> StatePublic:
 //
+// Now we mark all replaced (old) indexes as StateDeleteOnly
+// in case DeleteRange would be called directly after the DDL,
+// this way there will be no orphan records inserted after DeleteRanges
+// has cleaned up the old partitions and old global indexes.
+//
+// StatePublic -> Done:
+//
 // Now all heavy lifting is done, and we just need to finalize and drop things, while still doing
 // double writes, since previous state sees the old partitions/indexes.
 // Remove the old indexes and old partitions from the TableInfo.
@@ -3110,10 +3111,10 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
 // if ALTER TABLE t PARTITION BY/REMOVE PARTITIONING:
 // Recreate the table with the new TableID, by DropTableOrView+CreateTableOrView
 //
-// StatePublic:
+// Done:
 //
 // Everything now looks as it should, no memory of old partitions/indexes,
-// and no more double writing, since the previous state is only reading the new partitions/indexes.
+// and no more double writing, since the previous state is only using the new partitions/indexes.
 //
 // Note: Special handling is also required in tables.newPartitionedTable(),
 // to get per partition indexes in the right state.
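
Putting the doc-comment changes together, the job now walks one extra public step before finishing. A tiny sketch of the sequence, with plain strings standing in for TiDB's model constants:

// A sketch of the extended state sequence this commit introduces: one extra
// StatePublic step between StateDeleteReorganization and job completion.
package main

import "fmt"

func main() {
	states := []string{
		"StateNone",                 // checks and validations, reorg info stored
		"StateDeleteOnly",           // new partitions delete-only
		"StateWriteOnly",            // new partitions double-written
		"StateWriteReorganization",  // backfill rows and indexes
		"StateDeleteReorganization", // clients switch to the new partitions
		"StatePublic",               // NEW: replaced indexes become DeleteOnly
		"StateNone (Done)",          // old partitions/indexes dropped
	}
	for i := 0; i+1 < len(states); i++ {
		fmt.Printf("%s -> %s\n", states[i], states[i+1])
	}
}
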
@@ -3133,7 +3134,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 		return ver, nil
 	}
 
-	tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
+	tblInfo, partNames, partInfo, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
 	if err != nil {
 		return ver, err
 	}
@@ -3362,7 +3363,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 			// For available state, the new added partition should wait its replica to
 			// be finished, otherwise the query to this partition will be blocked.
 			count := tblInfo.TiFlashReplica.Count
-			needRetry, err := checkPartitionReplica(count, addingDefinitions, jobCtx)
+			needRetry, err := checkPartitionReplica(count, tblInfo.Partition.AddingDefinitions, jobCtx)
 			if err != nil {
 				return rollbackReorganizePartitionWithErr(jobCtx, job, err)
 			}
@@ -3376,7 +3377,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 
 			// When TiFlash Replica is ready, we must move them into `AvailablePartitionIDs`.
 			// Since onUpdateFlashReplicaStatus cannot see the partitions yet (not public)
-			for _, d := range addingDefinitions {
+			for _, d := range tblInfo.Partition.AddingDefinitions {
 				tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, d.ID)
 			}
 		}
@@ -3491,6 +3492,37 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
 
 	case model.StateDeleteReorganization:
+		// Need to have one more state before completing, due to:
+		// - DeleteRanges could possibly start directly after DDL causing
+		//   inserts during previous state (DeleteReorg) could insert after the cleanup
+		//   leaving data in dropped partitions/indexes that will not be cleaned up again.
+		// - Updates in previous state (DeleteReorg) could have duplicate errors, if the row
+		//   was deleted or updated in after finish (so here we need to have DeleteOnly index state!
+		//   And we cannot rollback in this state!
+
+		// Stop double writing to the indexes, only do Deletes!
+		// so that previous could do inserts, we do delete and allow second insert for
+		// previous state clients!
+		for _, index := range tblInfo.Indices {
+			isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
+			if !ok || isNew {
+				continue
+			}
+			// Old index, should not be visible any longer,
+			// but needs to be deleted, in case previous state clients inserts.
+			index.State = model.StateDeleteOnly
+		}
+		failpoint.Inject("reorgPartFail3", func(val failpoint.Value) {
+			if val.(bool) {
+				job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
+				failpoint.Return(ver, errors.New("Injected error by reorgPartFail3"))
+			}
+		})
+		job.SchemaState = model.StatePublic
+		tblInfo.Partition.DDLState = job.SchemaState
+		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
+
+	case model.StatePublic:
 		// Drop the droppingDefinitions and finish the DDL
 		// This state is needed for the case where client A sees the schema
 		// with version of StateWriteReorg and would not see updates of
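
The new StateDeleteReorganization arm above is the heart of the commit: old indexes are demoted via the DDLChangedIndex map before the job advances to the extra StatePublic step. A simplified, self-contained sketch of that demotion, using hypothetical types rather than TiDB's model package:

// Sketch only: entries in a DDLChangedIndex-style map with value false are
// the old, replaced indexes; they become delete-only so clients still on the
// previous schema version can delete rows, while nothing new is written into
// indexes that DeleteRange is about to clean up.
package main

import "fmt"

type IndexState string

type Index struct {
	ID    int64
	State IndexState
}

func demoteOldIndexes(indices []*Index, changedIndex map[int64]bool) {
	for _, idx := range indices {
		isNew, ok := changedIndex[idx.ID]
		if !ok || isNew {
			continue // untouched index, or the new replacement: leave as-is
		}
		idx.State = "DeleteOnly" // old index: deletes only from here on
	}
}

func main() {
	idxs := []*Index{{ID: 1, State: "Public"}, {ID: 2, State: "Public"}, {ID: 3, State: "Public"}}
	changed := map[int64]bool{1: false, 2: true} // 1 = old index, 2 = its replacement
	demoteOldIndexes(idxs, changed)
	for _, idx := range idxs {
		fmt.Println(idx.ID, idx.State) // 1 DeleteOnly, 2 Public, 3 Public
	}
}
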
@@ -3515,7 +3547,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 
 		var dropIndices []*model.IndexInfo
 		for _, indexInfo := range tblInfo.Indices {
-			if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
+			if indexInfo.Unique && indexInfo.State == model.StateDeleteOnly {
 				// Drop the old unique (possible global) index, see onDropIndex
 				indexInfo.State = model.StateNone
 				DropIndexColumnFlag(tblInfo, indexInfo)
@@ -3530,10 +3562,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 		for _, indexInfo := range dropIndices {
 			removeIndexInfo(tblInfo, indexInfo)
 		}
-		failpoint.Inject("reorgPartFail3", func(val failpoint.Value) {
+		failpoint.Inject("reorgPartFail4", func(val failpoint.Value) {
 			if val.(bool) {
 				job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
-				failpoint.Return(ver, errors.New("Injected error by reorgPartFail3"))
+				failpoint.Return(ver, errors.New("Injected error by reorgPartFail4"))
 			}
 		})
 		var oldTblID int64
var oldTblID int64
@@ -3567,12 +3599,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
35673599
// ALTER TABLE ... PARTITION BY
35683600
tblInfo.Partition.ClearReorgIntermediateInfo()
35693601
}
3570-
failpoint.Inject("reorgPartFail4", func(val failpoint.Value) {
3571-
if val.(bool) {
3572-
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2
3573-
failpoint.Return(ver, errors.New("Injected error by reorgPartFail4"))
3574-
}
3575-
})
35763602
err = metaMut.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs)
35773603
if err != nil {
35783604
return ver, errors.Trace(err)
@@ -3593,6 +3619,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
 		})
 		args.OldPhysicalTblIDs = physicalTableIDs
 		args.NewPartitionIDs = newIDs
+		job.SchemaState = model.StateNone
 		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
 		if err != nil {
 			return ver, errors.Trace(err)

pkg/ddl/rollingback.go

Lines changed: 5 additions & 0 deletions
@@ -376,6 +376,11 @@ func onRollbackReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int6
 		job.State = model.JobStateCancelled
 		return ver, errors.Trace(err)
 	}
+	if job.SchemaState == model.StatePublic {
+		// We started to destroy the old indexes, so we can no longer rollback!
+		job.State = model.JobStateRunning
+		return ver, nil
+	}
 	jobCtx.jobArgs = args
 
 	return rollbackReorganizePartitionWithErr(jobCtx, job, dbterror.ErrCancelledDDLJob)
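
The rollback guard mirrors the new state: once the job's SchemaState has reached StatePublic the old indexes are already being dismantled, so cancellation is refused and the job keeps running forward. A minimal sketch of the guard, with assumed string states instead of TiDB's constants:

// Point-of-no-return sketch, not TiDB code: past the extra StatePublic phase
// a cancel request cannot be honored, so the job is forced to finish.
package main

import "fmt"

type Job struct {
	SchemaState string
	State       string
}

func onRollback(job *Job) {
	if job.SchemaState == "Public" {
		job.State = "Running" // too late to undo: complete the DDL instead
		return
	}
	job.State = "Rollingback" // earlier states can still be unwound
}

func main() {
	job := &Job{SchemaState: "Public"}
	onRollback(job)
	fmt.Println(job.State) // Running
}
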

pkg/ddl/schema_version.go

Lines changed: 1 addition & 1 deletion
@@ -221,7 +221,7 @@ func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job,
 func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
 	diff.TableID = job.TableID
 	diff.OldTableID = job.TableID
-	if job.SchemaState == model.StateDeleteReorganization {
+	if job.SchemaState == model.StateNone {
 		args := jobCtx.jobArgs.(*model.TablePartitionArgs)
 		partInfo := args.PartInfo
 		// Final part, new table id is assigned
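
Because the job now ends one state later, the schema diff that publishes the newly assigned table ID for ALTER TABLE ... PARTITION BY moves from StateDeleteReorganization to the final StateNone step. A small illustrative sketch, with assumed names and reduced to just the table-ID choice:

// Hypothetical reduction of the moved check: the new table ID only becomes
// visible in the final StateNone ("Done") step.
package main

import "fmt"

func diffTableID(schemaState string, oldID, newID int64) int64 {
	if schemaState == "None" { // was: "DeleteReorganization"
		return newID // final step: expose the newly assigned table ID
	}
	return oldID
}

func main() {
	fmt.Println(diffTableID("None", 100, 200))                 // 200
	fmt.Println(diffTableID("DeleteReorganization", 100, 200)) // 100
}
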

pkg/ddl/tests/partition/multi_domain_test.go

Lines changed: 29 additions & 14 deletions
@@ -336,12 +336,25 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
 		tkO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID))
 		tkNO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID)))
 
-		logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID))
+		logutil.BgLogger().Info("inserting rows", zap.Int("testID", testID), zap.String("state", schemaState))
 
 		testID++
 		tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,%d)`+dbgStr, testID, testID))
 		tkO.MustQuery(fmt.Sprintf(`select * from t where b = "%d"`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID)))
 
+		// Test for Index, specially between WriteOnly and DeleteOnly, but better to test all states.
+		// if tkNO (DeleteOnly) updates a row, the new index should be deleted, but not inserted.
+		// It will be inserted by backfill in WriteReorganize.
+		// If not deleted, then there would be an orphan entry in the index!
+		tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+100, testID))
+		tkNO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100)))
+		tkNO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+100)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID, testID+100)))
+		tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID+99, testID-1))
+		tkO.MustQuery(fmt.Sprintf(`select a, b from t where a = %d`+dbgStr, testID-1)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99)))
+		tkO.MustQuery(fmt.Sprintf(`select a, b from t where b = "%d"`+dbgStr, testID+99)).Check(testkit.Rows(fmt.Sprintf("%d %d", testID-1, testID+99)))
+		tkNO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID, testID))
+		tkO.MustExec(fmt.Sprintf(`update t set b = %d where a = %d`+dbgStr, testID-1, testID-1))
+
 		switch schemaState {
 		case model.StateDeleteOnly.String():
 			// tkNO sees original table/partitions as before the DDL stated
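
The update block added above exercises the delete-only index rule: a client that sees the DeleteOnly version of an index must remove stale entries on UPDATE but must not write new ones, which the backfill adds later. A standalone sketch of that rule, with a map standing in for the index (not TiDB code):

// Sketch of the write rule under test: in DeleteOnly an UPDATE deletes the
// old index entry but does not insert the new one; a missed delete would
// leave an orphan index entry behind.
package main

import "fmt"

func applyUpdate(state string, index map[string]bool, oldKey, newKey string) {
	delete(index, oldKey) // always remove the stale entry
	if state != "DeleteOnly" {
		index[newKey] = true // skipped in DeleteOnly; backfill writes it later
	}
}

func main() {
	index := map[string]bool{"b=5": true}
	applyUpdate("DeleteOnly", index, "b=5", "b=105")
	fmt.Println(len(index)) // 0: no orphan entry, backfill will add b=105
}
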
@@ -387,16 +400,19 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
 				"(PARTITION `p0` VALUES LESS THAN (100),\n" +
 				" PARTITION `p1` VALUES LESS THAN (200),\n" +
 				" PARTITION `pMax` VALUES LESS THAN (MAXVALUE))"))
+		case model.StatePublic.String():
+			// not tested, both tkO and tkNO sees same partitions
 		case model.StateNone.String():
+			// not tested, both tkO and tkNO sees same partitions
 		default:
-			require.Failf(t, "unhandled schema state '%s'", schemaState)
+			require.Failf(t, "unhandled schema state", "State '%s'", schemaState)
 		}
 	}
 	postFn := func(tkO *testkit.TestKit, store kv.Storage) {
 		tkO.MustQuery(`select * from t where b = 5`).Sort().Check(testkit.Rows("5 5"))
 		tkO.MustQuery(`select * from t where b = "5"`).Sort().Check(testkit.Rows("5 5"))
 		tkO.MustExec(`admin check table t`)
-		tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "998 998", "999 999"))
+		tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "10 10", "101 101", "102 102", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "2 2", "5 5", "6 6", "7 7", "8 8", "9 9", "984 984", "985 985", "986 986", "987 987", "988 988", "989 989", "990 990", "991 991", "992 992", "993 993", "994 994", "995 995", "998 998", "999 999"))
 		// TODO: Verify that there are no KV entries for old partitions or old indexes!!!
 		delRange := tkO.MustQuery(`select * from mysql.gc_delete_range_done`).Rows()
 		s := ""
@@ -441,17 +457,16 @@ func TestMultiSchemaReorganizePartition(t *testing.T) {
 			require.False(t, HaveEntriesForTableIndex(t, tkO, part.ID, locIdx), "Local index id %d for partition id %d has still entries!", locIdx, tableID)
 		}
 	}
-	// TODO: Fix cleanup issues, most likely it needs one more SchemaState in onReorganizePartition
-	//PartitionLoop:
-	//	for _, partID := range originalPartitions {
-	//		for _, def := range tbl.Meta().Partition.Definitions {
-	//			if def.ID == partID {
-	//				continue PartitionLoop
-	//			}
-	//		}
-	//		// old partitions removed
-	//		require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID)
-	//	}
+	PartitionLoop:
+		for _, partID := range originalPartitions {
+			for _, def := range tbl.Meta().Partition.Definitions {
+				if def.ID == partID {
+					continue PartitionLoop
+				}
+			}
+			// old partitions removed
+			require.False(t, HaveEntriesForTableIndex(t, tkO, partID, 0), "Reorganized partition id %d for table id %d has still entries!", partID, tableID)
+		}
 	}
 	runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn)
 }
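
Note that the PartitionLoop assertions, previously commented out with a TODO saying one more SchemaState was needed, are now enabled: with the extra state in place, the reorganized partitions are expected to hold no leftover KV entries once the delete ranges have run.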
