Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6df378f
Better index management during rollback of partitioning DDLs
mjonss Oct 22, 2024
27978e0
Linting
mjonss Oct 22, 2024
8046319
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 22, 2024
10f5f61
Reworked index handling in Reorganize Partition.
mjonss Oct 23, 2024
cc4f5a8
Fixed assertion issues.
mjonss Oct 23, 2024
ee82920
reverted non-needed changes.
mjonss Oct 23, 2024
c45e3e9
Reverted non-needed changes.
mjonss Oct 23, 2024
16d1628
Simplified index management during partitioned table initialization.
mjonss Oct 24, 2024
a45bba6
Improved test and fixed more issues.
mjonss Oct 24, 2024
f68c29d
minor simplification and fixed failpoint leading to panic
mjonss Oct 24, 2024
ff3d980
Added test for verifying cleanup.
mjonss Oct 24, 2024
91fb5f9
Disabled test due to cleanup not fixed.
mjonss Oct 24, 2024
bf5defc
Simplified new partitions index states.
mjonss Oct 24, 2024
6103faf
Skipping global index entries changes.
mjonss Oct 24, 2024
f803d7b
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 28, 2024
08fab0b
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 29, 2024
e3125e2
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 30, 2024
28f21e4
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Oct 30, 2024
d4347d9
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Nov 1, 2024
f21b86e
Removed limitation of global index cannot have all partitioning columns
mjonss Nov 1, 2024
03d049e
review comments addressed, simplified if statement.
mjonss Nov 4, 2024
4f1c046
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Nov 5, 2024
a9d576e
Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634
mjonss Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2186,8 +2186,8 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
indexInfo.State == model.StateWriteOnly &&
tblInfo.Partition.DDLState == model.StateWriteReorganization {
dropIndices = append(dropIndices, indexInfo)
}
}
Expand Down Expand Up @@ -3151,6 +3151,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
//
// Everything now looks as it should, no memory of old partitions/indexes,
// and no more double writing, since the previous state is only reading the new partitions/indexes.
//
// Note: Special handling is also required in tables.newPartitionedTable(),
// to get per partition indexes in the right state.
func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
Expand Down Expand Up @@ -3262,6 +3265,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if err != nil {
return ver, errors.Trace(err)
}
if tblInfo.Partition.DDLChangedIndex == nil {
tblInfo.Partition.DDLChangedIndex = make(map[int64]bool)
}
if !inAllPartitionColumns {
// Currently only support Explicit Global indexes.
if !newGlobal {
Expand All @@ -3279,6 +3285,8 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
tblInfo.Partition.DDLChangedIndex[index.ID] = false
tblInfo.Partition.DDLChangedIndex[newIndex.ID] = true
newIndex.Global = true
tblInfo.Indices = append(tblInfo.Indices, newIndex)
} else {
Expand All @@ -3291,6 +3299,8 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
tblInfo.Partition.DDLChangedIndex[index.ID] = false
tblInfo.Partition.DDLChangedIndex[newIndex.ID] = true
newIndex.Global = false
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
Expand Down Expand Up @@ -3487,26 +3497,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if !index.Unique {
continue
}
switch index.State {
case model.StateWriteReorganization:
isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
if !ok {
continue
}
if isNew {
// Newly created index, replacing old unique/global index
index.State = model.StatePublic
case model.StatePublic:
if index.Global {
// Mark the old global index as non-readable, and to be dropped
index.State = model.StateDeleteReorganization
} else {
inAllPartitionColumns, err := checkPartitionKeysConstraint(partInfo, index.Columns, tblInfo)
if err != nil {
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
if !inAllPartitionColumns {
// Mark the old unique index as non-readable, and to be dropped,
// since it is replaced by a global index
index.State = model.StateDeleteReorganization
}
}
continue
}
// Old index, should not be visible any longer,
// but needs to be kept up-to-date in case rollback happens.
index.State = model.StateWriteOnly
}
firstPartIdx, lastPartIdx, idMap, err2 := getReplacedPartitionIDs(partNames, tblInfo.Partition)
if err2 != nil {
Expand Down Expand Up @@ -3563,14 +3565,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique && indexInfo.State == model.StateDeleteReorganization {
if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
// Drop the old unique (possible global) index, see onDropIndex
indexInfo.State = model.StateNone
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
dropIndices = append(dropIndices, indexInfo)
}
}
// TODO: verify that the indexes are dropped,
// and that StateDeleteOnly+StateDeleteReorganization is not needed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests shows that at least the old partitions can be written (double write in case we need to rollback), so we need to add a new state to make sure all clients only use (read and write) the new partitions and new indexes, this should not be fixed in this PR, but a new PR for fixing #56819.

// local indexes is not an issue, since they will be gone with the dropped
// partitions, but replaced global indexes should be checked!
for _, indexInfo := range dropIndices {
removeIndexInfo(tblInfo, indexInfo)
}
Expand Down Expand Up @@ -3632,6 +3638,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(ver, errors.New("Injected error by reorgPartFail5"))
}
})
failpoint.Inject("updateVersionAndTableInfoErrInStateDeleteReorganization", func() {
failpoint.Return(ver, errors.New("Injected error in StateDeleteReorganization"))
})
args.OldPhysicalTblIDs = physicalTableIDs
args.NewPartitionIDs = newIDs
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
Expand Down
62 changes: 20 additions & 42 deletions pkg/ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job,
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}
Expand Down Expand Up @@ -371,51 +369,25 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
partNames = append(partNames, pd.Name.L)
}
var dropIndices []*model.IndexInfo
// When Global Index is duplicated to a non Global, we later need
// to know if if it was Global before (marked to be dropped) or not.
globalToUniqueDupMap := make(map[string]int64)
for _, indexInfo := range tblInfo.Indices {
if !indexInfo.Unique {
continue
}
switch indexInfo.State {
case model.StateWriteReorganization, model.StateDeleteOnly,
model.StateWriteOnly:
dropIndices = append(dropIndices, indexInfo)
case model.StateDeleteReorganization:
if pi.DDLState != model.StateDeleteReorganization {
continue
}
// Old index marked to be dropped, rollback by making it public again
indexInfo.State = model.StatePublic
if indexInfo.Global {
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
return ver, errors.NewNoStackErrorf("Duplicate global index names '%s', %d != %d", indexInfo.Name.O, indexInfo.ID, id)
}
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
case model.StatePublic:
if pi.DDLState != model.StateDeleteReorganization {
continue
isNew, ok := pi.DDLChangedIndex[indexInfo.ID]
if !ok {
// non-changed index
continue
}
if !isNew {
if pi.DDLState == model.StateDeleteReorganization {
// Revert the non-public state
indexInfo.State = model.StatePublic
}
// We cannot drop the index here, we need to wait until
// the next schema version
// i.e. rollback in rollbackLikeDropPartition
// New index that became public in this state,
// mark it to be dropped in next schema version
if indexInfo.Global {
indexInfo.State = model.StateDeleteReorganization
} else {
if pi.DDLState == model.StateDeleteReorganization {
indexInfo.State = model.StateWriteOnly
} else {
// How to know if this index was created as a duplicate or not?
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
// The original index
if id >= indexInfo.ID {
return ver, errors.NewNoStackErrorf("Indexes in wrong order during rollback, '%s', %d >= %d", indexInfo.Name.O, id, indexInfo.ID)
}
indexInfo.State = model.StateDeleteReorganization
} else {
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
dropIndices = append(dropIndices, indexInfo)
}
}
}
Expand Down Expand Up @@ -466,13 +438,19 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
return ver, errors.Trace(errors.New("Internal error, failed to find original partition definitions"))
}
pi.Definitions = newDefs
pi.Num = uint64(len(pi.Definitions))
} else {
// Move back to StateWriteReorganization, i.e. use the original table
// (non-partitioned or differently partitioned) as the main table to use.
// Otherwise, the Type does not match the expression.
pi.Type, pi.DDLType = pi.DDLType, pi.Type
pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr
pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns
pi.Definitions = pi.DroppingDefinitions
}
pi.Num = uint64(len(pi.Definitions))
// We should move back one state, since there might be other sessions seeing the new partitions.
job.SchemaState = model.StateWriteReorganization
pi.DDLState = job.SchemaState
}

args := jobCtx.jobArgs.(*model.TablePartitionArgs)
Expand Down
Loading