From 5ad000cb8dfd7edb8b77312d4b8e719fbab03928 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Wed, 3 Sep 2025 17:57:08 +0800 Subject: [PATCH 1/2] concurrently set tiflash replicas Signed-off-by: Jianjun Liao --- br/pkg/restore/log_client/client.go | 22 ++++++++++++++++++++++ br/pkg/task/stream.go | 15 ++------------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 8d90e8cbec440..5b7203a804eef 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -1504,6 +1504,28 @@ func WrapLogFilesIterWithCheckpointFailpoint( return logIter, nil } +// ResetTiflashReplicas set tiflash replicas by given sqls concurrently. +func (rc *LogClient) ResetTiflashReplicas(ctx context.Context, sqls []string, g glue.Glue) error { + resetSessions, err := createSessions(ctx, g, rc.dom.Store(), 16) + if err != nil { + return errors.Trace(err) + } + defer func() { + closeSessions(resetSessions) + }() + workerpool := tidbutil.NewWorkerPool(16, "repair ingest index") + eg, ectx := errgroup.WithContext(ctx) + for _, sql := range sqls { + resetSQL := sql + workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error { + resetSession := resetSessions[id%uint64(len(resetSessions))] + log.Info("reset tiflash replica", zap.String("sql", sql)) + return resetSession.ExecuteInternal(ectx, resetSQL) + }) + } + return eg.Wait() +} + func colsToStr(cols []ast.CIStr) string { var str strings.Builder for i, col := range cols { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 04648aa0f90f7..d8609717ce353 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1783,19 +1783,8 @@ func restoreStream( sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema()) log.Info("Generating SQLs for restoring TiFlash Replica", zap.Strings("sqls", sqls)) - err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error { - for _, sql := range sqls { - if errExec := se.ExecuteInternal(ctx, sql); errExec != nil { - logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.", - logutil.ShortError(errExec), - zap.String("sql", sql), - ) - } - } - return nil - }) - if err != nil { - return err + if client.ResetTiflashReplicas(ctx, sqls, g); err != nil { + return errors.Annotate(err, "failed to reset tiflash replicas") } } From b92ea435e3240a752b8296b0c5d1f88484558472 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Mon, 15 Sep 2025 13:53:42 +0800 Subject: [PATCH 2/2] commit some suggestions Signed-off-by: Jianjun Liao --- br/pkg/restore/log_client/client.go | 3 +-- br/pkg/task/stream.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 5b7203a804eef..bfb04b46996ef 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -1516,11 +1516,10 @@ func (rc *LogClient) ResetTiflashReplicas(ctx context.Context, sqls []string, g workerpool := tidbutil.NewWorkerPool(16, "repair ingest index") eg, ectx := errgroup.WithContext(ctx) for _, sql := range sqls { - resetSQL := sql workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error { resetSession := resetSessions[id%uint64(len(resetSessions))] log.Info("reset tiflash replica", zap.String("sql", sql)) - return resetSession.ExecuteInternal(ectx, resetSQL) + return resetSession.ExecuteInternal(ectx, sql) }) } return eg.Wait() diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index d8609717ce353..8ed58665001c5 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1783,7 +1783,7 @@ func restoreStream( sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema()) log.Info("Generating SQLs for restoring TiFlash Replica", zap.Strings("sqls", sqls)) - if client.ResetTiflashReplicas(ctx, sqls, g); err != nil { + if err := client.ResetTiflashReplicas(ctx, sqls, g); err != nil { return errors.Annotate(err, "failed to reset tiflash replicas") } }