14 changes: 1 addition & 13 deletions pkg/ddl/backfilling_dist_executor.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/tikv/client-go/v2/tikv"
@@ -205,19 +204,8 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool {
return true
}

func isRetryableError(err error) bool {
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// can't retry Unknown err.
return false
}

func (*backfillDistExecutor) IsRetryableError(err error) bool {
return common.IsRetryableError(err) || isRetryableError(err)
return common.IsRetryableError(err) || dbterror.IsReorgRetryableErr(err)
}

func (s *backfillDistExecutor) Close() {
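As an aside, the local isRetryableError helper removed above is superseded by the shared dbterror.IsReorgRetryableErr classifier introduced in pkg/util/dbterror/ddl_terror.go further down in this diff, so the executor's retry decision becomes the composition of two predicates. A minimal illustrative sketch (the standalone function name here is hypothetical, not part of the PR):

```go
package ddl

import (
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/util/dbterror"
)

// isSubtaskErrRetryable mirrors the one-line change above: common.IsRetryableError
// covers transient lightning/backend failures, while dbterror.IsReorgRetryableErr
// covers the reorg-specific error codes and the message list added below.
func isSubtaskErrRetryable(err error) bool {
	return common.IsRetryableError(err) || dbterror.IsReorgRetryableErr(err)
}
```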
11 changes: 2 additions & 9 deletions pkg/ddl/index.go
@@ -973,14 +973,7 @@ func errorIsRetryable(err error, job *model.Job) bool {
if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() {
return false
}
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// For the unknown errors, we should retry.
return true
return dbterror.IsReorgRetryableErr(err)
}

func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error {
@@ -1827,7 +1820,7 @@ func writeOneKVToLocal(
return errors.Trace(err)
}
failpoint.Inject("mockLocalWriterError", func() {
failpoint.Return(errors.New("mock engine error"))
failpoint.Return(errors.New("ErrMockRetryable"))
})
writeBufs.IndexKeyBuf = key
writeBufs.RowValBuf = idxVal
2 changes: 1 addition & 1 deletion pkg/ddl/index_cop.go
@@ -169,7 +169,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds()
metrics.AddIndexScanRate.WithLabelValues(metrics.LblAddIndex).Observe(rate)
failpoint.Inject("mockCopSenderError", func() {
idxRs.Err = errors.New("mock cop error")
idxRs.Err = errors.New("ErrMockRetryable")
})
p.chunkSender.AddTask(idxRs)
startTime = time.Now()
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/backend.go
@@ -146,7 +146,7 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
}

failpoint.Inject("mockFinishImportErr", func() {
failpoint.Return(fmt.Errorf("mock finish import error"))
failpoint.Return(fmt.Errorf("ErrMockRetryable"))
})

// Check remote duplicate value for the index.
9 changes: 5 additions & 4 deletions pkg/ddl/ingest/engine.go
@@ -67,16 +67,16 @@ type engineInfo struct {

// newEngineInfo create a new engineInfo struct.
func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig,
en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot) *engineInfo {
en *backend.OpenedEngine, uuid uuid.UUID, memRoot MemRoot) *engineInfo {
return &engineInfo{
ctx: ctx,
jobID: jobID,
indexID: indexID,
cfg: cfg,
openedEngine: en,
uuid: uuid,
writerCount: wCnt,
writerCache: generic.NewSyncMap[int, backend.EngineWriter](wCnt),
writerCount: 0,
writerCache: generic.NewSyncMap[int, backend.EngineWriter](4),
memRoot: memRoot,
flushLock: &sync.RWMutex{},
}
@@ -194,13 +194,14 @@ func (ei *engineInfo) CreateWriter(id int) (Writer, error) {
return nil, err
}

ei.writerCount++
ei.memRoot.Consume(StructSizeWriterCtx)
logutil.Logger(ei.ctx).Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID),
zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id),
zap.Int64("allocate memory", StructSizeWriterCtx),
zap.Int64("current memory usage", ei.memRoot.CurrentUsage()),
zap.Int64("max memory quota", ei.memRoot.MaxMemoryQuota()))
return wCtx, err
return wCtx, nil
}

// newWriterContext will get worker local writer from engine info writer cache first, if exists.
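The writer count is now tracked lazily: newEngineInfo no longer receives a writer count, and CreateWriter increments it only when a writer is actually allocated, which matches the removal of en.writerCount++ from Register in engine_mgr.go below. A simplified sketch of that bookkeeping, using a hypothetical type not found in the PR:

```go
package ingest

// engineInfoSketch is a simplified stand-in (real fields omitted) for the
// bookkeeping above: the engine starts with zero writers and counts one per
// successful CreateWriter call, instead of receiving a count at construction.
type engineInfoSketch struct {
	writerCount int
}

func (ei *engineInfoSketch) CreateWriter(id int) error {
	// ... allocate the underlying local writer here; return early on failure
	// so writerCount only reflects writers that were actually created.
	ei.writerCount++
	return nil
}
```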
3 changes: 1 addition & 2 deletions pkg/ddl/ingest/engine_mgr.go
@@ -62,7 +62,7 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st
return nil, errors.Trace(err)
}
id := openedEn.GetEngineUUID()
en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, bc.MemRoot)
en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, bc.MemRoot)
bc.Store(indexID, en)
bc.MemRoot.Consume(StructSizeEngineInfo)
bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize)
@@ -74,7 +74,6 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st
zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency))
return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded")
}
en.writerCount++
info = LitInfoAddWriter
}
bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize))
2 changes: 1 addition & 1 deletion pkg/lightning/backend/local/region_job.go
@@ -308,7 +308,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
}

failpoint.Inject("mockWritePeerErr", func() {
err = errors.Errorf("mock write peer error")
err = errors.Errorf("ErrMockRetryable")
failpoint.Return(annotateErr(err, peer, "when open write stream"))
})

1 change: 1 addition & 0 deletions pkg/util/dbterror/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
"//pkg/errno",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"@com_github_pingcap_errors//:errors",
],
)

29 changes: 27 additions & 2 deletions pkg/util/dbterror/ddl_terror.go
@@ -16,7 +16,9 @@ package dbterror

import (
"fmt"
"strings"

"github.com/pingcap/errors"
mysql "github.com/pingcap/tidb/pkg/errno"
parser_mysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
@@ -497,8 +499,7 @@
"tidb_enable_dist_task setting. To utilize distributed task execution, please enable tidb_ddl_enable_fast_reorg first."), nil))
)

// ReorgRetryableErrCodes is the error codes that are retryable for reorganization.
var ReorgRetryableErrCodes = map[uint16]struct{}{
var reorgRetryableErrCodes = map[uint16]struct{}{
mysql.ErrPDServerTimeout: {},
mysql.ErrTiKVServerTimeout: {},
mysql.ErrTiKVServerBusy: {},
@@ -520,3 +521,27 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{
// Temporary network partitioning may cause pk commit failure.
uint16(terror.CodeResultUndetermined): {},
}

// reorgRetryableMessages records retryable errors that do not have an error code.
var reorgRetryableMessages = []string{
"ErrPDBatchScanRegion", // TiDB restarts during import may encounter this error.
"ErrMockRetryable", // Only used for test.
}

// IsReorgRetryableErr checks whether the error is retryable during DDL reorganization.
func IsReorgRetryableErr(err error) bool {
msg := err.Error()
for _, retryMsg := range reorgRetryableMessages {
if strings.Contains(msg, retryMsg) {
return true
}
}
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := reorgRetryableErrCodes[sqlErr.Code]
return ok
}
// can't retry unknown err.
return false
}
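
A hypothetical test sketch, not included in this PR, showing how the new classifier behaves for the message-based cases (including the failpoint messages renamed to "ErrMockRetryable" above) and for unknown errors; the test name and file are assumptions:

```go
package dbterror_test

import (
	"testing"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/util/dbterror"
	"github.com/stretchr/testify/require"
)

func TestIsReorgRetryableErrSketch(t *testing.T) {
	// Message-based match: the mock failpoint errors renamed above contain
	// "ErrMockRetryable", so injected failures are now treated as retryable.
	require.True(t, dbterror.IsReorgRetryableErr(errors.New("ErrMockRetryable")))

	// Matching runs on the full error string, so wrapped errors still match.
	require.True(t, dbterror.IsReorgRetryableErr(
		errors.Annotate(errors.New("ErrPDBatchScanRegion"), "restore region scan")))

	// Unknown errors are not retried (unlike the removed errorIsRetryable path
	// in index.go, which defaulted to retrying unknown errors).
	require.False(t, dbterror.IsReorgRetryableErr(errors.New("unexpected failure")))
}
```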