103 changes: 47 additions & 56 deletions pkg/ddl/partition.go
@@ -2185,9 +2185,7 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique &&
indexInfo.State == model.StateDeleteReorganization &&
tblInfo.Partition.DDLState == model.StateDeleteReorganization {
if indexInfo.State == model.StateWriteOnly {
dropIndices = append(dropIndices, indexInfo)
}
}
@@ -3043,9 +3041,6 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
}

func getNewGlobal(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
if len(partInfo.DDLUpdateIndexes) == 0 {
return idx.Global
}
for _, newIdx := range partInfo.DDLUpdateIndexes {
if strings.EqualFold(idx.Name.L, newIdx.IndexName) {
return newIdx.Global
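With the early return on an empty DDLUpdateIndexes slice removed, an empty slice now simply skips the loop. A minimal sketch of how getNewGlobal then presumably behaves; the tail of the function is collapsed in this diff, so the final fallback to idx.Global is an assumption, and the sketch relies on the same imports (strings, model) as the surrounding file:

// Sketch only, not the actual function body; the fallback after the loop is assumed.
func getNewGlobalSketch(partInfo *model.PartitionInfo, idx *model.IndexInfo) bool {
	for _, newIdx := range partInfo.DDLUpdateIndexes {
		// An explicit UPDATE INDEXES (idx {GLOBAL|LOCAL}) entry wins.
		if strings.EqualFold(idx.Name.L, newIdx.IndexName) {
			return newIdx.Global
		}
	}
	// Assumed fallback: no explicit entry keeps the index's current attribute.
	return idx.Global
}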
@@ -3151,6 +3146,9 @@ func getReorgPartitionInfo(t *meta.Mutator, job *model.Job, args *model.TablePar
//
// Everything now looks as it should, no memory of old partitions/indexes,
// and no more double writing, since the previous state is only reading the new partitions/indexes.
//
// Note: Special handling is also required in tables.newPartitionedTable(),
// to get per partition indexes in the right state.
func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args, err := model.GetTablePartitionArgs(job)
if err != nil {
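As a quick orientation for the hunks below, here is a rough, non-authoritative outline of the schema states this job steps through and what each means for the duplicated unique indexes; the notes paraphrase the surrounding comments and code, they are not text from the worker itself:

// Sketch only; the state names mirror the model.SchemaState constants used below.
var reorgPartitionOutline = []struct {
	state string
	note  string
}{
	{"StateDeleteOnly", "new partitions and cloned unique indexes exist, delete-only"},
	{"StateWriteOnly", "new partitions/indexes also receive writes (double writing)"},
	{"StateWriteReorganization", "rows and index entries are backfilled into the new partitions"},
	{"StateDeleteReorganization", "new indexes become public, replaced old ones drop to write-only"},
	{"StatePublic (done)", "old partitions/indexes are dropped, no more double writing"},
}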
@@ -3262,39 +3260,33 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if err != nil {
return ver, errors.Trace(err)
}
if !inAllPartitionColumns {
// Currently only support Explicit Global indexes.
if !newGlobal {
job.State = model.JobStateCancelled
return ver, dbterror.ErrGlobalIndexNotExplicitlySet.GenWithStackByArgs(index.Name.O)
}
// Duplicate the unique indexes with new index ids.
// If previously was Global or will be Global:
// it must be recreated with new index ID
// TODO: Could we allow that session in StateWriteReorganization, when StateDeleteReorganization
// has started, may not find changes through the global index that sessions in StateDeleteReorganization made?
// If so, then we could avoid copying the full Global Index if it has not changed from LOCAL!
// It might be possible to use the new, not yet public partitions to access those rows?!
// Just that it would not work with explicit partition select SELECT FROM t PARTITION (p,...)
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
newIndex.Global = true
tblInfo.Indices = append(tblInfo.Indices, newIndex)
} else {
if newGlobal {
// TODO: For the future loosen this restriction and allow global indexes for unique keys also including all partitioning columns
return ver, dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs(fmt.Sprintf("PARTITION BY, index '%v' is unique and contains all partitioning columns, but has Global Index set", index.Name.O))
}
if index.Global {
// Index was previously Global, now it needs to be duplicated and become a local index.
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
newIndex.Global = false
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
// Currently we only support explicitly set Global indexes.
if !inAllPartitionColumns && !newGlobal {
job.State = model.JobStateCancelled
return ver, dbterror.ErrGlobalIndexNotExplicitlySet.GenWithStackByArgs(index.Name.O)
}
if !index.Global && !newGlobal {
// still local index, no need to duplicate index.
continue
}
if tblInfo.Partition.DDLChangedIndex == nil {
tblInfo.Partition.DDLChangedIndex = make(map[int64]bool)
}
// Duplicate the unique indexes with new index IDs.
// If the index was Global before, or will be Global after,
// it must be recreated with a new index ID.
// TODO: Could we accept that a session still in StateWriteReorganization, once StateDeleteReorganization
// has started, may not see changes made through the global index by sessions in StateDeleteReorganization?
// If so, then we could avoid copying the full Global Index if it has not changed from LOCAL!
// It might be possible to use the new, not yet public partitions to access those rows?!
// Just that it would not work with explicit partition selection, SELECT ... FROM t PARTITION (p,...).
newIndex := index.Clone()
newIndex.State = model.StateDeleteOnly
newIndex.ID = AllocateIndexID(tblInfo)
tblInfo.Partition.DDLChangedIndex[index.ID] = false
tblInfo.Partition.DDLChangedIndex[newIndex.ID] = true
newIndex.Global = newGlobal
tblInfo.Indices = append(tblInfo.Indices, newIndex)
}
failpoint.Inject("reorgPartCancel1", func(val failpoint.Value) {
if val.(bool) {
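The rewritten block above boils down to a three-way decision per unique index. A self-contained rephrasing with stand-in types (the real code cancels the job via ErrGlobalIndexNotExplicitlySet, allocates the new ID with AllocateIndexID, and clones model.IndexInfo):

// decide mirrors the branch above; DDLChangedIndex ends up mapping the old
// index ID to false and the cloned replacement's ID to true.
type uniqueIndexFacts struct {
	global                bool // current GLOBAL attribute
	newGlobal             bool // attribute requested for the reorganized table
	inAllPartitionColumns bool // unique key covers all new partitioning columns
}

func decide(f uniqueIndexFacts) string {
	if !f.inAllPartitionColumns && !f.newGlobal {
		return "cancel: index must become GLOBAL, but GLOBAL was not explicitly requested"
	}
	if !f.global && !f.newGlobal {
		return "keep: stays a local index, no duplicate needed"
	}
	// Was Global or will become Global: clone under a new index ID (StateDeleteOnly)
	// and record old ID -> false, new ID -> true in DDLChangedIndex.
	return "clone with a new index ID; Global follows newGlobal"
}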
Expand Down Expand Up @@ -3487,26 +3479,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
if !index.Unique {
continue
}
switch index.State {
case model.StateWriteReorganization:
isNew, ok := tblInfo.Partition.DDLChangedIndex[index.ID]
if !ok {
continue
}
if isNew {
// Newly created index, replacing old unique/global index
index.State = model.StatePublic
case model.StatePublic:
if index.Global {
// Mark the old global index as non-readable, and to be dropped
index.State = model.StateDeleteReorganization
} else {
inAllPartitionColumns, err := checkPartitionKeysConstraint(partInfo, index.Columns, tblInfo)
if err != nil {
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
if !inAllPartitionColumns {
// Mark the old unique index as non-readable, and to be dropped,
// since it is replaced by a global index
index.State = model.StateDeleteReorganization
}
}
continue
}
// Old index, should not be visible any longer,
// but needs to be kept up-to-date in case rollback happens.
index.State = model.StateWriteOnly
}
firstPartIdx, lastPartIdx, idMap, err2 := getReplacedPartitionIDs(partNames, tblInfo.Partition)
if err2 != nil {
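The lookup-driven replacement for the old switch can be summarized as a small stand-alone helper (simplified types; the real code works on *model.IndexInfo and only visits unique indexes):

// nextIndexState mirrors the branch above: DDLChangedIndex tells whether an
// index is the freshly created replacement (true) or the original being
// replaced (false); untouched indexes keep their current state.
func nextIndexState(ddlChangedIndex map[int64]bool, indexID int64, current string) string {
	isNew, ok := ddlChangedIndex[indexID]
	if !ok {
		return current // index not affected by this reorganize
	}
	if isNew {
		return "StatePublic" // replacement becomes readable
	}
	// Original index: hidden from readers, but still written to, so a rollback
	// can simply make it public again.
	return "StateWriteOnly"
}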
@@ -3563,14 +3547,18 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver

var dropIndices []*model.IndexInfo
for _, indexInfo := range tblInfo.Indices {
if indexInfo.Unique && indexInfo.State == model.StateDeleteReorganization {
if indexInfo.Unique && indexInfo.State == model.StateWriteOnly {
// Drop the old unique (possibly global) index, see onDropIndex
indexInfo.State = model.StateNone
DropIndexColumnFlag(tblInfo, indexInfo)
RemoveDependentHiddenColumns(tblInfo, indexInfo)
dropIndices = append(dropIndices, indexInfo)
}
}
// TODO: verify that the indexes are dropped,
// and that StateDeleteOnly+StateDeleteReorganization is not needed.
// Local indexes are not an issue, since they will be gone with the dropped
// partitions, but replaced global indexes should be checked!
for _, indexInfo := range dropIndices {
removeIndexInfo(tblInfo, indexInfo)
}
@@ -3632,6 +3620,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(ver, errors.New("Injected error by reorgPartFail5"))
}
})
failpoint.Inject("updateVersionAndTableInfoErrInStateDeleteReorganization", func() {
failpoint.Return(ver, errors.New("Injected error in StateDeleteReorganization"))
})
args.OldPhysicalTblIDs = physicalTableIDs
args.NewPartitionIDs = newIDs
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
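For reference, this is roughly how a test could arm the new failpoint. The enable path assumes the package import path github.com/pingcap/tidb/pkg/ddl (as the file path pkg/ddl/partition.go suggests), and the test name and surrounding scaffolding are placeholders:

package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestReorgPartitionFailInDeleteReorg(t *testing.T) {
	const fp = "github.com/pingcap/tidb/pkg/ddl/updateVersionAndTableInfoErrInStateDeleteReorganization"
	if err := failpoint.Enable(fp, `return(true)`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()
	// Run ALTER TABLE ... REORGANIZE PARTITION here and assert that the DDL
	// fails (and rolls back cleanly) while in StateDeleteReorganization.
}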
62 changes: 20 additions & 42 deletions pkg/ddl/rollingback.go
@@ -331,8 +331,6 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job,
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}
@@ -371,51 +369,25 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
partNames = append(partNames, pd.Name.L)
}
var dropIndices []*model.IndexInfo
// When Global Index is duplicated to a non Global, we later need
// to know if if it was Global before (marked to be dropped) or not.
globalToUniqueDupMap := make(map[string]int64)
for _, indexInfo := range tblInfo.Indices {
if !indexInfo.Unique {
continue
}
switch indexInfo.State {
case model.StateWriteReorganization, model.StateDeleteOnly,
model.StateWriteOnly:
dropIndices = append(dropIndices, indexInfo)
case model.StateDeleteReorganization:
if pi.DDLState != model.StateDeleteReorganization {
continue
}
// Old index marked to be dropped, rollback by making it public again
indexInfo.State = model.StatePublic
if indexInfo.Global {
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
return ver, errors.NewNoStackErrorf("Duplicate global index names '%s', %d != %d", indexInfo.Name.O, indexInfo.ID, id)
}
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
case model.StatePublic:
if pi.DDLState != model.StateDeleteReorganization {
continue
isNew, ok := pi.DDLChangedIndex[indexInfo.ID]
if !ok {
// non-changed index
continue
}
if !isNew {
if pi.DDLState == model.StateDeleteReorganization {
// Revert the non-public state
indexInfo.State = model.StatePublic
}
// We cannot drop the index here, we need to wait until
// the next schema version
// i.e. rollback in rollbackLikeDropPartition
// New index that became public in this state,
// mark it to be dropped in next schema version
if indexInfo.Global {
indexInfo.State = model.StateDeleteReorganization
} else {
if pi.DDLState == model.StateDeleteReorganization {
indexInfo.State = model.StateWriteOnly
} else {
// How to know if this index was created as a duplicate or not?
if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok {
// The original index
if id >= indexInfo.ID {
return ver, errors.NewNoStackErrorf("Indexes in wrong order during rollback, '%s', %d >= %d", indexInfo.Name.O, id, indexInfo.ID)
}
indexInfo.State = model.StateDeleteReorganization
} else {
globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID
}
dropIndices = append(dropIndices, indexInfo)
}
}
}
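The DDLChangedIndex-driven rollback above replaces the old name-based globalToUniqueDupMap bookkeeping. A deliberately simplified stand-in for the per-index decision; the real code also distinguishes exactly which schema state the job had reached and defers some drops to the next schema version:

// rollbackIndexAction: DDLChangedIndex maps the old index ID -> false and the
// cloned replacement's ID -> true; indexes missing from the map were never touched.
func rollbackIndexAction(ddlChangedIndex map[int64]bool, indexID int64, inDeleteReorg bool) string {
	isNew, ok := ddlChangedIndex[indexID]
	if !ok {
		return "keep" // non-changed index
	}
	if !isNew {
		if inDeleteReorg {
			// The original index had already been demoted; make it public again.
			return "restore to StatePublic"
		}
		return "keep" // original index never left public
	}
	// The cloned replacement must go away together with the new partitions,
	// either immediately or in the following schema version.
	return "drop"
}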
@@ -466,13 +438,19 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot
return ver, errors.Trace(errors.New("Internal error, failed to find original partition definitions"))
}
pi.Definitions = newDefs
pi.Num = uint64(len(pi.Definitions))
} else {
// Move back to StateWriteReorganization, i.e. use the original table
// (non-partitioned or differently partitioned) as the main table to use.
// Otherwise, the Type does not match the expression.
pi.Type, pi.DDLType = pi.DDLType, pi.Type
pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr
pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns
pi.Definitions = pi.DroppingDefinitions
}
pi.Num = uint64(len(pi.Definitions))
// We should move back one state, since there might be other sessions seeing the new partitions.
job.SchemaState = model.StateWriteReorganization
pi.DDLState = job.SchemaState
}

args := jobCtx.jobArgs.(*model.TablePartitionArgs)
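The else branch restores the pre-DDL partitioning by swapping the live metadata with its DDL* shadow fields, and pi.Num is now recomputed once for both branches. A bare-bones illustration with stand-in fields (this is not the real model.PartitionInfo, whose Type is a model.PartitionType, not a string):

type partitionMetaSketch struct {
	Type, DDLType                    string
	Expr, DDLExpr                    string
	Columns, DDLColumns              []string
	Definitions, DroppingDefinitions []string
	Num                              uint64
}

// restoreOriginalPartitioning mirrors the swap-back pattern above: the DDL*
// fields still hold the original definition, so swapping restores it while the
// new definition stays reachable for the remaining rollback steps.
func restoreOriginalPartitioning(pi *partitionMetaSketch) {
	pi.Type, pi.DDLType = pi.DDLType, pi.Type
	pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr
	pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns
	pi.Definitions = pi.DroppingDefinitions
	pi.Num = uint64(len(pi.Definitions))
}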