Skip to content

Commit b2c5f41

Browse files
authored
ddl: Non clustered reorg duplicate tidb rowid fix (#60132) (#60565)
close #59680
1 parent 2ce41da commit b2c5f41

File tree

8 files changed

+2067
-300
lines changed

8 files changed

+2067
-300
lines changed

pkg/ddl/partition.go

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3773,11 +3773,6 @@ type reorgPartitionWorker struct {
37733773
writeColOffsetMap map[int64]int
37743774
maxOffset int
37753775
reorgedTbl table.PartitionedTable
3776-
// Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
3777-
// and check if the old _tidb_rowid was already written or not.
3778-
// If the old _tidb_rowid already exists, then the row is already backfilled (double written)
3779-
// and can be skipped. Otherwise, we will insert it and generate index entries.
3780-
oldKeys []kv.Key
37813776
}
37823777

37833778
func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) {
@@ -3844,46 +3839,88 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
38443839
// i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
38453840
// and if so, skip it.
38463841
var found map[string][]byte
3847-
if len(w.oldKeys) > 0 {
3842+
lockKey := make([]byte, 0, tablecodec.RecordRowKeyLen)
3843+
lockKey = append(lockKey, handleRange.startKey[:tablecodec.TableSplitKeyLen]...)
3844+
if !w.table.Meta().HasClusteredIndex() && len(w.rowRecords) > 0 {
3845+
failpoint.InjectCall("PartitionBackfillNonClustered", w.rowRecords[0].vals)
38483846
// we must check if old IDs have already been written,
38493847
// i.e. double written by StateWriteOnly or StateWriteReorganization.
3850-
// TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
3851-
found, err = txn.BatchGet(ctx, w.oldKeys)
3848+
3849+
// TODO: test how to use PresumeKeyNotExists/NeedConstraintCheckInPrewrite/DO_CONSTRAINT_CHECK
3850+
// to delay the check until commit.
3851+
// And handle commit errors and fall back to this method of checking all keys to see if we need to skip any.
3852+
newKeys := make([]kv.Key, 0, len(w.rowRecords))
3853+
for i := range w.rowRecords {
3854+
newKeys = append(newKeys, w.rowRecords[i].key)
3855+
}
3856+
found, err = txn.BatchGet(ctx, newKeys)
38523857
if err != nil {
38533858
return errors.Trace(err)
38543859
}
3860+
3861+
// TODO: Add test that kills (like `kill -9`) the currently running
3862+
// ddl owner, to see how it handles re-running this backfill when some batches have
3863+
// committed and reorgInfo has not been updated, so it needs to redo some batches.
38553864
}
3865+
tmpRow := make([]types.Datum, len(w.reorgedTbl.Cols()))
38563866

3857-
for i, prr := range w.rowRecords {
3867+
for _, prr := range w.rowRecords {
38583868
taskCtx.scanCount++
38593869
key := prr.key
3870+
lockKey = lockKey[:tablecodec.TableSplitKeyLen]
3871+
lockKey = append(lockKey, key[tablecodec.TableSplitKeyLen:]...)
3872+
// Lock the *old* key, since there can still be concurrent update happening on
3873+
// the rows from fetchRowColVals(). If we cannot lock the keys in this
3874+
// transaction and succeed when committing, then another transaction did update
3875+
// the same key, and we will fail and retry. When retrying, this key would be found
3876+
// through BatchGet and skipped.
3877+
// TODO: would it help to accumulate the keys in a slice and then only call this once?
3878+
err = txn.LockKeys(context.Background(), new(kv.LockCtx), lockKey)
3879+
if err != nil {
3880+
return errors.Trace(err)
3881+
}
38603882

3861-
// w.oldKeys is only set for non-clustered tables, in w.fetchRowColVals().
3862-
if len(w.oldKeys) > 0 {
3863-
if _, ok := found[string(w.oldKeys[i])]; ok {
3864-
// Already filled, i.e. double written earlier by concurrent DML
3883+
if vals, ok := found[string(key)]; ok {
3884+
if len(vals) == len(prr.vals) && bytes.Equal(vals, prr.vals) {
3885+
// Already backfilled or double written earlier by concurrent DML
38653886
continue
38663887
}
3867-
3868-
// Check if we can lock the old key, since there can still be concurrent update
3869-
// happening on the rows from fetchRowColVals(), if we cannot lock the keys in this
3870-
// transaction and succeed when committing, then another transaction did update
3871-
// the same key, and we will fail and retry. When retrying, this key would be found
3872-
// through BatchGet and skipped.
3873-
err = txn.LockKeys(context.Background(), new(kv.LockCtx), w.oldKeys[i])
3888+
// Not same row, due to earlier EXCHANGE PARTITION.
3889+
// Update the currently read row by removing it and adding it back (which will give it a new _tidb_rowid)
3890+
// which then also will be used as unique id in the new partition.
3891+
var h kv.Handle
3892+
var currPartID int64
3893+
currPartID, h, err = tablecodec.DecodeRecordKey(lockKey)
38743894
if err != nil {
38753895
return errors.Trace(err)
38763896
}
3877-
3878-
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
3879-
// Generate new _tidb_rowid.
3880-
recordID, err := tables.AllocHandle(w.ctx, w.tblCtx, w.reorgedTbl)
3897+
_, err = w.rowDecoder.DecodeTheExistedColumnMap(w.exprCtx, h, prr.vals, w.loc, w.rowMap)
38813898
if err != nil {
38823899
return errors.Trace(err)
38833900
}
3884-
3885-
// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
3886-
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID)
3901+
for _, col := range w.table.WritableCols() {
3902+
d, ok := w.rowMap[col.ID]
3903+
if !ok {
3904+
return dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs()
3905+
}
3906+
tmpRow[col.Offset] = d
3907+
}
3908+
// Use RemoveRecord/AddRecord to keep the indexes in-sync!
3909+
pt := w.table.GetPartitionedTable().GetPartition(currPartID)
3910+
err = pt.RemoveRecord(w.tblCtx, txn, h, tmpRow)
3911+
if err != nil {
3912+
return errors.Trace(err)
3913+
}
3914+
h, err = pt.AddRecord(w.tblCtx, txn, tmpRow)
3915+
if err != nil {
3916+
return errors.Trace(err)
3917+
}
3918+
w.cleanRowMap()
3919+
// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2 ("_r")
3920+
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], h)
3921+
// OK to only do txn.Set() for the new partition, and defer creating the indexes,
3922+
// since any DML that changes the record will also update or create the indexes,
3923+
// by doing RemoveRecord+UpdateRecord
38873924
}
38883925
err = txn.Set(key, prr.vals)
38893926
if err != nil {
@@ -3900,8 +3937,6 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
39003937

39013938
func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) {
39023939
w.rowRecords = w.rowRecords[:0]
3903-
isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle
3904-
w.oldKeys = w.oldKeys[:0]
39053940
startTime := time.Now()
39063941

39073942
// taskDone means that the added handle is out of taskRange.endHandle.
@@ -3944,12 +3979,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
39443979
newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
39453980
w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow})
39463981

3947-
if !isClustered {
3948-
oldKey := newKey[:tablecodec.TableSplitKeyLen]
3949-
oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...)
3950-
w.oldKeys = append(w.oldKeys, oldKey)
3951-
}
3952-
39533982
w.cleanRowMap()
39543983
lastAccessedHandle = recordKey
39553984
if recordKey.Cmp(taskRange.endKey) == 0 {

pkg/ddl/tests/partition/db_partition_test.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3216,31 +3216,31 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
32163216
tk2.MustExec(`insert into t values (null, 26)`)
32173217
tk3.MustExec(`COMMIT`)
32183218
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
3219+
"13 11 11",
3220+
"14 2 2",
3221+
"15 12 12",
3222+
"17 16 18",
3223+
"19 18 4",
3224+
"21 20 5",
3225+
"23 22 6",
3226+
"25 24 7",
32193227
"27 26 8",
3220-
"30012 12 12",
3221-
"30013 18 4",
3222-
"30014 24 7",
3223-
"30015 16 18",
3224-
"30016 22 6",
3225-
"30017 28 9",
3226-
"30018 11 11",
3227-
"30019 2 2",
3228-
"30020 20 5",
3228+
"29 28 9",
32293229
"31 30 10",
32303230
"35 34 22",
32313231
"39 38 24",
32323232
"43 42 26"))
32333233
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
3234+
"13 11 11",
3235+
"14 2 2",
3236+
"15 12 12",
3237+
"17 16 18",
3238+
"19 18 4",
3239+
"21 20 5",
3240+
"23 22 6",
3241+
"25 24 7",
32343242
"27 26 8",
3235-
"30012 12 12",
3236-
"30013 18 4",
3237-
"30014 24 7",
3238-
"30015 16 18",
3239-
"30016 22 6",
3240-
"30017 28 9",
3241-
"30018 11 11",
3242-
"30019 2 2",
3243-
"30020 20 5",
3243+
"29 28 9",
32443244
"31 30 10",
32453245
"33 32 21",
32463246
"35 34 22",
@@ -3250,16 +3250,16 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
32503250
waitFor(4, "t", "public")
32513251
tk2.MustExec(`commit`)
32523252
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
3253+
"13 11 11",
3254+
"14 2 2",
3255+
"15 12 12",
3256+
"17 16 18",
3257+
"19 18 4",
3258+
"21 20 5",
3259+
"23 22 6",
3260+
"25 24 7",
32533261
"27 26 8",
3254-
"30012 12 12",
3255-
"30013 18 4",
3256-
"30014 24 7",
3257-
"30015 16 18",
3258-
"30016 22 6",
3259-
"30017 28 9",
3260-
"30018 11 11",
3261-
"30019 2 2",
3262-
"30020 20 5",
3262+
"29 28 9",
32633263
"31 30 10",
32643264
"33 32 21",
32653265
"35 34 22",

0 commit comments

Comments
 (0)