Skip to content

Commit ef73392

Browse files
authored
*: let TempIndex support encode/decode partitionID flag (#57017) (#57075)
close #56535
1 parent d6d9b68 commit ef73392

File tree

6 files changed

+78
-24
lines changed

6 files changed

+78
-24
lines changed

pkg/ddl/index_merge_tmp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
350350

351351
// Extract the operations on the original index and replay them later.
352352
for _, elem := range tempIdxVal {
353-
if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete {
353+
if elem.KeyVer == tablecodec.TempIndexKeyTypeMerge || elem.KeyVer == tablecodec.TempIndexKeyTypeDelete {
354354
// For 'm' version kvs, they are double-written.
355355
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
356356
continue

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ go_test(
7272
embed = [":ingest"],
7373
flaky = True,
7474
race = "on",
75-
shard_count = 21,
75+
shard_count = 22,
7676
deps = [
7777
"//pkg/config",
7878
"//pkg/ddl/ingest/testutil",

pkg/ddl/ingest/integration_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,3 +470,30 @@ func TestAddGlobalIndexInIngest(t *testing.T) {
470470
require.Equal(t, rsGlobalIndex1.String(), rsTable.String())
471471
require.Equal(t, rsGlobalIndex1.String(), rsGlobalIndex2.String())
472472
}
473+
474+
func TestAddGlobalIndexInIngestWithUpdate(t *testing.T) {
475+
store := testkit.CreateMockStore(t)
476+
defer ingesttestutil.InjectMockBackendMgr(t, store)()
477+
478+
tk := testkit.NewTestKit(t, store)
479+
tk.MustExec("use test")
480+
481+
tk.MustExec("drop table if exists t")
482+
tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5")
483+
tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3)")
484+
var i atomic.Int32
485+
i.Store(3)
486+
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
487+
tk2 := testkit.NewTestKit(t, store)
488+
tmp := i.Add(1)
489+
_, err := tk2.Exec(fmt.Sprintf("insert into test.t values (%d, %d)", tmp, tmp))
490+
assert.Nil(t, err)
491+
492+
_, err = tk2.Exec(fmt.Sprintf("update test.t set b = b + 11, a = b where b = %d", tmp-1))
493+
assert.Nil(t, err)
494+
})
495+
tk.MustExec("alter table t add unique index idx(b) global")
496+
rsGlobalIndex := tk.MustQuery("select *,_tidb_rowid from t use index(idx)").Sort()
497+
rsTable := tk.MustQuery("select *,_tidb_rowid from t use index()").Sort()
498+
require.Equal(t, rsGlobalIndex.String(), rsTable.String())
499+
}

pkg/kv/key.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,9 +683,9 @@ func (m *MemAwareHandleMap[V]) Range(fn func(h Handle, val V) bool) {
683683
return
684684
}
685685
}
686-
for _, v := range m.partitionInts {
686+
for pid, v := range m.partitionInts {
687687
for h, val := range v.M {
688-
if !fn(IntHandle(h), val) {
688+
if !fn(NewPartitionHandle(pid, IntHandle(h)), val) {
689689
return
690690
}
691691
}

pkg/table/tables/index.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
194194
)
195195
if !opt.FromBackFill() {
196196
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
197-
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
197+
if keyVer == tablecodec.TempIndexKeyTypeBackfill || keyVer == tablecodec.TempIndexKeyTypeDelete {
198198
key, tempKey = tempKey, nil
199199
keyIsTempIdxKey = true
200200
}
@@ -409,6 +409,10 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
409409
}
410410

411411
tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct}
412+
if c.idxInfo.Global {
413+
tempValElem.Global = true
414+
tempValElem.Handle = kv.NewPartitionHandle(c.phyTblID, h)
415+
}
412416
if distinct {
413417
if len(key) > 0 {
414418
okToDelete := true
@@ -479,40 +483,29 @@ func (c *index) GenIndexKVIter(ec errctx.Context, loc *time.Location, indexedVal
479483
return table.NewPlainIndexKVGenerator(c, ec, loc, h, handleRestoreData, indexedValue)
480484
}
481485

482-
const (
483-
// TempIndexKeyTypeNone means the key is not a temporary index key.
484-
TempIndexKeyTypeNone byte = 0
485-
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
486-
TempIndexKeyTypeDelete byte = 'd'
487-
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
488-
TempIndexKeyTypeBackfill byte = 'b'
489-
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
490-
TempIndexKeyTypeMerge byte = 'm'
491-
)
492-
493486
// GenTempIdxKeyByState is used to get the key version and the temporary key.
494487
// The tempKeyVer means the temp index key/value version.
495488
func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
496489
if indexInfo.State != model.StatePublic {
497490
switch indexInfo.BackfillState {
498491
case model.BackfillStateInapplicable:
499-
return indexKey, nil, TempIndexKeyTypeNone
492+
return indexKey, nil, tablecodec.TempIndexKeyTypeNone
500493
case model.BackfillStateRunning:
501494
// Write to the temporary index.
502495
tablecodec.IndexKey2TempIndexKey(indexKey)
503496
if indexInfo.State == model.StateDeleteOnly {
504-
return nil, indexKey, TempIndexKeyTypeDelete
497+
return nil, indexKey, tablecodec.TempIndexKeyTypeDelete
505498
}
506-
return nil, indexKey, TempIndexKeyTypeBackfill
499+
return nil, indexKey, tablecodec.TempIndexKeyTypeBackfill
507500
case model.BackfillStateReadyToMerge, model.BackfillStateMerging:
508501
// Double write
509502
tmp := make([]byte, len(indexKey))
510503
copy(tmp, indexKey)
511504
tablecodec.IndexKey2TempIndexKey(tmp)
512-
return indexKey, tmp, TempIndexKeyTypeMerge
505+
return indexKey, tmp, tablecodec.TempIndexKeyTypeMerge
513506
}
514507
}
515-
return indexKey, nil, TempIndexKeyTypeNone
508+
return indexKey, nil, tablecodec.TempIndexKeyTypeNone
516509
}
517510

518511
func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) {

pkg/tablecodec/tablecodec.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,16 +1307,32 @@ func (v TempIndexValue) FilterOverwritten() TempIndexValue {
13071307
// A temp index value element is encoded as one of:
13081308
// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal}
13091309
// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal}
1310-
// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted}
1310+
// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [partitionIdFlag 1 byte] [partitionID 8 bytes] [key_version 1 byte] {distinct deleted}
13111311
// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted}
13121312
type TempIndexValueElem struct {
13131313
Value []byte
13141314
Handle kv.Handle
13151315
KeyVer byte
13161316
Delete bool
13171317
Distinct bool
1318+
1319+
// Global means it's a global Index, for partitioned tables. Currently only used in `distinct` + `deleted` scenarios.
1320+
Global bool
13181321
}
13191322

1323+
const (
1324+
// TempIndexKeyTypeNone means the key is not a temporary index key.
1325+
TempIndexKeyTypeNone byte = 0
1326+
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
1327+
TempIndexKeyTypeDelete byte = 'd'
1328+
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
1329+
TempIndexKeyTypeBackfill byte = 'b'
1330+
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
1331+
TempIndexKeyTypeMerge byte = 'm'
1332+
// TempIndexKeyTypePartitionIDFlag indicates the following value is partition id.
1333+
TempIndexKeyTypePartitionIDFlag byte = 'p'
1334+
)
1335+
13201336
// Encode encodes the temp index value.
13211337
func (v *TempIndexValueElem) Encode(buf []byte) []byte {
13221338
if v.Delete {
@@ -1331,13 +1347,21 @@ func (v *TempIndexValueElem) Encode(buf []byte) []byte {
13311347
hEncoded = handle.Encoded()
13321348
hLen = uint16(len(hEncoded))
13331349
}
1334-
// flag + handle length + handle + temp key version
1350+
// flag + handle length + handle + [partition id] + temp key version
13351351
if buf == nil {
1336-
buf = make([]byte, 0, hLen+4)
1352+
l := hLen + 4
1353+
if v.Global {
1354+
l += 9
1355+
}
1356+
buf = make([]byte, 0, l)
13371357
}
13381358
buf = append(buf, byte(TempIndexValueFlagDeleted))
13391359
buf = append(buf, byte(hLen>>8), byte(hLen))
13401360
buf = append(buf, hEncoded...)
1361+
if v.Global {
1362+
buf = append(buf, TempIndexKeyTypePartitionIDFlag)
1363+
buf = append(buf, codec.EncodeInt(nil, v.Handle.(kv.PartitionHandle).PartitionID)...)
1364+
}
13411365
buf = append(buf, v.KeyVer)
13421366
return buf
13431367
}
@@ -1415,6 +1439,16 @@ func (v *TempIndexValueElem) DecodeOne(b []byte) (remain []byte, err error) {
14151439
v.Handle, _ = kv.NewCommonHandle(b[:hLen])
14161440
}
14171441
b = b[hLen:]
1442+
if b[0] == TempIndexKeyTypePartitionIDFlag {
1443+
v.Global = true
1444+
var pid int64
1445+
_, pid, err = codec.DecodeInt(b[1:9])
1446+
if err != nil {
1447+
return nil, err
1448+
}
1449+
v.Handle = kv.NewPartitionHandle(pid, v.Handle)
1450+
b = b[9:]
1451+
}
14181452
v.KeyVer = b[0]
14191453
b = b[1:]
14201454
v.Distinct = true

0 commit comments

Comments
 (0)