Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,28 @@ func WrapLogFilesIterWithCheckpointFailpoint(
return logIter, nil
}

// ResetTiflashReplicas set tiflash replicas by given sqls concurrently.
func (rc *LogClient) ResetTiflashReplicas(ctx context.Context, sqls []string, g glue.Glue) error {
resetSessions, err := createSessions(ctx, g, rc.dom.Store(), 16)
if err != nil {
return errors.Trace(err)
}
defer func() {
closeSessions(resetSessions)
}()
workerpool := tidbutil.NewWorkerPool(16, "repair ingest index")
eg, ectx := errgroup.WithContext(ctx)
for _, sql := range sqls {
resetSQL := sql
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember in our go version, we can directly use sql in for-loop

https://go.dev/blog/loopvar-preview

workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
resetSession := resetSessions[id%uint64(len(resetSessions))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems ID are already allocated to 1 to worker pool size. So we can use resetSessions[id - 1]

log.Info("reset tiflash replica", zap.String("sql", sql))
return resetSession.ExecuteInternal(ectx, resetSQL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one SQL fails, should we continue to execute rest of them?

})
}
return eg.Wait()
}

func colsToStr(cols []ast.CIStr) string {
var str strings.Builder
for i, col := range cols {
Expand Down
15 changes: 2 additions & 13 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1783,19 +1783,8 @@ func restoreStream(
sqls := cfg.tiflashRecorder.GenerateAlterTableDDLs(mgr.GetDomain().InfoSchema())
log.Info("Generating SQLs for restoring TiFlash Replica",
zap.Strings("sqls", sqls))
err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})
if err != nil {
return err
if client.ResetTiflashReplicas(ctx, sqls, g); err != nil {
return errors.Annotate(err, "failed to reset tiflash replicas")
}
}

Expand Down
Loading