Skip to content

Commit cb04077

Browse files
authored
global-sort: unify error message for adding unique index using global sort (#61094)
ref #60621, close #61084
1 parent 1ee3c45 commit cb04077

File tree

15 files changed

+122
-556
lines changed

15 files changed

+122
-556
lines changed

pkg/ddl/backfilling_dist_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func (s *backfillDistExecutor) newBackfillStepExecutor(
166166
ddlObj.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
167167
return newReadIndexExecutor(ddlObj, jobMeta, indexInfos, tbl, jc, cloudStorageURI, estRowSize)
168168
case proto.BackfillStepMergeSort:
169-
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl, cloudStorageURI)
169+
return newMergeSortExecutor(jobMeta.ID, indexInfos, tbl, cloudStorageURI)
170170
case proto.BackfillStepWriteAndIngest:
171171
if len(cloudStorageURI) == 0 {
172172
return nil, errors.Errorf("local import does not have write & ingest step")

pkg/ddl/backfilling_import_cloud.go

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,9 @@ func (e *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
9898
return errors.Errorf("local backend not found")
9999
}
100100

101-
var (
102-
currentIdx *model.IndexInfo
103-
idxID int64
104-
)
105-
switch len(sm.EleIDs) {
106-
case 1:
107-
for _, idx := range e.indexes {
108-
if idx.ID == sm.EleIDs[0] {
109-
currentIdx = idx
110-
idxID = idx.ID
111-
break
112-
}
113-
}
114-
case 0:
115-
// maybe this subtask is generated from an old version TiDB
116-
if len(e.indexes) == 1 {
117-
currentIdx = e.indexes[0]
118-
}
119-
idxID = e.indexes[0].ID
120-
default:
121-
return errors.Errorf("unexpected EleIDs count %v", sm.EleIDs)
101+
currentIdx, idxID, err := getIndexInfoAndID(sm.EleIDs, e.indexes)
102+
if err != nil {
103+
return err
122104
}
123105

124106
_, engineUUID := backend.MakeUUID(e.ptbl.Meta().Name.L, idxID)
@@ -162,10 +144,7 @@ func (e *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
162144
}
163145

164146
if currentIdx != nil {
165-
if common.ErrFoundDuplicateKeys.Equal(err) {
166-
return local.ConvertToErrFoundConflictRecords(err, e.ptbl)
167-
}
168-
return err
147+
return ingest.TryConvertToKeyExistsErr(err, currentIdx, e.ptbl.Meta())
169148
}
170149

171150
// cannot fill the index name for subtask generated from an old version TiDB
@@ -200,3 +179,25 @@ func (*cloudImportExecutor) ResourceModified(_ context.Context, _ *proto.StepRes
200179
// Will be added in the future PR
201180
return nil
202181
}
182+
183+
func getIndexInfoAndID(eleIDs []int64, indexes []*model.IndexInfo) (currentIdx *model.IndexInfo, idxID int64, err error) {
184+
switch len(eleIDs) {
185+
case 1:
186+
for _, idx := range indexes {
187+
if idx.ID == eleIDs[0] {
188+
currentIdx = idx
189+
idxID = idx.ID
190+
break
191+
}
192+
}
193+
case 0:
194+
// maybe this subtask is generated from an old version TiDB
195+
if len(indexes) == 1 {
196+
currentIdx = indexes[0]
197+
}
198+
idxID = indexes[0].ID
199+
default:
200+
return nil, 0, errors.Errorf("unexpected EleIDs count %v", eleIDs)
201+
}
202+
return
203+
}

pkg/ddl/backfilling_merge_sort.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ import (
2323
"github.com/pingcap/errors"
2424
"github.com/pingcap/failpoint"
2525
"github.com/pingcap/tidb/br/pkg/storage"
26+
"github.com/pingcap/tidb/pkg/ddl/ingest"
2627
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
2728
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
2829
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
2930
"github.com/pingcap/tidb/pkg/lightning/backend/external"
30-
"github.com/pingcap/tidb/pkg/lightning/backend/local"
31-
"github.com/pingcap/tidb/pkg/lightning/common"
31+
"github.com/pingcap/tidb/pkg/meta/model"
3232
"github.com/pingcap/tidb/pkg/table"
3333
"github.com/pingcap/tidb/pkg/util/logutil"
3434
)
@@ -37,6 +37,7 @@ type mergeSortExecutor struct {
3737
taskexecutor.BaseStepExecutor
3838
jobID int64
3939
idxNum int
40+
indexes []*model.IndexInfo
4041
ptbl table.PhysicalTable
4142
cloudStoreURI string
4243

@@ -46,13 +47,13 @@ type mergeSortExecutor struct {
4647

4748
func newMergeSortExecutor(
4849
jobID int64,
49-
idxNum int,
50+
indexes []*model.IndexInfo,
5051
ptbl table.PhysicalTable,
5152
cloudStoreURI string,
5253
) (*mergeSortExecutor, error) {
5354
return &mergeSortExecutor{
5455
jobID: jobID,
55-
idxNum: idxNum,
56+
indexes: indexes,
5657
ptbl: ptbl,
5758
cloudStoreURI: cloudStoreURI,
5859
}, nil
@@ -108,8 +109,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
108109
err = context.DeadlineExceeded
109110
})
110111
if err != nil {
111-
if common.ErrFoundDuplicateKeys.Equal(err) {
112-
return local.ConvertToErrFoundConflictRecords(err, m.ptbl)
112+
currentIdx, _, err2 := getIndexInfoAndID(sm.EleIDs, m.indexes)
113+
if err2 == nil {
114+
return ingest.TryConvertToKeyExistsErr(err, currentIdx, m.ptbl.Meta())
113115
}
114116
return errors.Trace(err)
115117
}

pkg/ddl/backfilling_operators.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ import (
3737
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
3838
"github.com/pingcap/tidb/pkg/kv"
3939
"github.com/pingcap/tidb/pkg/lightning/backend/external"
40-
"github.com/pingcap/tidb/pkg/lightning/backend/local"
41-
"github.com/pingcap/tidb/pkg/lightning/common"
4240
"github.com/pingcap/tidb/pkg/meta/model"
4341
"github.com/pingcap/tidb/pkg/metrics"
4442
"github.com/pingcap/tidb/pkg/parser/terror"
@@ -833,16 +831,14 @@ func (w *indexIngestWorker) initSessCtx() {
833831
func (w *indexIngestWorker) Close() {
834832
// TODO(lance6716): unify the real write action for engineInfo and external
835833
// writer.
836-
for _, writer := range w.writers {
834+
for i, writer := range w.writers {
837835
ew, ok := writer.(*external.Writer)
838836
if !ok {
839837
break
840838
}
841839
err := ew.Close(w.ctx)
842840
if err != nil {
843-
if common.ErrFoundDuplicateKeys.Equal(err) {
844-
err = local.ConvertToErrFoundConflictRecords(err, w.tbl)
845-
}
841+
err = ingest.TryConvertToKeyExistsErr(err, w.indexes[i].Meta(), w.tbl.Meta())
846842
w.ctx.onError(err)
847843
}
848844
}
@@ -862,11 +858,8 @@ func (w *indexIngestWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey
862858
oprStartTime := time.Now()
863859
vars := w.se.GetSessionVars()
864860
sc := vars.StmtCtx
865-
cnt, lastHandle, err := writeChunk(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk)
861+
cnt, lastHandle, err := writeChunk(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk, w.tbl.Meta())
866862
if err != nil || cnt == 0 {
867-
if common.ErrFoundDuplicateKeys.Equal(err) {
868-
err = local.ConvertToErrFoundConflictRecords(err, w.tbl)
869-
}
870863
return 0, nil, err
871864
}
872865
logSlowOperations(time.Since(oprStartTime), "writeChunk", 3000)

pkg/ddl/index.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,7 @@ func runReorgJobAndHandleErr(
16371637
return false, ver, nil
16381638
}
16391639
// TODO(tangenta): get duplicate column and match index.
1640-
err = local.ConvertToErrFoundConflictRecords(err, tbl)
1640+
err = ingest.TryConvertToKeyExistsErr(err, allIndexInfos[0], tbl.Meta())
16411641
if !isRetryableJobError(err, job.ErrorCount) {
16421642
logutil.DDLLogger().Warn("run add index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
16431643
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
@@ -2277,6 +2277,7 @@ func writeChunk(
22772277
errCtx errctx.Context,
22782278
writeStmtBufs *variable.WriteStmtBufs,
22792279
copChunk *chunk.Chunk,
2280+
tblInfo *model.TableInfo,
22802281
) (int, kv.Handle, error) {
22812282
iter := chunk.NewIterator4Chunk(copChunk)
22822283
c := copCtx.GetBase()
@@ -2335,6 +2336,7 @@ func writeChunk(
23352336
}
23362337
err = writeOneKV(ctx, writers[i], index, loc, errCtx, writeStmtBufs, idxData, rsData, h)
23372338
if err != nil {
2339+
err = ingest.TryConvertToKeyExistsErr(err, index.Meta(), tblInfo)
23382340
return 0, nil, errors.Trace(err)
23392341
}
23402342
}

pkg/lightning/backend/external/BUILD.bazel

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ go_library(
3939
"//pkg/util/logutil",
4040
"//pkg/util/size",
4141
"@com_github_aws_aws_sdk_go//aws",
42-
"@com_github_cockroachdb_pebble//:pebble",
4342
"@com_github_docker_go_units//:go-units",
4443
"@com_github_google_uuid//:uuid",
4544
"@com_github_jfcg_sorty_v2//:sorty",
@@ -87,15 +86,13 @@ go_test(
8786
"//pkg/lightning/config",
8887
"//pkg/lightning/log",
8988
"//pkg/lightning/membuf",
90-
"//pkg/util/codec",
9189
"//pkg/util/intest",
9290
"//pkg/util/logutil",
9391
"//pkg/util/size",
9492
"@com_github_aws_aws_sdk_go//aws",
9593
"@com_github_aws_aws_sdk_go//aws/credentials",
9694
"@com_github_aws_aws_sdk_go//aws/session",
9795
"@com_github_aws_aws_sdk_go//service/s3",
98-
"@com_github_cockroachdb_pebble//:pebble",
9996
"@com_github_docker_go_units//:go-units",
10097
"@com_github_felixge_fgprof//:fgprof",
10198
"@com_github_google_uuid//:uuid",

0 commit comments

Comments
 (0)