
Commit c1e3dae

ddl: fix add index's merge with multi-schema optimization (#51747)
close #51746
1 parent 6a76187 commit c1e3dae

File tree: 4 files changed (+158 additions, -55 deletions)

pkg/ddl/index_merge_tmp.go

Lines changed: 126 additions & 53 deletions
@@ -35,10 +35,9 @@ import (
 
 func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
 	txn kv.Transaction,
-	idxInfo *model.IndexInfo,
 	idxRecords []*temporaryIndexRecord,
 ) error {
-	if !idxInfo.Unique {
+	if !w.currentIndex.Unique {
 		// non-unique key need no check, just overwrite it,
 		// because in most case, backfilling indices is not exists.
 		return nil
@@ -55,7 +54,7 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
 		err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
 		if err != nil {
 			if kv.ErrKeyExists.Equal(err) {
-				return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), idxInfo.ID)
+				return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndex.ID)
 			}
 			return errors.Trace(err)
 		}
@@ -128,6 +127,10 @@ type mergeIndexWorker struct {
 	tmpIdxRecords []*temporaryIndexRecord
 	originIdxKeys []kv.Key
 	tmpIdxKeys    []kv.Key
+
+	needValidateKey        bool
+	currentTempIndexPrefix []byte
+	currentIndex           *model.IndexInfo
 }
 
 func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
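The three new fields let the worker remember which index the keys it is scanning belong to, now that a single backfill range can cover several temporary indexes. A minimal standalone sketch of the ID relationship the worker relies on, assuming the tablecodec constants TempIndexPrefix and IndexIDMask keep their documented meaning (the literal values below are illustrative stand-ins, not quoted from the patch):

package main

import "fmt"

// Illustrative stand-ins for tablecodec.TempIndexPrefix and tablecodec.IndexIDMask;
// the real constants live in pkg/tablecodec.
const (
	tempIndexPrefix = 0x7fff000000000000
	indexIDMask     = 0xffffffffffff
)

func main() {
	// A multi-schema-change statement creates one element (and one temporary
	// index) per new index, e.g. index IDs 1 and 2.
	for _, indexID := range []int64{1, 2} {
		tempID := tempIndexPrefix | indexID // ID used for the temporary index keys
		recovered := tempID & indexIDMask   // back to the real index ID
		fmt.Printf("index %d -> temp %#x -> %d\n", indexID, tempID, recovered)
	}
}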
@@ -144,68 +147,99 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
 	}
 }
 
+func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
+	tmpID, err := tablecodec.DecodeIndexID(taskRange.startKey)
+	if err != nil {
+		return false, err
+	}
+	startIndexID := tmpID & tablecodec.IndexIDMask
+	tmpID, err = tablecodec.DecodeIndexID(taskRange.endKey)
+	if err != nil {
+		return false, err
+	}
+	endIndexID := tmpID & tablecodec.IndexIDMask
+
+	w.needValidateKey = startIndexID != endIndexID
+	containsTargetID := false
+	for _, idx := range w.indexes {
+		idxInfo := idx.Meta()
+		if idxInfo.ID == startIndexID {
+			containsTargetID = true
+			w.currentIndex = idxInfo
+			break
+		}
+		if idxInfo.ID == endIndexID {
+			containsTargetID = true
+		}
+	}
+	return !containsTargetID, nil
+}
+
 // BackfillData merge temp index data in txn.
 func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
+	skip, err := w.validateTaskRange(&taskRange)
+	if skip || err != nil {
+		return taskCtx, err
+	}
+
 	oprStartTime := time.Now()
 	ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
-	for _, idx := range w.indexes {
-		idx := idx // Make linter noloopclosure happy.
-		errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
-			taskCtx.addedCount = 0
-			taskCtx.scanCount = 0
-			updateTxnEntrySizeLimitIfNeeded(txn)
-			txn.SetOption(kv.Priority, taskRange.priority)
-			if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
-				txn.SetOption(kv.ResourceGroupTagger, tagger)
-			}
-			txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
-
-			tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, idx.Meta(), taskRange)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			taskCtx.nextKey = nextKey
-			taskCtx.done = taskDone
-
-			err = w.batchCheckTemporaryUniqueKey(txn, idx.Meta(), tmpIdxRecords)
-			if err != nil {
-				return errors.Trace(err)
-			}
-
-			for i, idxRecord := range tmpIdxRecords {
-				taskCtx.scanCount++
-				// The index is already exists, we skip it, no needs to backfill it.
-				// The following update, delete, insert on these rows, TiDB can handle it correctly.
-				// If all batch are skipped, update first index key to make txn commit to release lock.
-				if idxRecord.skip {
-					continue
-				}
-
-				// Lock the corresponding row keys so that it doesn't modify the index KVs
-				// that are changing by a pessimistic transaction.
-				rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
-				err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
-				if err != nil {
-					return errors.Trace(err)
-				}
-
-				if idxRecord.delete {
-					if idxRecord.unique {
-						err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
-					} else {
-						err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
-					}
-				} else {
-					err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
-				}
-				if err != nil {
-					return err
-				}
-				taskCtx.addedCount++
-			}
-			return nil
-		})
-	}
+	errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
+		taskCtx.addedCount = 0
+		taskCtx.scanCount = 0
+		updateTxnEntrySizeLimitIfNeeded(txn)
+		txn.SetOption(kv.Priority, taskRange.priority)
+		if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
+			txn.SetOption(kv.ResourceGroupTagger, tagger)
+		}
+		txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
+
+		tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		taskCtx.nextKey = nextKey
+		taskCtx.done = taskDone
+
+		err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		for i, idxRecord := range tmpIdxRecords {
+			taskCtx.scanCount++
+			// The index is already exists, we skip it, no needs to backfill it.
+			// The following update, delete, insert on these rows, TiDB can handle it correctly.
+			// If all batch are skipped, update first index key to make txn commit to release lock.
+			if idxRecord.skip {
+				continue
+			}
+
+			// Lock the corresponding row keys so that it doesn't modify the index KVs
+			// that are changing by a pessimistic transaction.
+			rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
+			err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			if idxRecord.delete {
+				if idxRecord.unique {
+					err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
+				} else {
+					err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
+				}
+			} else {
+				err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
+			}
+			if err != nil {
+				return err
+			}
+			taskCtx.addedCount++
+		}
+		return nil
+	})
 
 	failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
 		//nolint:forcetypeassert
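Because the reorg range can now cover more than one temporary index (see the pkg/ddl/reorg.go change below), validateTaskRange decides up front whether per-key validation is needed and whether the range touches this job's indexes at all. A rough standalone sketch of that decision, using hypothetical index IDs in place of IDs decoded from real keys:

package main

import "fmt"

// decide mirrors the shape of validateTaskRange: it reports whether the range
// should be skipped entirely and whether every scanned key must be re-validated.
// startID/endID are the index IDs decoded from the task range boundaries and
// ownedIDs are the index IDs this merge job is responsible for.
func decide(startID, endID int64, ownedIDs []int64) (skip, needValidate bool) {
	needValidate = startID != endID // range spans more than one temp index
	for _, id := range ownedIDs {
		if id == startID || id == endID {
			return false, needValidate
		}
	}
	return true, needValidate // neither boundary belongs to this job
}

func main() {
	// Multi-schema add index owning IDs 1 and 2; the range spans both.
	fmt.Println(decide(1, 2, []int64{1, 2})) // false true
	// A range entirely inside an unrelated index (e.g. ID 3) is skipped.
	fmt.Println(decide(3, 3, []int64{1, 2})) // true false
}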
@@ -228,9 +262,41 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
 	return w.backfillCtx
 }
 
+func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
+	return len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
+}
+
+func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
+	tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
+	if err != nil {
+		return false, err
+	}
+	idxID := tablecodec.IndexIDMask & tempIdxID
+	var curIdx *model.IndexInfo
+	for _, idx := range w.indexes {
+		if idx.Meta().ID == idxID {
+			curIdx = idx.Meta()
+		}
+	}
+	if curIdx == nil {
+		// Index IDs are always increasing, but not always continuous:
+		// if DDL adds another index between these indexes, it is possible that:
+		//   multi-schema add index IDs = [1, 2, 4, 5]
+		//   another index ID = [3]
+		// If the new index get rollback, temp index 0xFFxxx03 may have dirty records.
+		// We should skip these dirty records.
+		return true, nil
+	}
+	pfx := tablecodec.CutIndexPrefix(newIndexKey)
+
+	w.currentTempIndexPrefix = kv.Key(pfx).Clone()
+	w.currentIndex = curIdx
+
+	return false, nil
+}
+
 func (w *mergeIndexWorker) fetchTempIndexVals(
 	txn kv.Transaction,
-	indexInfo *model.IndexInfo,
 	taskRange reorgBackfillTask,
 ) ([]*temporaryIndexRecord, kv.Key, bool, error) {
 	startTime := time.Now()
@@ -254,11 +320,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
 			return false, nil
 		}
 
+		if w.needValidateKey && w.prefixIsChanged(indexKey) {
+			skip, err := w.updateCurrentIndexInfo(indexKey)
+			if err != nil || skip {
+				return skip, err
+			}
+		}
+
 		tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
 		if err != nil {
 			return false, err
 		}
-		tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(indexInfo.Columns))
+		tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.currentIndex.Columns))
 		if err != nil {
 			return false, err
 		}
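During the scan, the worker only re-resolves the owning index when a key leaves the cached temp-index prefix, so the common case stays a single bytes.HasPrefix check per key; keys whose index ID resolves to nothing owned by the job (e.g. a rolled-back index created in between) are skipped as dirty records. A simplified sketch of that caching pattern, using plain byte slices instead of real index keys (prefixTracker and its names are hypothetical, not from the patch):

package main

import (
	"bytes"
	"fmt"
)

// prefixTracker mimics the currentTempIndexPrefix caching in mergeIndexWorker:
// resolution work only happens when the key no longer matches the cached prefix.
type prefixTracker struct {
	current  []byte
	resolves int
}

func (t *prefixTracker) observe(key, prefix []byte) {
	if len(t.current) == 0 || !bytes.HasPrefix(key, t.current) {
		t.resolves++ // the real worker would call updateCurrentIndexInfo here
		t.current = append([]byte(nil), prefix...)
	}
}

func main() {
	t := &prefixTracker{}
	keys := [][]byte{
		[]byte("idx1/a"), []byte("idx1/b"), // same prefix, resolved once
		[]byte("idx2/a"), []byte("idx2/b"), // prefix changed, resolved again
	}
	for _, k := range keys {
		t.observe(k, k[:5]) // first 5 bytes stand in for the temp index prefix
	}
	fmt.Println("resolves:", t.resolves) // 2
}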

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ go_test(
     embed = [":ingest"],
     flaky = True,
     race = "on",
-    shard_count = 16,
+    shard_count = 17,
     deps = [
         "//pkg/config",
         "//pkg/ddl",

pkg/ddl/ingest/integration_test.go

Lines changed: 27 additions & 0 deletions
@@ -340,3 +340,30 @@ func TestAddIndexDuplicateMessage(t *testing.T) {
 	tk.MustExec("admin check table t;")
 	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1", "2 1 2"))
 }
+
+func TestMultiSchemaAddIndexMerge(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	defer ingesttestutil.InjectMockBackendMgr(t, store)()
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use test")
+
+	tk.MustExec("create table t (a int, b int);")
+	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
+
+	first := true
+	var tk2Err error
+	ingest.MockExecAfterWriteRow = func() {
+		if !first {
+			return
+		}
+		_, tk2Err = tk2.Exec("insert into t values (4, 4);")
+		first = false
+	}
+
+	tk.MustExec("alter table t add index idx1(a), add index idx2(b);")
+	require.False(t, first)
+	require.NoError(t, tk2Err)
+	tk.MustExec("admin check table t;")
+}

pkg/ddl/reorg.go

Lines changed: 4 additions & 1 deletion
@@ -706,7 +706,10 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job,
 		tb = tbl.(table.PhysicalTable)
 	}
 	if mergingTmpIdx {
-		start, end = tablecodec.GetTableIndexKeyRange(pid, tablecodec.TempIndexPrefix|elements[0].ID)
+		firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
+		lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
+		start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
+		end = tablecodec.EncodeIndexSeekKey(pid, lastElemTempID, []byte{255})
 	} else {
 		start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority)
 		if err != nil {
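Before this change the merge range covered only elements[0], so the temporary data of the remaining indexes in a multi-schema ADD INDEX was never merged; the new range runs from the first element's temporary index to the end of the last element's, and the worker filters out anything in between that it does not own. A small sketch of how the boundaries are formed, with a hypothetical encodeSeekKey standing in for tablecodec.EncodeIndexSeekKey and an illustrative prefix constant:

package main

import "fmt"

const tempIndexPrefix = 0x7fff000000000000 // illustrative, see pkg/tablecodec

// encodeSeekKey is a hypothetical stand-in that only shows how the boundaries
// are formed; the real implementation produces binary-comparable KV keys.
func encodeSeekKey(tableID, indexID int64, suffix []byte) string {
	return fmt.Sprintf("t%d_i%#x_%x", tableID, indexID, suffix)
}

func main() {
	elements := []int64{101, 102, 103} // index IDs of one multi-schema ADD INDEX job
	pid := int64(55)

	start := encodeSeekKey(pid, tempIndexPrefix|elements[0], nil)
	end := encodeSeekKey(pid, tempIndexPrefix|elements[len(elements)-1], []byte{255})

	// The scan [start, end) now includes every element's temporary index,
	// instead of only elements[0] as before this fix.
	fmt.Println(start, "->", end)
}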
