Skip to content
18 changes: 14 additions & 4 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2220,9 +2220,15 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
if tbl, ok := t.(table.PartitionedTable); ok {
var finish bool
for !finish {
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
var p table.PhysicalTable
if tbl.Meta().ID == reorgInfo.PhysicalTableID {
//nolint:forcetypeassert
p = t.(table.PhysicalTable)
} else {
p = tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
}
err = w.addPhysicalTableIndex(p, reorgInfo)
if err != nil {
Expand Down Expand Up @@ -2549,7 +2555,11 @@ func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysic
if len(pi.AddingDefinitions) == 0 {
// case 1
// Simply AddIndex, without any partitions added or dropped!
pid, err = findNextPartitionID(currPhysicalTableID, pi.Definitions)
if reorg.mergingTmpIdx && currPhysicalTableID == t.Meta().ID {
pid = pi.Definitions[0].ID
} else {
pid, err = findNextPartitionID(currPhysicalTableID, pi.Definitions)
}
} else {
// case 3 (or if not found AddingDefinitions; 4)
// check if recreating Global Index (during Reorg Partition)
Expand Down
12 changes: 12 additions & 0 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ddl

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -882,6 +883,17 @@ func getReorgInfo(ctx *ReorgContext, jobCtx *jobContext, rh *reorgHandler, job *
tb = tbl.(table.PhysicalTable)
}
if mergingTmpIdx {
for _, element := range elements {
if !bytes.Equal(element.TypeKey, meta.IndexElementKey) {
continue
}
// If has a global index in elements, need start process at `tblInfo.ID`
// because there are some temporary global indexes prefixed with table ID.
idxInfo := model.FindIndexInfoByID(tblInfo.Indices, element.ID)
if idxInfo.Global {
pid = tblInfo.ID
}
}
firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
Expand Down
26 changes: 26 additions & 0 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,29 @@ func TestIssue55808(t *testing.T) {
err := tk.ExecToErr("alter table t add index idx(a);")
require.ErrorContains(t, err, "injected error")
}

func TestAddGlobalIndexInIngest(t *testing.T) {
store, _ := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5;")
tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);")
var i atomic.Int32
i.Store(3)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/writeLocalExec", func(bool) {
tk2 := testkit.NewTestKit(t, store)
tmp := i.Add(1)
_, err := tk2.Exec(fmt.Sprintf("insert into test.t values (%d, %d)", tmp, tmp))
assert.Nil(t, err)
})
tk.MustExec("alter table t add index idx_1(b), add unique index idx_2(b) global;")
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/writeLocalExec")
rs1 := tk.MustQuery("select * from t use index(idx_2)").Sort()
rs2 := tk.MustQuery("select * from t use index()").Sort()
rs3 := tk.MustQuery("select * from t use index(idx_1)").Sort()
require.Greater(t, len(rs1.Rows()), 3)
require.Equal(t, rs1.String(), rs2.String())
require.Equal(t, rs1.String(), rs3.String())
}