Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(

// Extract the operations on the original index and replay them later.
for _, elem := range tempIdxVal {
if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete {
if elem.KeyVer == tablecodec.TempIndexKeyTypeMerge || elem.KeyVer == tablecodec.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 21,
shard_count = 22,
deps = [
"//pkg/config",
"//pkg/ddl/ingest/testutil",
Expand Down
27 changes: 27 additions & 0 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,3 +470,30 @@ func TestAddGlobalIndexInIngest(t *testing.T) {
require.Equal(t, rsGlobalIndex1.String(), rsTable.String())
require.Equal(t, rsGlobalIndex1.String(), rsGlobalIndex2.String())
}

func TestAddGlobalIndexInIngestWithUpdate(t *testing.T) {
store := testkit.CreateMockStore(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5")
tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3)")
var i atomic.Int32
i.Store(3)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobUpdated", func(job *model.Job) {
tk2 := testkit.NewTestKit(t, store)
tmp := i.Add(1)
_, err := tk2.Exec(fmt.Sprintf("insert into test.t values (%d, %d)", tmp, tmp))
assert.Nil(t, err)

_, err = tk2.Exec(fmt.Sprintf("update test.t set b = b + 11, a = b where b = %d", tmp-1))
assert.Nil(t, err)
})
tk.MustExec("alter table t add unique index idx(b) global")
rsGlobalIndex := tk.MustQuery("select *,_tidb_rowid from t use index(idx)").Sort()
rsTable := tk.MustQuery("select *,_tidb_rowid from t use index()").Sort()
require.Equal(t, rsGlobalIndex.String(), rsTable.String())
}
4 changes: 2 additions & 2 deletions pkg/kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,9 @@ func (m *MemAwareHandleMap[V]) Range(fn func(h Handle, val V) bool) {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious question: In this function is m.ints and m.strs empty if m.partitionInts and m.partitionStrs is used and vice versa? Or can both pair of maps be used at the same time?

Copy link
Contributor Author

@Defined2014 Defined2014 Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could both used.

}
}
for _, v := range m.partitionInts {
for pid, v := range m.partitionInts {
for h, val := range v.M {
if !fn(IntHandle(h), val) {
if !fn(NewPartitionHandle(pid, IntHandle(h)), val) {
return
}
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (c *index) create(sctx table.MutateContext, txn kv.Transaction, indexedValu
)
if !opt.FromBackFill() {
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
if keyVer == tablecodec.TempIndexKeyTypeBackfill || keyVer == tablecodec.TempIndexKeyTypeDelete {
key, tempKey = tempKey, nil
keyIsTempIdxKey = true
}
Expand Down Expand Up @@ -409,6 +409,10 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
}

tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct}
if c.idxInfo.Global {
tempValElem.Global = true
tempValElem.Handle = kv.NewPartitionHandle(c.phyTblID, h)
}
if distinct {
if len(key) > 0 {
okToDelete := true
Expand Down Expand Up @@ -479,40 +483,29 @@ func (c *index) GenIndexKVIter(ec errctx.Context, loc *time.Location, indexedVal
return table.NewPlainIndexKVGenerator(c, ec, loc, h, handleRestoreData, indexedValue)
}

const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
TempIndexKeyTypeDelete byte = 'd'
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
TempIndexKeyTypeBackfill byte = 'b'
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
TempIndexKeyTypeMerge byte = 'm'
)

// GenTempIdxKeyByState is used to get the key version and the temporary key.
// The tempKeyVer means the temp index key/value version.
func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
if indexInfo.State != model.StatePublic {
switch indexInfo.BackfillState {
case model.BackfillStateInapplicable:
return indexKey, nil, TempIndexKeyTypeNone
return indexKey, nil, tablecodec.TempIndexKeyTypeNone
case model.BackfillStateRunning:
// Write to the temporary index.
tablecodec.IndexKey2TempIndexKey(indexKey)
if indexInfo.State == model.StateDeleteOnly {
return nil, indexKey, TempIndexKeyTypeDelete
return nil, indexKey, tablecodec.TempIndexKeyTypeDelete
}
return nil, indexKey, TempIndexKeyTypeBackfill
return nil, indexKey, tablecodec.TempIndexKeyTypeBackfill
case model.BackfillStateReadyToMerge, model.BackfillStateMerging:
// Double write
tmp := make([]byte, len(indexKey))
copy(tmp, indexKey)
tablecodec.IndexKey2TempIndexKey(tmp)
return indexKey, tmp, TempIndexKeyTypeMerge
return indexKey, tmp, tablecodec.TempIndexKeyTypeMerge
}
}
return indexKey, nil, TempIndexKeyTypeNone
return indexKey, nil, tablecodec.TempIndexKeyTypeNone
}

func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
Expand Down
39 changes: 36 additions & 3 deletions pkg/tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,16 +1307,31 @@ func (v TempIndexValue) FilterOverwritten() TempIndexValue {
// A temp index value element is encoded as one of:
// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal}
// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal}
// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted}
// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [partitionIdFlag 1 byte] [partitionID 8 bytes] [key_version 1 byte] {distinct deleted}
// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted}
type TempIndexValueElem struct {
Value []byte
Handle kv.Handle
KeyVer byte
Delete bool
Distinct bool

Global bool
}

const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
TempIndexKeyTypeDelete byte = 'd'
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
TempIndexKeyTypeBackfill byte = 'b'
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
TempIndexKeyTypeMerge byte = 'm'
// TempIndexKeyTypePartitionIDFlag indicates the following value is partition id.
TempIndexKeyTypePartitionIDFlag byte = 'p'
)

// Encode encodes the temp index value.
func (v *TempIndexValueElem) Encode(buf []byte) []byte {
if v.Delete {
Expand All @@ -1331,13 +1346,21 @@ func (v *TempIndexValueElem) Encode(buf []byte) []byte {
hEncoded = handle.Encoded()
hLen = uint16(len(hEncoded))
}
// flag + handle length + handle + temp key version
// flag + handle length + handle + [partition id] + temp key version
if buf == nil {
buf = make([]byte, 0, hLen+4)
l := hLen + 4
if v.Global {
l += 9
}
buf = make([]byte, 0, l)
}
buf = append(buf, byte(TempIndexValueFlagDeleted))
buf = append(buf, byte(hLen>>8), byte(hLen))
buf = append(buf, hEncoded...)
if v.Global {
buf = append(buf, TempIndexKeyTypePartitionIDFlag)
buf = append(buf, codec.EncodeInt(nil, v.Handle.(kv.PartitionHandle).PartitionID)...)
}
buf = append(buf, v.KeyVer)
return buf
}
Expand Down Expand Up @@ -1415,6 +1438,16 @@ func (v *TempIndexValueElem) DecodeOne(b []byte) (remain []byte, err error) {
v.Handle, _ = kv.NewCommonHandle(b[:hLen])
}
b = b[hLen:]
if b[0] == TempIndexKeyTypePartitionIDFlag {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only needed here, and not also in case TempIndexValueFlagNormal:?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v.Global = true
var pid int64
_, pid, err = codec.DecodeInt(b[1:9])
if err != nil {
return nil, err
}
v.Handle = kv.NewPartitionHandle(pid, v.Handle)
b = b[9:]
}
v.KeyVer = b[0]
b = b[1:]
v.Distinct = true
Expand Down