Skip to content
Merged
Show file tree
Hide file tree
Changes from 79 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
afeae7f
Change from table.AddRecord, to txn.Set, with check for concurrent tr…
mjonss Dec 13, 2024
73d75b2
Moved the AssertNotExists before txn.Set
mjonss Dec 17, 2024
1f87051
Using LockKeys instead.
mjonss Dec 17, 2024
75fc1b4
Added a comment.
mjonss Dec 18, 2024
fc27c67
Merge remote-tracking branch 'pingcap/master' into non-cluster-reorg-…
mjonss Dec 18, 2024
1e26b51
Merge remote-tracking branch 'pingcap/master' into non-cluster-reorg-…
mjonss Jan 8, 2025
d3cf5bc
Updated test
mjonss Jan 9, 2025
3c03c0e
Merge remote-tracking branch 'pingcap/master' into non-cluster-reorg-…
mjonss Jan 9, 2025
12d12cb
Updated test
mjonss Jan 10, 2025
21c0b51
refined test
mjonss Jan 10, 2025
3c6b00d
tmp commit
mjonss Jan 10, 2025
5567535
Fix for bad index asserts during REORGANIZE PARTITION
mjonss Jan 13, 2025
d77b897
Linting
mjonss Jan 13, 2025
6269f23
Improved tests
mjonss Jan 13, 2025
ea73d39
Added more tests
mjonss Jan 13, 2025
af041af
Linting
mjonss Jan 21, 2025
8fbad34
Added more tests and TODOs
mjonss Mar 11, 2025
409e05b
Merge remote-tracking branch 'pingcap/master' into non-cluster-reorg-…
mjonss Mar 11, 2025
f0cc426
Fixed compilation
mjonss Mar 11, 2025
285c254
bazel_prepare
mjonss Mar 11, 2025
7265545
Refined test
mjonss Mar 12, 2025
b34318c
minor cleanups and added more tests
mjonss Mar 12, 2025
9c9ea94
Only generate new _tidb_rowid in case of actual clash
mjonss Mar 12, 2025
b013c04
Updated test result
mjonss Mar 12, 2025
e61a688
Updated test results
mjonss Mar 12, 2025
1049378
Cleanups and adding a simpler test case for the Assert.
mjonss Mar 13, 2025
95278e5
Updated test.
mjonss Mar 13, 2025
36f5314
Linting
mjonss Mar 13, 2025
bcd45c0
Merge branch 'non-cluster-reorg-tests' into non-clustered-reorg-part-fix
mjonss Mar 14, 2025
06adad0
Also locked clustered tables, and enabled more testing
mjonss Mar 14, 2025
4e31f62
Added test
mjonss Mar 17, 2025
7b94940
Added map between _tidb_rowid for old and new partitions
mjonss Mar 18, 2025
4ef60b3
Linting
mjonss Mar 18, 2025
ffdb38c
Linting
mjonss Mar 18, 2025
8769496
Added test for concurrent DML during backfill to make it retry.
mjonss Mar 18, 2025
552f20b
WIP!!! Added map for new _tidb_rowid's for non-clustered partitioned …
mjonss Mar 19, 2025
c2f0ed4
Missed slice index somehow?!?
mjonss Mar 19, 2025
e0df62d
current test works, but rest fails... WIP
mjonss Mar 19, 2025
5ad9ef7
mostly cleanup
mjonss Mar 19, 2025
7b4a52d
All tests passes, now time for cleanup and refactoring :)
mjonss Mar 20, 2025
ef9efe9
test cleanup
mjonss Mar 20, 2025
2306147
Linting
mjonss Mar 20, 2025
ee29b13
Test fixes
mjonss Mar 20, 2025
55677f5
Removed debug logs
mjonss Mar 20, 2025
8f52d13
Merge branch 'master' into non-clustered-reorg-duplicate-_tidb_rowid-fix
mjonss Mar 20, 2025
1bcbcce
Missed one place to set genRecordID for IsUpdate()
mjonss Mar 20, 2025
4c007fa
Updated tests, shows missing cleanup of temporary index map
mjonss Mar 20, 2025
cf15a3e
Added deleteRange for temporary index map of _tidb_rowid
mjonss Mar 20, 2025
d68b493
Tests for temporary index map now passes
mjonss Mar 21, 2025
f18e90b
Fixed test
mjonss Mar 21, 2025
86dc175
Test to show Delete needs to use map as well.
mjonss Mar 21, 2025
a75e82e
Added test case for showing missing deletes
mjonss Mar 22, 2025
eeb9272
Fixed delete for non-clus...
mjonss Mar 22, 2025
57ef716
Still not reproduced the update deletes the wrong newFrom row, but fo…
mjonss Mar 22, 2025
64d6255
Found a new issue, extra row?!?
mjonss Mar 22, 2025
a951c86
Fixed where we could miss to delete a key
mjonss Mar 22, 2025
340b7d6
Fix for ddlargsv1
mjonss Mar 22, 2025
a415204
WIP: testing keeping both curr and other partition set in sync instea…
mjonss Mar 23, 2025
cbd8fdb
Changed to backfill does Remove+Add of curr rec
mjonss Mar 24, 2025
d6ca69c
Reverted change for temporary index map
mjonss Mar 24, 2025
4a1bb9c
Linting
mjonss Mar 24, 2025
4c6ecb2
Moved the full row check to be done if needed
mjonss Mar 24, 2025
eb4e471
removed debug code
mjonss Mar 24, 2025
fe5b854
code cleanups
mjonss Mar 24, 2025
dc4489d
minor test code cleanup
mjonss Mar 24, 2025
33fbc9f
revert non-relevant ddlEventCh warn log
mjonss Mar 24, 2025
cf05cee
more code cleanup, fetchRowColVals
mjonss Mar 24, 2025
2bdbe54
more code cleanups, to make the code diff smaller
mjonss Mar 24, 2025
322f130
Use getPartition instead of GetPartition :)
mjonss Mar 24, 2025
02e3554
test cleanup
mjonss Mar 24, 2025
c3ea51e
Test cleanup (and bug reporting)
mjonss Mar 25, 2025
ef0f5e9
Test cleanups
mjonss Mar 25, 2025
ea4bf0e
Test cleanups
mjonss Mar 25, 2025
86cc881
Typo _tikv_rowid -> _tidb_rowid
mjonss Mar 26, 2025
204b708
Addressed reviewers comments, simplified the code.
mjonss Mar 26, 2025
d674822
Merge remote-tracking branch 'pingcap/master' into non-clustered-reor…
mjonss Mar 26, 2025
28936d6
Linting
mjonss Mar 26, 2025
5a6f6a9
linting
mjonss Mar 26, 2025
d992efb
Merge remote-tracking branch 'pingcap/master' into non-clustered-reor…
mjonss Apr 4, 2025
4d949dd
Replaced single key txn.BatchGet() with getKeyInTxn().
mjonss Apr 9, 2025
372c795
Optimized partitionedTableUpdateRecord with txn.Get() instead of Batc…
mjonss Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 64 additions & 35 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3755,11 +3755,6 @@ type reorgPartitionWorker struct {
writeColOffsetMap map[int64]int
maxOffset int
reorgedTbl table.PartitionedTable
// Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
// and check if the old _tidb_rowid was already written or not.
// If the old _tidb_rowid already exists, then the row is already backfilled (double written)
// and can be skipped. Otherwise, we will insert it and generate index entries.
oldKeys []kv.Key
}

func newReorgPartitionWorker(i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*reorgPartitionWorker, error) {
Expand Down Expand Up @@ -3826,46 +3821,88 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
// i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
// and if so, skip it.
var found map[string][]byte
if len(w.oldKeys) > 0 {
lockKey := make([]byte, 0, tablecodec.RecordRowKeyLen)
lockKey = append(lockKey, handleRange.startKey[:tablecodec.TableSplitKeyLen]...)
if !w.table.Meta().HasClusteredIndex() && len(w.rowRecords) > 0 {
failpoint.InjectCall("PartitionBackfillNonClustered", w.rowRecords[0].vals)
// we must check if the old IDs have already been written,
// i.e. double written by StateWriteOnly or StateWriteReorganization.
// TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
found, err = txn.BatchGet(ctx, w.oldKeys)

// TODO: test how to use PresumeKeyNotExists/NeedConstraintCheckInPrewrite/DO_CONSTRAINT_CHECK
// to delay the check until commit.
// And handle commit errors and fall back to this method of checking all keys to see if we need to skip any.
newKeys := make([]kv.Key, 0, len(w.rowRecords))
for i := range w.rowRecords {
newKeys = append(newKeys, w.rowRecords[i].key)
}
found, err = txn.BatchGet(ctx, newKeys)
if err != nil {
return errors.Trace(err)
}

// TODO: Add test that kills (like `kill -9`) the currently running
// ddl owner, to see how it handles re-running this backfill when some batches have
// committed and reorgInfo has not been updated, so it needs to redo some batches.
}
tmpRow := make([]types.Datum, len(w.reorgedTbl.Cols()))

for i, prr := range w.rowRecords {
for _, prr := range w.rowRecords {
taskCtx.scanCount++
key := prr.key
lockKey = lockKey[:tablecodec.TableSplitKeyLen]
lockKey = append(lockKey, key[tablecodec.TableSplitKeyLen:]...)
// Lock the *old* key, since there can still be concurrent update happening on
// the rows from fetchRowColVals(). If we cannot lock the keys in this
// transaction and succeed when committing, then another transaction did update
// the same key, and we will fail and retry. When retrying, this key would be found
// through BatchGet and skipped.
// TODO: would it help to accumulate the keys in a slice and then only call this once?
err = txn.LockKeys(context.Background(), new(kv.LockCtx), lockKey)
if err != nil {
return errors.Trace(err)
}

// w.oldKeys is only set for non-clustered tables, in w.fetchRowColVals().
if len(w.oldKeys) > 0 {
if _, ok := found[string(w.oldKeys[i])]; ok {
// Already filled, i.e. double written earlier by concurrent DML
if vals, ok := found[string(key)]; ok {
if len(vals) == len(prr.vals) && bytes.Equal(vals, prr.vals) {
// Already backfilled or double written earlier by concurrent DML
continue
}

// Check if we can lock the old key, since there can still be concurrent update
// happening on the rows from fetchRowColVals(), if we cannot lock the keys in this
// transaction and succeed when committing, then another transaction did update
// the same key, and we will fail and retry. When retrying, this key would be found
// through BatchGet and skipped.
err = txn.LockKeys(context.Background(), new(kv.LockCtx), w.oldKeys[i])
// Not same row, due to earlier EXCHANGE PARTITION.
// Update the currently read row by removing it and adding it back (which will give it a new _tidb_rowid),
// which will then also be used as the unique id in the new partition.
var h kv.Handle
var currPartID int64
currPartID, h, err = tablecodec.DecodeRecordKey(lockKey)
if err != nil {
return errors.Trace(err)
}

// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
// Generate new _tidb_rowid.
recordID, err := tables.AllocHandle(w.ctx, w.tblCtx, w.reorgedTbl)
_, err = w.rowDecoder.DecodeTheExistedColumnMap(w.exprCtx, h, prr.vals, w.loc, w.rowMap)
if err != nil {
return errors.Trace(err)
}

// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID)
for _, col := range w.table.WritableCols() {
d, ok := w.rowMap[col.ID]
if !ok {
return dbterror.ErrUnsupportedReorganizePartition.GenWithStackByArgs()
}
tmpRow[col.Offset] = d
}
// Use RemoveRecord/AddRecord to keep the indexes in-sync!
pt := w.table.GetPartitionedTable().GetPartition(currPartID)
err = pt.RemoveRecord(w.tblCtx, txn, h, tmpRow)
if err != nil {
return errors.Trace(err)
}
h, err = pt.AddRecord(w.tblCtx, txn, tmpRow)
if err != nil {
return errors.Trace(err)
}
w.cleanRowMap()
// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2 ("_r")
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], h)
// OK to only do txn.Set() for the new partition, and defer creating the indexes,
// since any DML that changes the record will also update or create the indexes,
// by doing RemoveRecord+UpdateRecord
}
err = txn.Set(key, prr.vals)
if err != nil {
Expand All @@ -3882,8 +3919,6 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task

func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) {
w.rowRecords = w.rowRecords[:0]
isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle
w.oldKeys = w.oldKeys[:0]
startTime := time.Now()

// taskDone means that the added handle is out of taskRange.endHandle.
Expand Down Expand Up @@ -3926,12 +3961,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow})

if !isClustered {
oldKey := newKey[:tablecodec.TableSplitKeyLen]
oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...)
w.oldKeys = append(w.oldKeys, oldKey)
}

w.cleanRowMap()
lastAccessedHandle = recordKey
if recordKey.Cmp(taskRange.endKey) == 0 {
Expand Down
54 changes: 27 additions & 27 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3218,31 +3218,31 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk2.MustExec(`insert into t values (null, 26)`)
tk3.MustExec(`COMMIT`)
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11",
"14 2 2",
"15 12 12",
"17 16 18",
"19 18 4",
"21 20 5",
"23 22 6",
"25 24 7",
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"29 28 9",
"31 30 10",
"35 34 22",
"39 38 24",
"43 42 26"))
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11",
"14 2 2",
"15 12 12",
"17 16 18",
"19 18 4",
"21 20 5",
"23 22 6",
"25 24 7",
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"29 28 9",
"31 30 10",
"33 32 21",
"35 34 22",
Expand All @@ -3252,16 +3252,16 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
waitFor(4, "t", "public")
tk2.MustExec(`commit`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11",
"14 2 2",
"15 12 12",
"17 16 18",
"19 18 4",
"21 20 5",
"23 22 6",
"25 24 7",
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"29 28 9",
"31 30 10",
"33 32 21",
"35 34 22",
Expand Down
Loading