Merged
Commits (23)
6df378f Better index management during rollback of partitioning DDLs (mjonss, Oct 22, 2024)
27978e0 Linting (mjonss, Oct 22, 2024)
8046319 Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Oct 22, 2024)
10f5f61 Reworked index handling in Reorganize Partition. (mjonss, Oct 23, 2024)
cc4f5a8 Fixed assertion issues. (mjonss, Oct 23, 2024)
ee82920 reverted non-needed changes. (mjonss, Oct 23, 2024)
c45e3e9 Reverted non-needed changes. (mjonss, Oct 23, 2024)
16d1628 Simplified index management during partitioned table initialization. (mjonss, Oct 24, 2024)
a45bba6 Improved test and fixed more issues. (mjonss, Oct 24, 2024)
f68c29d minor simplification and fixed failpoint leading to panic (mjonss, Oct 24, 2024)
ff3d980 Added test for verifying cleanup. (mjonss, Oct 24, 2024)
91fb5f9 Disabled test due to cleanup not fixed. (mjonss, Oct 24, 2024)
bf5defc Simplified new partitions index states. (mjonss, Oct 24, 2024)
6103faf Skipping global index entries changes. (mjonss, Oct 24, 2024)
f803d7b Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Oct 28, 2024)
08fab0b Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Oct 29, 2024)
e3125e2 Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Oct 30, 2024)
28f21e4 Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Oct 30, 2024)
d4347d9 Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Nov 1, 2024)
f21b86e Removed limitation of global index cannot have all partitioning columns (mjonss, Nov 1, 2024)
03d049e review comments addressed, simplified if statement. (mjonss, Nov 4, 2024)
4f1c046 Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Nov 5, 2024)
a9d576e Merge remote-tracking branch 'pingcap/master' into reorg-part-fix-56634 (mjonss, Nov 15, 2024)
pkg/ddl/partition.go (103 changes: 47 additions & 56 deletions)
@@ -2185,9 +2185,7 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
if indexInfo.State == model.StateWriteOnly {
Contributor:
Why only drop the write-only indexes? What about other states? For example, what will happen when we rollback from the "none -> delete-only" step?

There may be delete-only indexes before converting to a rollback job:

newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)

Contributor Author:
There should only be either StatePublic or StateWriteOnly, so only write-only indexes should be removed. Also, the follow-ups #57114 and #56974 (based on this PR) will block the rollback here and here for the last state. One can see this PR as part 1 of 3, where the optimistic path works; the failure/rollback/cleanup is then fixed in parts 2 and 3.
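A minimal illustration of that expectation (not code from the PR; it only assumes the pkg/meta/model import path used by the diff):

package ddl

import "github.com/pingcap/tidb/pkg/meta/model"

// Illustrative only: the two states a changed unique index is expected to
// be in when rollbackLikeDropPartition runs, and the resulting action.
func rollbackIndexAction(idx *model.IndexInfo) string {
	switch idx.State {
	case model.StatePublic:
		return "keep" // the original index, still serving reads and writes
	case model.StateWriteOnly:
		return "drop" // the duplicate created by the reorganize
	default:
		return "unexpected" // guarded against in the follow-up PRs
	}
}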

dropIndices = append(dropIndices, indexInfo)
}
}
@@ -3043,9 +3041,6 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
}

func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
if len(partInfo.DDLUpdateIndexes) == 0 {
return idx.Global
}
for _, newIdx := range partInfo.DDLUpdateIndexes {
if strings.EqualFold(idx.Name.L, newIdx.IndexName) {
return newIdx.Global
@@ -3151,6 +3146,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
//
// Everything now looks as it should, no memory of old partitions/indexes,
// and no more double writing, since the previous state is only reading the new partitions/indexes.
//
// Note: Special handling is also required in tables.newPartitionedTable(),
// to get per partition indexes in the right state.
func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
@@ -3262,39 +3260,33 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if err != nil {
return ver, errors.Trace(err)
}
if !inAllPartitionColumns {
// Currently only support Explicit Global indexes.
if !newGlobal {
job.State = model.JobStateCancelled
return ver, dbterror.ErrGlobalIndexNotExplicitlySet.GenWithStackByArgs(index.Name.O)
}
// Duplicate the unique indexes with new index ids.
// If previously was Global or will be Global:
// it must be recreated with new index ID
// TODO: Could we allow that session in StateWriteReorganization, when StateDeleteReorganization
// has started, may not find changes through the global index that sessions in StateDeleteReorganization made?
// If so, then we could avoid copying the full Global Index if it has not changed from LOCAL!
// It might be possible to use the new, not yet public partitions to access those rows?!
// Just that it would not work with explicit partition select SELECT FROM t PARTITION (p,...)
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
newIndex.Global = true
tblInfo.Indices = append(tblInfo.Indices, newIndex)
} else {
if newGlobal {
// TODO: For the future loosen this restriction and allow global indexes for unique keys also including all partitioning columns
return ver, dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs(fmt.Sprintf("PARTITION BY, index '%v' is unique and contains all partitioning columns, but has Global Index set", index.Name.O))
}
if index.Global {
// Index was previously Global, now it needs to be duplicated and become a local index.
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
newIndex.Global = false
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
// Currently only support Explicit Global indexes.
if !inAllPartitionColumns && !newGlobal {
job.State = model.JobStateCancelled
return ver, dbterror.ErrGlobalIndexNotExplicitlySet.GenWithStackByArgs(index.Name.O)
}
if !index.Global && !newGlobal {
// still local index, no need to duplicate index.
continue
}
if tblInfo.Partition.DDLChangedIndex == nil {
tblInfo.Partition.DDLChangedIndex = make(map[int64]bool)
}
// Duplicate the unique indexes with new index ids.
// If previously was Global or will be Global:
// it must be recreated with new index ID
// TODO: Could we allow that session in StateWriteReorganization, when StateDeleteReorganization
// has started, may not find changes through the global index that sessions in StateDeleteReorganization made?
// If so, then we could avoid copying the full Global Index if it has not changed from LOCAL!
// It might be possible to use the new, not yet public partitions to access those rows?!
// Just that it would not work with explicit partition select SELECT FROM t PARTITION (p,...)
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
tblInfo.Partition.DDLChangedIndex[index.ID] = false
tblInfo.Partition.DDLChangedIndex[newIndex.ID] = true
newIndex.Global = newGlobal
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
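// (Illustrative aside, not part of the diff.) The convention recorded just
// above: DDLChangedIndex maps the replaced index ID to false and its newly
// allocated duplicate to true; indexes absent from the map were untouched
// by this DDL. Later stages and the rollback path in rollingback.go read
// it back, roughly:
//
//	isNew, changed := tblInfo.Partition.DDLChangedIndex[idx.ID]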
failpoint.Inject("reorgPartCancel1", func(val failpoint.Value) {
if val.(bool) {
@@ -3487,26 +3479,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if !index.Unique {
continue
}
switch index.State {
case model.StateWriteReorganization:
isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
if !ok {
continue
}
if isNew {
// Newly created index, replacing old unique/global index
index.State = model.StatePublic
case model.StatePublic:
if index.Global {
// Mark the old global index as non-readable, and to be dropped
index.State = model.StateDeleteReorganization
} else {
inAllPartitionColumns, err := checkPartitionKeysConstraint(partInfo, index.Columns, tblInfo)
if err != nil {
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
if !inAllPartitionColumns {
// Mark the old unique index as non-readable, and to be dropped,
// since it is replaced by a global index
index.State = model.StateDeleteReorganization
}
}
continue
}
// Old index, should not be visible any longer,
// but needs to be kept up-to-date in case rollback happens.
index.State = model.StateWriteOnly
}
firstPartIdx, lastPartIdx, idMap, err2 := getReplacedPartitionIDs(partNames, tblInfo.Partition)
if err2 != nil {
@@ -3563,14 +3547,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique && indexInfo.State == model.StateDeleteReorganization {
if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
// Drop the old unique (possibly global) index, see onDropIndex
indexInfo.State = model.StateNone
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
dropIndices = append(dropIndices, indexInfo)
}
}
// TODO: verify that the indexes are dropped,
// and that StateDeleteOnly+StateDeleteReorganization is not needed.
Contributor Author:
Tests show that the old partitions can still be written to (double write, in case we need to rollback), so we need to add a new state to make sure all clients only use (read and write) the new partitions and new indexes. This should not be fixed in this PR, but in a new PR fixing #56819.
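For illustration, a hypothetical sketch of the kind of cleanup test the "Added test for verifying cleanup" commit hints at, assuming testkit helpers and the explicit GLOBAL index syntax (table and test names are invented; full cleanup verification would additionally need KV-level inspection of the dropped index ranges):

package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
)

// Hypothetical: reorganize partitions of a table with an explicit global
// index, then check consistency of the surviving indexes.
func TestReorgPartitionGlobalIndexCleanup(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec(`create table t (a int, b int, unique key ub (b) global)
		partition by range (a)
		(partition p0 values less than (100),
		 partition p1 values less than (200))`)
	tk.MustExec("insert into t values (1,1), (101,2)")
	tk.MustExec("alter table t reorganize partition p0,p1 into (partition pAll values less than (200))")
	// admin check table validates rows against the current indexes; orphan
	// entries of the *dropped* index generation would need a direct scan of
	// its old key range to detect.
	tk.MustExec("admin check table t")
}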

// local indexes are not an issue, since they will be gone with the dropped
// partitions, but replaced global indexes should be checked!
for _, indexInfo := range dropIndices {
removeIndexInfo(tblInfo, indexInfo)
}
@@ -3632,6 +3620,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(ver, errors.New("Injected error by reorgPartFail5"))
}
})
failpoint.Inject("updateVersionAndTableInfoErrInStateDeleteReorganization", func() {
failpoint.Return(ver, errors.New("Injected error in StateDeleteReorganization"))
})
args.OldPhysicalTblIDs = physicalTableIDs
args.NewPartitionIDs = newIDs
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
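The failpoint added at the end of this hunk could be driven from a test roughly like this (hypothetical; the failpoint path is inferred from the package location):

package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// Hypothetical: force the injected error while the job is in
// StateDeleteReorganization to exercise the retry/rollback behavior.
func TestInjectErrInDeleteReorg(t *testing.T) {
	fp := "github.com/pingcap/tidb/pkg/ddl/updateVersionAndTableInfoErrInStateDeleteReorganization"
	require.NoError(t, failpoint.Enable(fp, `return(true)`))
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()
	// ... run ALTER TABLE ... REORGANIZE PARTITION and assert that the DDL
	// retries past "Injected error in StateDeleteReorganization".
}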
pkg/ddl/rollingback.go (62 changes: 20 additions & 42 deletions)
@@ -331,8 +331,6 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job,
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}
@@ -371,51 +369,25 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
partNames = append(partNames, pd.Name.L)
}
var dropIndices []*model.IndexInfo
// When Global Index is duplicated to a non Global, we later need
// to know if it was Global before (marked to be dropped) or not.
globalToUniqueDupMap := make(map[string]int64)
for _, indexInfo := range tblInfo.Indices {
if !indexInfo.Unique {
continue
}
switch indexInfo.State {
case model.StateWriteReorganization, model.StateDeleteOnly,
model.StateWriteOnly:
dropIndices = append(dropIndices, indexInfo)
case model.StateDeleteReorganization:
if pi.DDLState != model.StateDeleteReorganization {
continue
}
// Old index marked to be dropped, rollback by making it public again
indexInfo.State = model.StatePublic
if indexInfo.Global {
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
return ver, errors.NewNoStackErrorf("Duplicate global index names '%s', %d != %d", indexInfo.Name.O, indexInfo.ID, id)
}
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
case model.StatePublic:
if pi.DDLState != model.StateDeleteReorganization {
continue
isNew, ok := pi.DDLChangedIndex[indexInfo.ID]
if !ok {
// non-changed index
continue
}
if !isNew {
if pi.DDLState == model.StateDeleteReorganization {
// Revert the non-public state
indexInfo.State = model.StatePublic
}
// We cannot drop the index here, we need to wait until
// the next schema version
// i.e. rollback in rollbackLikeDropPartition
// New index that became public in this state,
// mark it to be dropped in next schema version
if indexInfo.Global {
indexInfo.State = model.StateDeleteReorganization
} else {
if pi.DDLState == model.StateDeleteReorganization {
indexInfo.State = model.StateWriteOnly
} else {
// How to know if this index was created as a duplicate or not?
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
// The original index
if id >= indexInfo.ID {
return ver, errors.NewNoStackErrorf("Indexes in wrong order during rollback, '%s', %d >= %d", indexInfo.Name.O, id, indexInfo.ID)
}
indexInfo.State = model.StateDeleteReorganization
} else {
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
dropIndices = append(dropIndices, indexInfo)
}
}
}
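Pieced together from the fragments above, the new rollback decision reduces to roughly the following (illustrative; the exact merged code may differ):

isNew, changed := pi.DDLChangedIndex[indexInfo.ID]
if !changed {
	// Index untouched by this reorganize: nothing to roll back.
} else if !isNew {
	// Original index: make it fully visible again.
	if pi.DDLState == model.StateDeleteReorganization {
		indexInfo.State = model.StatePublic
	}
} else if pi.DDLState == model.StateDeleteReorganization {
	// The duplicate already went public; demote it to write-only so the
	// next schema version (rollbackLikeDropPartition) can drop it.
	indexInfo.State = model.StateWriteOnly
} else {
	// The duplicate never became public: drop it with the rollback.
	dropIndices = append(dropIndices, indexInfo)
}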
@@ -466,13 +438,19 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
return ver, errors.Trace(errors.New("Internal error, failed to find original partition definitions"))
}
pi.Definitions = newDefs
pi.Num = uint64(len(pi.Definitions))
} else {
// Move back to StateWriteReorganization, i.e. use the original table
// (non-partitioned or differently partitioned) as the main table to use.
// Otherwise, the Type does not match the expression.
pi.Type, pi.DDLType = pi.DDLType, pi.Type
pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr
pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns
pi.Definitions = pi.DroppingDefinitions
}
pi.Num = uint64(len(pi.Definitions))
// We should move back one state, since there might be other sessions seeing the new partitions.
job.SchemaState = model.StateWriteReorganization
pi.DDLState = job.SchemaState
}
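// (Illustrative aside, not part of the diff.) Assumed forward schema states
// of REORGANIZE PARTITION, with the single step back taken here:
//
//	StateNone -> StateDeleteOnly -> StateWriteOnly
//	  -> StateWriteReorganization (backfill)
//	  -> StateDeleteReorganization -> done
//
// Rolling back from StateDeleteReorganization first returns to
// StateWriteReorganization, so sessions that already observed the new
// partitions see a compatible schema for one more version before the
// rollback proceeds.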

args := jobCtx.jobArgs.(*model.TablePartitionArgs)