Commit 13fe7ab

ddl: Non clustered reorg duplicate tidb rowid fix (#60132)
close #59680
1 parent a239200 commit 13fe7ab
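What the patch does, in brief: for non-clustered tables, the reorg backfill worker no longer collects a separate oldKeys slice and unconditionally allocates a new _tidb_rowid for every copied row. It now BatchGets the destination record keys, skips rows whose destination already holds identical bytes (double-written by concurrent DML during StateWriteOnly/StateWriteReorganization), locks the source key so concurrent updates force a retry, and only re-allocates a handle, via RemoveRecord/AddRecord to keep indexes in sync, when the same _tidb_rowid maps to a different row (possible after an earlier EXCHANGE PARTITION).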

File tree

8 files changed (+2065, -300 lines)

pkg/ddl/partition.go

Lines changed: 64 additions & 35 deletions
```diff
@@ -3755,11 +3755,6 @@ type reorgPartitionWorker struct {
 	writeColOffsetMap map[int64]int
 	maxOffset         int
 	reorgedTbl        table.PartitionedTable
-	// Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
-	// and check if the old _tidb_rowid was already written or not.
-	// If the old _tidb_rowid already exists, then the row is already backfilled (double written)
-	// and can be skipped. Otherwise, we will insert it and generate index entries.
-	oldKeys []kv.Key
 }

 func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) {
@@ -3826,46 +3821,88 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
 		// i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
 		// and if so, skip it.
 		var found map[string][]byte
-		if len(w.oldKeys) > 0 {
+		lockKey := make([]byte, 0, tablecodec.RecordRowKeyLen)
+		lockKey = append(lockKey, handleRange.startKey[:tablecodec.TableSplitKeyLen]...)
+		if !w.table.Meta().HasClusteredIndex() && len(w.rowRecords) > 0 {
+			failpoint.InjectCall("PartitionBackfillNonClustered", w.rowRecords[0].vals)
 			// we must check if old IDs already been written,
 			// i.e. double written by StateWriteOnly or StateWriteReorganization.
-			// TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
-			found, err = txn.BatchGet(ctx, w.oldKeys)
+
+			// TODO: test how to use PresumeKeyNotExists/NeedConstraintCheckInPrewrite/DO_CONSTRAINT_CHECK
+			// to delay the check until commit.
+			// And handle commit errors and fall back to this method of checking all keys to see if we need to skip any.
+			newKeys := make([]kv.Key, 0, len(w.rowRecords))
+			for i := range w.rowRecords {
+				newKeys = append(newKeys, w.rowRecords[i].key)
+			}
+			found, err = txn.BatchGet(ctx, newKeys)
 			if err != nil {
 				return errors.Trace(err)
 			}
+
+			// TODO: Add test that kills (like `kill -9`) the currently running
+			// ddl owner, to see how it handles re-running this backfill when some batches has
+			// committed and reorgInfo has not been updated, so it needs to redo some batches.
 		}
+		tmpRow := make([]types.Datum, len(w.reorgedTbl.Cols()))

-		for i, prr := range w.rowRecords {
+		for _, prr := range w.rowRecords {
 			taskCtx.scanCount++
 			key := prr.key
+			lockKey = lockKey[:tablecodec.TableSplitKeyLen]
+			lockKey = append(lockKey, key[tablecodec.TableSplitKeyLen:]...)
+			// Lock the *old* key, since there can still be concurrent update happening on
+			// the rows from fetchRowColVals(). If we cannot lock the keys in this
+			// transaction and succeed when committing, then another transaction did update
+			// the same key, and we will fail and retry. When retrying, this key would be found
+			// through BatchGet and skipped.
+			// TODO: would it help to accumulate the keys in a slice and then only call this once?
+			err = txn.LockKeys(context.Background(), new(kv.LockCtx), lockKey)
+			if err != nil {
+				return errors.Trace(err)
+			}

-			// w.oldKeys is only set for non-clustered tables, in w.fetchRowColVals().
-			if len(w.oldKeys) > 0 {
-				if _, ok := found[string(w.oldKeys[i])]; ok {
-					// Already filled, i.e. double written earlier by concurrent DML
+			if vals, ok := found[string(key)]; ok {
+				if len(vals) == len(prr.vals) && bytes.Equal(vals, prr.vals) {
+					// Already backfilled or double written earlier by concurrent DML
 					continue
 				}
-
-				// Check if we can lock the old key, since there can still be concurrent update
-				// happening on the rows from fetchRowColVals(), if we cannot lock the keys in this
-				// transaction and succeed when committing, then another transaction did update
-				// the same key, and we will fail and retry. When retrying, this key would be found
-				// through BatchGet and skipped.
-				err = txn.LockKeys(context.Background(), new(kv.LockCtx), w.oldKeys[i])
+				// Not same row, due to earlier EXCHANGE PARTITION.
+				// Update the current read row by Remove it and Add it back (which will give it a new _tidb_rowid)
+				// which then also will be used as unique id in the new partition.
+				var h kv.Handle
+				var currPartID int64
+				currPartID, h, err = tablecodec.DecodeRecordKey(lockKey)
 				if err != nil {
 					return errors.Trace(err)
 				}
-
-				// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
-				// Generate new _tidb_rowid.
-				recordID, err := tables.AllocHandle(w.ctx, w.tblCtx, w.reorgedTbl)
+				_, err = w.rowDecoder.DecodeTheExistedColumnMap(w.exprCtx, h, prr.vals, w.loc, w.rowMap)
 				if err != nil {
 					return errors.Trace(err)
 				}
-
-				// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
-				key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID)
+				for _, col := range w.table.WritableCols() {
+					d, ok := w.rowMap[col.ID]
+					if !ok {
+						return dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs()
+					}
+					tmpRow[col.Offset] = d
+				}
+				// Use RemoveRecord/AddRecord to keep the indexes in-sync!
+				pt := w.table.GetPartitionedTable().GetPartition(currPartID)
+				err = pt.RemoveRecord(w.tblCtx, txn, h, tmpRow)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				h, err = pt.AddRecord(w.tblCtx, txn, tmpRow)
+				if err != nil {
+					return errors.Trace(err)
+				}
+				w.cleanRowMap()
+				// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2 ("_r")
+				key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], h)
+				// OK to only do txn.Set() for the new partition, and defer creating the indexes,
+				// since any DML changes the record it will also update or create the indexes,
+				// by doing RemoveRecord+UpdateRecord
 			}
 			err = txn.Set(key, prr.vals)
 			if err != nil {
```
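To make the new skip/re-insert decision easier to follow, here is a minimal, self-contained Go sketch of the per-row logic the patched BackfillData encodes. Everything in it (the byte-slice rowRecord, the planBackfill helper, the literal keys) is an illustrative stand-in, not TiDB's actual API:

```go
package main

import (
	"bytes"
	"fmt"
)

// rowRecord mirrors the pair the backfill loop iterates over: the destination
// record key in the new partition and the raw row value to write there.
// (Illustrative stand-in; the real type lives in pkg/ddl.)
type rowRecord struct {
	key  []byte
	vals []byte
}

// planBackfill is a hypothetical helper showing the three-way decision the
// patched BackfillData makes per row, given the txn.BatchGet result over the
// destination keys:
//   - key absent               -> plain backfill with txn.Set
//   - present, identical value -> already double-written by concurrent DML, skip
//   - present, different value -> _tidb_rowid collision (e.g. from an earlier
//     EXCHANGE PARTITION), so remove and re-add the row under a fresh handle
func planBackfill(records []rowRecord, found map[string][]byte) (skip, reinsert, set []rowRecord) {
	for _, prr := range records {
		existing, ok := found[string(prr.key)]
		switch {
		case !ok:
			set = append(set, prr)
		case bytes.Equal(existing, prr.vals):
			skip = append(skip, prr)
		default:
			reinsert = append(reinsert, prr)
		}
	}
	return skip, reinsert, set
}

func main() {
	found := map[string][]byte{
		"t2_r1": []byte("rowA"), // double-written by DML: identical bytes
		"t2_r2": []byte("rowX"), // same rowid, different row: collision
	}
	records := []rowRecord{
		{key: []byte("t2_r1"), vals: []byte("rowA")},
		{key: []byte("t2_r2"), vals: []byte("rowB")},
		{key: []byte("t2_r3"), vals: []byte("rowC")},
	}
	skip, reinsert, set := planBackfill(records, found)
	fmt.Printf("skip=%d reinsert=%d set=%d\n", len(skip), len(reinsert), len(set)) // skip=1 reinsert=1 set=1
}
```

The real loop additionally locks the source key first, so a row that concurrent DML updates after fetchRowColVals() causes a commit conflict and a retry instead of resurrecting a stale value.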
```diff
@@ -3882,8 +3919,6 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task

 func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) {
 	w.rowRecords = w.rowRecords[:0]
-	isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle
-	w.oldKeys = w.oldKeys[:0]
 	startTime := time.Now()

 	// taskDone means that the added handle is out of taskRange.endHandle.
@@ -3926,12 +3961,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
 		newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
 		w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow})

-		if !isClustered {
-			oldKey := newKey[:tablecodec.TableSplitKeyLen]
-			oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...)
-			w.oldKeys = append(w.oldKeys, oldKey)
-		}
-
 		w.cleanRowMap()
 		lastAccessedHandle = recordKey
 		if recordKey.Cmp(taskRange.endKey) == 0 {
```
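The context lines above also show the key rewrite everything else builds on: fetchRowColVals() keeps each row's handle (its _tidb_rowid) and only repoints the key at the destination partition, while BackfillData later performs the inverse swap to rebuild the source key into lockKey. A tiny sketch of that prefix swap, using a hard-coded 9-byte prefix in place of tablecodec.TableSplitKeyLen (one byte 't' plus an 8-byte table ID in the real encoding) and human-readable fake keys:

```go
package main

import "fmt"

// tableSplitKeyLen stands in for tablecodec.TableSplitKeyLen: the length of
// the "t{tableID}" prefix. The keys below are readable fakes, not real
// tablecodec encodings.
const tableSplitKeyLen = 9

// rewritePrefix keeps the handle suffix of recordKey but repoints it at the
// destination physical table, the same shape as the diff's
// newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...).
func rewritePrefix(newPrefix, recordKey []byte) []byte {
	newKey := make([]byte, 0, len(recordKey))
	newKey = append(newKey, newPrefix[:tableSplitKeyLen]...)
	return append(newKey, recordKey[tableSplitKeyLen:]...)
}

func main() {
	recordKey := []byte("t00000111_r00000042") // source partition, handle 42
	newPrefix := []byte("t00000222")           // destination physical table
	fmt.Println(string(rewritePrefix(newPrefix, recordKey)))
	// Output: t00000222_r00000042 (same handle, new partition)
}
```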

pkg/ddl/tests/partition/db_partition_test.go

Lines changed: 27 additions & 27 deletions
```diff
@@ -3218,31 +3218,31 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
 	tk2.MustExec(`insert into t values (null, 26)`)
 	tk3.MustExec(`COMMIT`)
 	tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
+		"13 11 11",
+		"14 2 2",
+		"15 12 12",
+		"17 16 18",
+		"19 18 4",
+		"21 20 5",
+		"23 22 6",
+		"25 24 7",
 		"27 26 8",
-		"30012 12 12",
-		"30013 18 4",
-		"30014 24 7",
-		"30015 16 18",
-		"30016 22 6",
-		"30017 28 9",
-		"30018 11 11",
-		"30019 2 2",
-		"30020 20 5",
+		"29 28 9",
 		"31 30 10",
 		"35 34 22",
 		"39 38 24",
 		"43 42 26"))
 	tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
+		"13 11 11",
+		"14 2 2",
+		"15 12 12",
+		"17 16 18",
+		"19 18 4",
+		"21 20 5",
+		"23 22 6",
+		"25 24 7",
 		"27 26 8",
-		"30012 12 12",
-		"30013 18 4",
-		"30014 24 7",
-		"30015 16 18",
-		"30016 22 6",
-		"30017 28 9",
-		"30018 11 11",
-		"30019 2 2",
-		"30020 20 5",
+		"29 28 9",
 		"31 30 10",
 		"33 32 21",
 		"35 34 22",
@@ -3252,16 +3252,16 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
 	waitFor(4, "t", "public")
 	tk2.MustExec(`commit`)
 	tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
+		"13 11 11",
+		"14 2 2",
+		"15 12 12",
+		"17 16 18",
+		"19 18 4",
+		"21 20 5",
+		"23 22 6",
+		"25 24 7",
 		"27 26 8",
-		"30012 12 12",
-		"30013 18 4",
-		"30014 24 7",
-		"30015 16 18",
-		"30016 22 6",
-		"30017 28 9",
-		"30018 11 11",
-		"30019 2 2",
-		"30020 20 5",
+		"29 28 9",
 		"31 30 10",
 		"33 32 21",
 		"35 34 22",
```
