Skip to content

Commit 3157fc3

Browse files
authored
*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114) (#58124)
ref #45133, close #56822, close #57510
1 parent f2c805b commit 3157fc3

File tree

7 files changed

+362
-117
lines changed

7 files changed

+362
-117
lines changed

pkg/ddl/partition.go

Lines changed: 88 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import (
4444
"github.com/pingcap/tidb/pkg/parser/model"
4545
"github.com/pingcap/tidb/pkg/parser/mysql"
4646
"github.com/pingcap/tidb/pkg/parser/opcode"
47-
"github.com/pingcap/tidb/pkg/parser/terror"
4847
"github.com/pingcap/tidb/pkg/sessionctx"
4948
"github.com/pingcap/tidb/pkg/sessionctx/variable"
5049
"github.com/pingcap/tidb/pkg/table"
@@ -3143,6 +3142,12 @@ type reorgPartitionWorker struct {
31433142
writeColOffsetMap map[int64]int
31443143
maxOffset int
31453144
reorgedTbl table.PartitionedTable
3145+
// Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
3146+
// and check if the old _tidb_rowid was already written or not.
3147+
// If the old _tidb_rowid already exists, then the row is already backfilled (double written)
3148+
// and can be skipped. Otherwise, we will insert it with a new _tidb_rowid.
3149+
// The original _tidb_rowids, used to check if already backfilled (double written).
3150+
oldKeys []kv.Key
31463151
}
31473152

31483153
func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) (*reorgPartitionWorker, error) {
@@ -3190,54 +3195,92 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
31903195
}
31913196
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
31923197

3193-
rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
3198+
nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
31943199
if err != nil {
31953200
return errors.Trace(err)
31963201
}
31973202
taskCtx.nextKey = nextKey
31983203
taskCtx.done = taskDone
31993204

3200-
warningsMap := make(map[errors.ErrorID]*terror.Error)
3201-
warningsCountMap := make(map[errors.ErrorID]int64)
3202-
for _, prr := range rowRecords {
3203-
taskCtx.scanCount++
3204-
3205-
err = txn.Set(prr.key, prr.vals)
3205+
var found map[string][]byte
3206+
// If non-clustered table, then we need to replace the _tidb_rowid handles since
3207+
// there may be duplicates across different partitions, due to EXCHANGE PARTITION.
3208+
// Meaning we need to check here if a record was double written to the new partition,
3209+
// i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
3210+
// If so, then we must skip it.
3211+
if len(w.oldKeys) > 0 {
3212+
// If we skip checking, then we will duplicate that double written row, with a new _tidb_rowid.
3213+
found, err = txn.BatchGet(ctx, w.oldKeys)
32063214
if err != nil {
32073215
return errors.Trace(err)
32083216
}
3209-
taskCtx.addedCount++
3210-
if prr.warning != nil {
3211-
if _, ok := warningsCountMap[prr.warning.ID()]; ok {
3212-
warningsCountMap[prr.warning.ID()]++
3213-
} else {
3214-
warningsCountMap[prr.warning.ID()] = 1
3215-
warningsMap[prr.warning.ID()] = prr.warning
3216-
}
3217-
}
3218-
// TODO: Future optimization: also write the indexes here?
3219-
// What if the transaction limit is just enough for a single row, without index?
3220-
// Hmm, how could that be in the first place?
3221-
// For now, implement the batch-txn w.addTableIndex,
3222-
// since it already exists and is in use
3217+
// TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
32233218
}
32243219

3225-
// Collect the warnings.
3226-
taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap
3220+
failpoint.Call("github.com/pingcap/tidb/pkg/ddl/PartitionBackfillData", len(w.rowRecords) > 0)
3221+
for i, prr := range w.rowRecords {
3222+
taskCtx.scanCount++
3223+
key := prr.key
3224+
if len(w.oldKeys) > 0 {
3225+
if _, ok := found[string(w.oldKeys[i])]; ok {
3226+
// Already filled, i.e. double written by concurrent DML.
3227+
continue
3228+
}
32273229

3228-
// also add the index entries here? And make sure they are not added somewhere else
3230+
// Pretend/Check if we can write the old key,
3231+
// since there can still be a concurrent update/insert happening that would
3232+
// cause a duplicate.
3233+
err = txn.Set(w.oldKeys[i], prr.vals)
3234+
if err != nil {
3235+
return errors.Trace(err)
3236+
}
3237+
err = txn.SetAssertion(w.oldKeys[i], kv.SetAssertNotExist)
3238+
if err != nil {
3239+
return errors.Trace(err)
3240+
}
3241+
// Don't actually write it, just make sure this transaction would
3242+
// fail if another transaction writes the same key before us.
3243+
err = txn.Delete(w.oldKeys[i])
3244+
if err != nil {
3245+
return errors.Trace(err)
3246+
}
3247+
// Generate new _tidb_rowid.
3248+
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
3249+
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
3250+
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
3251+
// Keep using the original table's allocator
3252+
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
3253+
if err != nil {
3254+
return errors.Trace(err)
3255+
}
3256+
}
3257+
recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
3258+
if err != nil {
3259+
return errors.Trace(err)
3260+
}
32293261

3262+
// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
3263+
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID)
3264+
}
3265+
err = txn.Set(key, prr.vals)
3266+
if err != nil {
3267+
return errors.Trace(err)
3268+
}
3269+
taskCtx.addedCount++
3270+
}
32303271
return nil
32313272
})
32323273
logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000)
32333274

32343275
return
32353276
}
32363277

3237-
func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) {
3278+
func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) {
32383279
w.rowRecords = w.rowRecords[:0]
3280+
w.oldKeys = w.oldKeys[:0]
32393281
startTime := time.Now()
32403282

3283+
isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle
32413284
// taskDone means that the added handle is out of taskRange.endHandle.
32423285
taskDone := false
32433286
sysTZ := w.sessCtx.GetSessionVars().StmtCtx.TimeZone()
@@ -3257,8 +3300,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
32573300
return false, nil
32583301
}
32593302

3260-
// TODO: Extend for normal tables
3261-
// TODO: Extend for REMOVE PARTITIONING
32623303
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap)
32633304
if err != nil {
32643305
return false, errors.Trace(err)
@@ -3276,34 +3317,14 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
32763317
if err != nil {
32773318
return false, errors.Trace(err)
32783319
}
3279-
var newKey kv.Key
3280-
if w.reorgedTbl.Meta().PKIsHandle || w.reorgedTbl.Meta().IsCommonHandle {
3281-
pid := p.GetPhysicalID()
3282-
newKey = tablecodec.EncodeTablePrefix(pid)
3283-
newKey = append(newKey, recordKey[len(newKey):]...)
3284-
} else {
3285-
// Non-clustered table / not unique _tidb_rowid for the whole table
3286-
// Generate new _tidb_rowid if exists.
3287-
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
3288-
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
3289-
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
3290-
// TODO: Which autoid allocator to use?
3291-
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
3292-
// Keep using the original table's allocator
3293-
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
3294-
if err != nil {
3295-
return false, errors.Trace(err)
3296-
}
3297-
}
3298-
recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
3299-
if err != nil {
3300-
return false, errors.Trace(err)
3301-
}
3302-
newKey = tablecodec.EncodeRecordKey(p.RecordPrefix(), recordID)
3320+
newKey := tablecodec.EncodeTablePrefix(p.GetPhysicalID())
3321+
newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
3322+
w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow})
3323+
if !isClustered {
3324+
oldKey := newKey[:tablecodec.TableSplitKeyLen]
3325+
oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...)
3326+
w.oldKeys = append(w.oldKeys, oldKey)
33033327
}
3304-
w.rowRecords = append(w.rowRecords, &rowRecord{
3305-
key: newKey, vals: rawRow,
3306-
})
33073328

33083329
w.cleanRowMap()
33093330
lastAccessedHandle = recordKey
@@ -3318,8 +3339,8 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
33183339
taskDone = true
33193340
}
33203341

3321-
logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
3322-
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
3342+
logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)), zap.Error(err))
3343+
return getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
33233344
}
33243345

33253346
func (w *reorgPartitionWorker) cleanRowMap() {
@@ -3341,17 +3362,20 @@ func (w *reorgPartitionWorker) GetCtx() *backfillCtx {
33413362
}
33423363

33433364
func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) error {
3344-
// First copy all table data to the new partitions
3365+
// First copy all table data to the new AddingDefinitions partitions
33453366
// from each of the DroppingDefinitions partitions.
3346-
// Then create all indexes on the AddingDefinitions partitions
3347-
// for each new index, one partition at a time.
3367+
// Then create all indexes on the AddingDefinitions partitions,
33483368

33493369
// Copy the data from the DroppingDefinitions to the AddingDefinitions
33503370
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
33513371
err := w.updatePhysicalTableRow(t, reorgInfo)
33523372
if err != nil {
33533373
return errors.Trace(err)
33543374
}
3375+
if len(reorgInfo.elements) <= 1 {
3376+
// No indexes to (re)create, all done!
3377+
return nil
3378+
}
33553379
}
33563380

33573381
failpoint.Inject("reorgPartitionAfterDataCopy", func(val failpoint.Value) {
@@ -3361,32 +3385,11 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
33613385
}
33623386
})
33633387

3364-
// Rewrite this to do all indexes at once in addTableIndex
3365-
// instead of calling it once per index (meaning reading the table multiple times)
3366-
// But for now, try to understand how it works...
3367-
firstNewPartitionID := t.Meta().Partition.AddingDefinitions[0].ID
3368-
startElementOffset := 0
3369-
//startElementOffsetToResetHandle := -1
3370-
// This backfill job starts with backfilling index data, whose index ID is currElement.ID.
33713388
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
3372-
// First run, have not yet started backfilling index data
3373-
// Restart with the first new partition.
3374-
// TODO: handle remove partitioning
3375-
reorgInfo.PhysicalTableID = firstNewPartitionID
3376-
} else {
3377-
// The job was interrupted and has been restarted,
3378-
// reset and start from where it was done
3379-
for i, element := range reorgInfo.elements[1:] {
3380-
if reorgInfo.currElement.ID == element.ID {
3381-
startElementOffset = i
3382-
//startElementOffsetToResetHandle = i
3383-
break
3384-
}
3385-
}
3386-
}
3387-
3388-
for i := startElementOffset; i < len(reorgInfo.elements[1:]); i++ {
3389-
// Now build the indexes in the new partitions
3389+
// row data has been copied, now proceed with creating the indexes
3390+
// on the new AddingDefinitions partitions
3391+
reorgInfo.PhysicalTableID = t.Meta().Partition.AddingDefinitions[0].ID
3392+
reorgInfo.currElement = reorgInfo.elements[1]
33903393
var physTbl table.PhysicalTable
33913394
if tbl, ok := t.(table.PartitionedTable); ok {
33923395
physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID)
@@ -3399,10 +3402,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
33993402
if err != nil {
34003403
return errors.Trace(err)
34013404
}
3402-
// TODO: Can we improve this in case of a crash?
3403-
// like where the regInfo PhysicalTableID and element is the same,
3404-
// and the tableid in the key-prefix regInfo.StartKey and regInfo.EndKey matches with PhysicalTableID
3405-
// do not change the reorgInfo start/end key
34063405
startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority)
34073406
if err != nil {
34083407
return errors.Trace(err)
@@ -3411,8 +3410,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
34113410
// Always (re)start with the full PhysicalTable range
34123411
reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle
34133412

3414-
// Update the element in the reorgInfo for updating the reorg meta below.
3415-
reorgInfo.currElement = reorgInfo.elements[i+1]
34163413
// Write the reorg info to store so the whole reorganize process can recover from panic.
34173414
err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
34183415
logutil.BgLogger().Info("update column and indexes", zap.String("category", "ddl"),
@@ -3429,7 +3426,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
34293426
if err != nil {
34303427
return errors.Trace(err)
34313428
}
3432-
reorgInfo.PhysicalTableID = firstNewPartitionID
34333429
}
34343430
failpoint.Inject("reorgPartitionAfterIndex", func(val failpoint.Value) {
34353431
//nolint:forcetypeassert

pkg/ddl/tests/partition/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ go_test(
66
srcs = [
77
"db_partition_test.go",
88
"main_test.go",
9+
"multi_domain_test.go",
910
],
1011
flaky = True,
11-
shard_count = 49,
12+
shard_count = 50,
1213
deps = [
1314
"//pkg/config",
1415
"//pkg/ddl",

pkg/ddl/tests/partition/db_partition_test.go

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3608,10 +3608,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
36083608
tk3.MustExec(`COMMIT`)
36093609
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
36103610
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
3611-
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9"))
3611+
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9"))
36123612
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
36133613
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
3614-
"19 18 4", "23 22 6", "27 26 8", "32 31 10"))
3614+
"19 18 4", "23 22 6", "27 26 8", "31 30 10"))
36153615

36163616
waitFor(4, "t", "write reorganization")
36173617
tk3.MustExec(`BEGIN`)
@@ -3621,28 +3621,20 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
36213621
tk3.MustExec(`insert into t values (null, 23)`)
36223622
tk2.MustExec(`COMMIT`)
36233623

3624-
/*
3625-
waitFor(4, "t", "delete reorganization")
3626-
tk2.MustExec(`BEGIN`)
3627-
tk2.MustExec(`insert into t values (null, 24)`)
3624+
waitFor(4, "t", "delete reorganization")
3625+
tk2.MustExec(`BEGIN`)
3626+
tk2.MustExec(`insert into t values (null, 24)`)
36283627

3629-
tk3.MustExec(`insert into t values (null, 25)`)
3630-
tk2.MustExec(`insert into t values (null, 26)`)
3631-
*/
3628+
tk3.MustExec(`insert into t values (null, 25)`)
3629+
tk2.MustExec(`insert into t values (null, 26)`)
36323630
tk3.MustExec(`COMMIT`)
36333631
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
3634-
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
3635-
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
3636-
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
3637-
3638-
//waitFor(4, "t", "public")
3639-
//tk2.MustExec(`commit`)
3640-
// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
3641-
require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
3632+
"27 26 8", "30012 12 12", "30013 18 4", "30014 24 7", "30264 16 18", "30265 22 6", "30266 28 9", "30516 11 11", "30517 2 2", "30518 20 5", "31 30 10", "33 32 21", "35 34 22", "37 36 23", "41 40 25"))
3633+
waitFor(4, "t", "none")
3634+
tk2.MustExec(`commit`)
3635+
require.NoError(t, <-alterChan)
36423636
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
3643-
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
3644-
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
3645-
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
3637+
"27 26 8", "30012 12 12", "30013 18 4", "30014 24 7", "30264 16 18", "30265 22 6", "30266 28 9", "30516 11 11", "30517 2 2", "30518 20 5", "31 30 10", "33 32 21", "35 34 22", "37 36 23", "39 38 24", "41 40 25", "43 42 26"))
36463638
}
36473639

36483640
func TestAlterLastIntervalPartition(t *testing.T) {

0 commit comments

Comments
 (0)