From 20ec4a63b174d04475f3d214c3f0b62194e920c9 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 22 Apr 2024 16:44:47 +0800 Subject: [PATCH 1/8] util/dbterror: add ErrUnknown to reorg retryable errors --- pkg/util/dbterror/ddl_terror.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/dbterror/ddl_terror.go b/pkg/util/dbterror/ddl_terror.go index 35d41f308631b..3353a8c614300 100644 --- a/pkg/util/dbterror/ddl_terror.go +++ b/pkg/util/dbterror/ddl_terror.go @@ -516,6 +516,7 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{ mysql.ErrWriteConflictInTiDB: {}, mysql.ErrTxnRetryable: {}, mysql.ErrNotOwner: {}, + mysql.ErrUnknown: {}, // Temporary network partitioning may cause pk commit failure. uint16(terror.CodeResultUndetermined): {}, From 50d129b50d6bdfbdf15cdd155b947e9cc91152b4 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 26 Apr 2024 12:15:17 +0800 Subject: [PATCH 2/8] refine reorg retryable errors --- pkg/ddl/backfilling_dist_executor.go | 14 +------------- pkg/ddl/index.go | 9 +-------- pkg/util/dbterror/ddl_terror.go | 29 +++++++++++++++++++++++++--- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index ac1024279931d..66e91fd548381 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/logutil" @@ -204,19 +203,8 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool { return true } -func isRetryableError(err error) bool { - originErr := errors.Cause(err) - if tErr, ok := originErr.(*terror.Error); ok { - sqlErr := terror.ToSQLError(tErr) - _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] - return ok - } - // can't retry Unknown err. - return false -} - func (*backfillDistExecutor) IsRetryableError(err error) bool { - return common.IsRetryableError(err) || isRetryableError(err) + return common.IsRetryableError(err) || dbterror.IsReorgRetryableErr(err) } func (s *backfillDistExecutor) Close() { diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 99b735d6afdd4..398c89c36cb43 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -970,14 +970,7 @@ func errorIsRetryable(err error, job *model.Job) bool { if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() { return false } - originErr := errors.Cause(err) - if tErr, ok := originErr.(*terror.Error); ok { - sqlErr := terror.ToSQLError(tErr) - _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] - return ok - } - // For the unknown errors, we should retry. - return true + return dbterror.IsReorgRetryableErr(err) } func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error { diff --git a/pkg/util/dbterror/ddl_terror.go b/pkg/util/dbterror/ddl_terror.go index 3353a8c614300..07f87ed0fddf9 100644 --- a/pkg/util/dbterror/ddl_terror.go +++ b/pkg/util/dbterror/ddl_terror.go @@ -16,10 +16,12 @@ package dbterror import ( "fmt" + "strings" mysql "github.com/pingcap/tidb/pkg/errno" parser_mysql "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pkg/errors" ) var ( @@ -497,8 +499,7 @@ var ( "tidb_enable_dist_task setting. To utilize distributed task execution, please enable tidb_ddl_enable_fast_reorg first."), nil)) ) -// ReorgRetryableErrCodes is the error codes that are retryable for reorganization. -var ReorgRetryableErrCodes = map[uint16]struct{}{ +var reorgRetryableErrCodes = map[uint16]struct{}{ mysql.ErrPDServerTimeout: {}, mysql.ErrTiKVServerTimeout: {}, mysql.ErrTiKVServerBusy: {}, @@ -516,8 +517,30 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{ mysql.ErrWriteConflictInTiDB: {}, mysql.ErrTxnRetryable: {}, mysql.ErrNotOwner: {}, - mysql.ErrUnknown: {}, // Temporary network partitioning may cause pk commit failure. uint16(terror.CodeResultUndetermined): {}, } + +// reorgRetryableMessages record errors that do not have an error code. +var reorgRetryableMessages = []string{ + "ErrPDBatchScanRegion", // TiDB restarts during import may encounter this error. +} + +// IsReorgRetryableErr check whether the error is retryable during DDL reorganization. +func IsReorgRetryableErr(err error) bool { + msg := err.Error() + for _, retryMsg := range reorgRetryableMessages { + if strings.Contains(msg, retryMsg) { + return true + } + } + originErr := errors.Cause(err) + if tErr, ok := originErr.(*terror.Error); ok { + sqlErr := terror.ToSQLError(tErr) + _, ok := reorgRetryableErrCodes[sqlErr.Code] + return ok + } + // can't retry unknown err. + return false +} From a9ab7cfac1d163fcd064e0b11efc4589db722032 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 26 Apr 2024 13:21:50 +0800 Subject: [PATCH 3/8] update bazel --- pkg/util/dbterror/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/dbterror/BUILD.bazel b/pkg/util/dbterror/BUILD.bazel index 188fe9af542e3..e6c1238c50bdc 100644 --- a/pkg/util/dbterror/BUILD.bazel +++ b/pkg/util/dbterror/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/errno", "//pkg/parser/mysql", "//pkg/parser/terror", + "@com_github_pkg_errors//:errors", ], ) From 0c4f5b130093c7f248fd1eebbd0944192f6a4f60 Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 28 Apr 2024 15:54:34 +0800 Subject: [PATCH 4/8] fix integration test TestIngestError --- pkg/ddl/index.go | 2 +- pkg/ddl/index_cop.go | 2 +- pkg/util/dbterror/ddl_terror.go | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 398c89c36cb43..02f46c1cbfb20 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1817,7 +1817,7 @@ func writeOneKVToLocal( return errors.Trace(err) } failpoint.Inject("mockLocalWriterError", func() { - failpoint.Return(errors.New("mock engine error")) + failpoint.Return(errors.New("ErrMockRetryable")) }) writeBufs.IndexKeyBuf = key writeBufs.RowValBuf = idxVal diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 58d6980ad9788..1faa1428d4394 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -169,7 +169,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session) rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds() metrics.AddIndexScanRate.WithLabelValues(metrics.LblAddIndex).Observe(rate) failpoint.Inject("mockCopSenderError", func() { - idxRs.Err = errors.New("mock cop error") + idxRs.Err = errors.New("ErrMockRetryable") }) p.chunkSender.AddTask(idxRs) startTime = time.Now() diff --git a/pkg/util/dbterror/ddl_terror.go b/pkg/util/dbterror/ddl_terror.go index 07f87ed0fddf9..e0cddf7cbaf90 100644 --- a/pkg/util/dbterror/ddl_terror.go +++ b/pkg/util/dbterror/ddl_terror.go @@ -525,9 +525,10 @@ var reorgRetryableErrCodes = map[uint16]struct{}{ // reorgRetryableMessages record errors that do not have an error code. var reorgRetryableMessages = []string{ "ErrPDBatchScanRegion", // TiDB restarts during import may encounter this error. + "ErrMockRetryable", // Only used for test. } -// IsReorgRetryableErr check whether the error is retryable during DDL reorganization. +// IsReorgRetryableErr checks whether the error is retryable during DDL reorganization. func IsReorgRetryableErr(err error) bool { msg := err.Error() for _, retryMsg := range reorgRetryableMessages { From b1b5b2b037185c3f100b78f715f9dfcf9ef175fd Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 28 Apr 2024 16:53:31 +0800 Subject: [PATCH 5/8] update bazel --- pkg/util/dbterror/BUILD.bazel | 2 +- pkg/util/dbterror/ddl_terror.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/dbterror/BUILD.bazel b/pkg/util/dbterror/BUILD.bazel index e6c1238c50bdc..64488970e6f9e 100644 --- a/pkg/util/dbterror/BUILD.bazel +++ b/pkg/util/dbterror/BUILD.bazel @@ -12,7 +12,7 @@ go_library( "//pkg/errno", "//pkg/parser/mysql", "//pkg/parser/terror", - "@com_github_pkg_errors//:errors", + "@com_github_pingcap_errors//:errors", ], ) diff --git a/pkg/util/dbterror/ddl_terror.go b/pkg/util/dbterror/ddl_terror.go index e0cddf7cbaf90..2a072819ac09e 100644 --- a/pkg/util/dbterror/ddl_terror.go +++ b/pkg/util/dbterror/ddl_terror.go @@ -18,10 +18,10 @@ import ( "fmt" "strings" + "github.com/pingcap/errors" mysql "github.com/pingcap/tidb/pkg/errno" parser_mysql "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" - "github.com/pkg/errors" ) var ( From e63e00ca7dbe96e09fba2c8750b75449d12b0df7 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 9 May 2024 11:26:12 +0800 Subject: [PATCH 6/8] fix TestAddIndexIngestMemoryUsage --- pkg/ddl/ingest/engine.go | 9 +++++---- pkg/ddl/ingest/engine_mgr.go | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ddl/ingest/engine.go b/pkg/ddl/ingest/engine.go index 1a66d9ef0f4b1..413650efd4251 100644 --- a/pkg/ddl/ingest/engine.go +++ b/pkg/ddl/ingest/engine.go @@ -67,7 +67,7 @@ type engineInfo struct { // newEngineInfo create a new engineInfo struct. func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig, - en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot) *engineInfo { + en *backend.OpenedEngine, uuid uuid.UUID, memRoot MemRoot) *engineInfo { return &engineInfo{ ctx: ctx, jobID: jobID, @@ -75,8 +75,8 @@ func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.Engin cfg: cfg, openedEngine: en, uuid: uuid, - writerCount: wCnt, - writerCache: generic.NewSyncMap[int, backend.EngineWriter](wCnt), + writerCount: 0, + writerCache: generic.NewSyncMap[int, backend.EngineWriter](4), memRoot: memRoot, flushLock: &sync.RWMutex{}, } @@ -194,13 +194,14 @@ func (ei *engineInfo) CreateWriter(id int) (Writer, error) { return nil, err } + ei.writerCount++ ei.memRoot.Consume(StructSizeWriterCtx) logutil.Logger(ei.ctx).Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id), zap.Int64("allocate memory", StructSizeWriterCtx), zap.Int64("current memory usage", ei.memRoot.CurrentUsage()), zap.Int64("max memory quota", ei.memRoot.MaxMemoryQuota())) - return wCtx, err + return wCtx, nil } // newWriterContext will get worker local writer from engine info writer cache first, if exists. diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index de2483c383504..cf77f73a2c19b 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -62,7 +62,7 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st return nil, errors.Trace(err) } id := openedEn.GetEngineUUID() - en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, bc.MemRoot) + en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, bc.MemRoot) bc.Store(indexID, en) bc.MemRoot.Consume(StructSizeEngineInfo) bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize) @@ -74,7 +74,6 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency)) return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded") } - en.writerCount++ info = LitInfoAddWriter } bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) From 41a76ffd8ee838b19d83ef64b7029270576e94ad Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 9 May 2024 11:31:32 +0800 Subject: [PATCH 7/8] fix TestAddIndexImportFailed --- pkg/lightning/backend/local/region_job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 7bc812e4b9bb6..9e51ca04d8565 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -308,7 +308,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { } failpoint.Inject("mockWritePeerErr", func() { - err = errors.Errorf("mock write peer error") + err = errors.Errorf("ErrMockRetryable") failpoint.Return(annotateErr(err, peer, "when open write stream")) }) From 28c1d2d0123cb9eb62733fd700753b7c281d59e9 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 9 May 2024 11:32:52 +0800 Subject: [PATCH 8/8] fix TestAddIndexFinishImportError --- pkg/ddl/ingest/backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 7747502493283..0afff5ce85e81 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -146,7 +146,7 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl } failpoint.Inject("mockFinishImportErr", func() { - failpoint.Return(fmt.Errorf("mock finish import error")) + failpoint.Return(fmt.Errorf("ErrMockRetryable")) }) // Check remote duplicate value for the index.