Skip to content

Commit f1d99db

Browse files
LeavrthMichael Deng
authored andcommitted
br: concurrently repairing indexes (pingcap#59159)
close pingcap#59158
1 parent 950ea34 commit f1d99db

File tree

6 files changed

+360
-25
lines changed

6 files changed

+360
-25
lines changed

br/pkg/checkpoint/log_restore.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,9 @@ type CheckpointIngestIndexRepairSQL struct {
315315
IndexName string `json:"index-name"`
316316
AddSQL string `json:"add-sql"`
317317
AddArgs []any `json:"add-args"`
318+
319+
OldIndexIDFound bool `json:"-"`
320+
IndexRepaired bool `json:"-"`
318321
}
319322

320323
type CheckpointIngestIndexRepairSQLs struct {

br/pkg/glue/progressing.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import (
77
"fmt"
88
"io"
99
"os"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/fatih/color"
14+
"github.com/pingcap/log"
1315
"github.com/pingcap/tidb/br/pkg/utils"
1416
"github.com/vbauerster/mpb/v7"
1517
"github.com/vbauerster/mpb/v7/decor"
18+
"go.uber.org/zap"
1619
"golang.org/x/term"
1720
)
1821

@@ -173,3 +176,80 @@ func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
173176
color.RedString("ABORTED"))),
174177
)
175178
}
179+
180+
type ProgressBar interface {
181+
Increment()
182+
Done()
183+
}
184+
185+
type MultiProgress interface {
186+
AddTextBar(string, int64) ProgressBar
187+
Wait()
188+
}
189+
190+
func (ops ConsoleOperations) StartMultiProgress() MultiProgress {
191+
if !ops.OutputIsTTY() {
192+
return &NopMultiProgress{}
193+
}
194+
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
195+
return &TerminalMultiProgress{
196+
progress: pb,
197+
}
198+
}
199+
200+
type NopMultiProgress struct{}
201+
202+
type LogBar struct {
203+
name string
204+
total int64
205+
}
206+
207+
func (nmp *NopMultiProgress) AddTextBar(name string, total int64) ProgressBar {
208+
log.Info("progress start", zap.String("name", name))
209+
return &LogBar{
210+
name: name,
211+
total: total,
212+
}
213+
}
214+
215+
func (nmp *NopMultiProgress) Wait() {}
216+
217+
func (lb *LogBar) Increment() {
218+
if atomic.AddInt64(&lb.total, -1) <= 0 {
219+
log.Info("progress done", zap.String("name", lb.name))
220+
}
221+
}
222+
223+
func (lb *LogBar) Done() {}
224+
225+
type TerminalBar struct {
226+
bar *mpb.Bar
227+
}
228+
229+
func (tb *TerminalBar) Increment() {
230+
tb.bar.Increment()
231+
}
232+
233+
func (tb *TerminalBar) Done() {
234+
tb.bar.Abort(false)
235+
tb.bar.Wait()
236+
}
237+
238+
type TerminalMultiProgress struct {
239+
progress *mpb.Progress
240+
}
241+
242+
func (tmp *TerminalMultiProgress) AddTextBar(name string, total int64) ProgressBar {
243+
bar := tmp.progress.New(total,
244+
mpb.NopStyle(),
245+
mpb.PrependDecorators(decor.Name(name)),
246+
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
247+
color.RedString("ABORTED"),
248+
)),
249+
)
250+
return &TerminalBar{bar: bar}
251+
}
252+
253+
func (tmp *TerminalMultiProgress) Wait() {
254+
tmp.progress.Wait()
255+
}

br/pkg/restore/log_client/client.go

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ const maxSplitKeysOnce = 10240
8080
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
8181
const rawKVBatchCount = 64
8282

83+
// session count for repairing ingest indexes. Currently only one TiDB node executes adding index jobs
84+
// at the same time and the add-index job concurrency is about min(10, `TiDB CPUs / 4`).
85+
const defaultRepairIndexSessionCount uint = 10
86+
8387
type LogClient struct {
8488
cipher *backuppb.CipherInfo
8589
pdClient pd.Client
@@ -223,10 +227,48 @@ func (rc *LogClient) StartCheckpointRunnerForLogRestore(ctx context.Context, g g
223227
return runner, errors.Trace(err)
224228
}
225229

230+
func createSession(ctx context.Context, g glue.Glue, store kv.Storage) (glue.Session, error) {
231+
unsafeSession, err := g.CreateSession(store)
232+
if err != nil {
233+
return nil, errors.Trace(err)
234+
}
235+
// Set SQL mode to None for avoiding SQL compatibility problem
236+
err = unsafeSession.Execute(ctx, "set @@sql_mode=''")
237+
if err != nil {
238+
return nil, errors.Trace(err)
239+
}
240+
return unsafeSession, nil
241+
}
242+
243+
func createSessions(ctx context.Context, g glue.Glue, store kv.Storage, count uint) (createdUnsafeSessions []glue.Session, createErr error) {
244+
unsafeSessions := make([]glue.Session, 0, count)
245+
defer func() {
246+
if createErr != nil {
247+
closeSessions(unsafeSessions)
248+
}
249+
}()
250+
for range count {
251+
unsafeSession, err := createSession(ctx, g, store)
252+
if err != nil {
253+
return nil, errors.Trace(err)
254+
}
255+
unsafeSessions = append(unsafeSessions, unsafeSession)
256+
}
257+
return unsafeSessions, nil
258+
}
259+
260+
func closeSessions(sessions []glue.Session) {
261+
for _, session := range sessions {
262+
if session != nil {
263+
session.Close()
264+
}
265+
}
266+
}
267+
226268
// Init create db connection and domain for storage.
227-
func (rc *LogClient) Init(g glue.Glue, store kv.Storage) error {
269+
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
228270
var err error
229-
rc.se, err = g.CreateSession(store)
271+
rc.se, err = createSession(ctx, g, store)
230272
if err != nil {
231273
return errors.Trace(err)
232274
}
@@ -1423,39 +1465,60 @@ func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *inge
14231465

14241466
info := rc.dom.InfoSchema()
14251467
console := glue.GetConsole(g)
1426-
NEXTSQL:
1427-
for _, sql := range sqls {
1428-
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
1429-
1468+
for i, sql := range sqls {
14301469
tableInfo, err := info.TableByName(ctx, sql.SchemaName, sql.TableName)
14311470
if err != nil {
14321471
return errors.Trace(err)
14331472
}
1434-
oldIndexIDFound := false
1473+
sqls[i].OldIndexIDFound = false
1474+
sqls[i].IndexRepaired = false
14351475
if fromCheckpoint {
14361476
for _, idx := range tableInfo.Indices() {
14371477
indexInfo := idx.Meta()
14381478
if indexInfo.ID == sql.IndexID {
14391479
// the original index id is not dropped
1440-
oldIndexIDFound = true
1480+
sqls[i].OldIndexIDFound = true
14411481
break
14421482
}
14431483
// what if index's state is not public?
14441484
if indexInfo.Name.O == sql.IndexName {
1485+
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
14451486
// find the same name index, but not the same index id,
14461487
// which means the repaired index id is created
14471488
if _, err := fmt.Fprintf(console.Out(), "%s ... %s\n", progressTitle, color.HiGreenString("SKIPPED DUE TO CHECKPOINT MODE")); err != nil {
14481489
return errors.Trace(err)
14491490
}
1450-
continue NEXTSQL
1491+
sqls[i].IndexRepaired = true
1492+
break
14511493
}
14521494
}
14531495
}
1496+
}
14541497

1455-
if err := func(sql checkpoint.CheckpointIngestIndexRepairSQL) error {
1456-
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
1457-
defer w.Close()
1498+
sessionCount := defaultRepairIndexSessionCount
1499+
indexSessions, err := createSessions(ctx, g, rc.dom.Store(), sessionCount)
1500+
if err != nil {
1501+
return errors.Trace(err)
1502+
}
1503+
defer func() {
1504+
closeSessions(indexSessions)
1505+
}()
1506+
workerpool := tidbutil.NewWorkerPool(sessionCount, "repair ingest index")
1507+
eg, ectx := errgroup.WithContext(ctx)
1508+
mp := console.StartMultiProgress()
1509+
for _, sql := range sqls {
1510+
if sql.IndexRepaired {
1511+
continue
1512+
}
1513+
if ectx.Err() != nil {
1514+
break
1515+
}
1516+
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
1517+
w := mp.AddTextBar(progressTitle, 1)
1518+
workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
1519+
defer w.Done()
14581520

1521+
indexSession := indexSessions[id%uint64(len(indexSessions))]
14591522
// TODO: When the TiDB supports the DROP and CREATE the same name index in one SQL,
14601523
// the checkpoint for ingest recorder can be removed and directly use the SQL:
14611524
// ALTER TABLE db.tbl DROP INDEX `i_1`, ADD IDNEX `i_1` ...
@@ -1466,8 +1529,8 @@ NEXTSQL:
14661529
// restored metakv and then skips repairing it.
14671530

14681531
// only when first execution or old index id is not dropped
1469-
if !fromCheckpoint || oldIndexIDFound {
1470-
if err := rc.se.ExecuteInternal(ctx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
1532+
if !fromCheckpoint || sql.OldIndexIDFound {
1533+
if err := indexSession.ExecuteInternal(ectx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
14711534
return errors.Trace(err)
14721535
}
14731536
}
@@ -1477,17 +1540,15 @@ NEXTSQL:
14771540
}
14781541
})
14791542
// create the repaired index when first execution or not found it
1480-
if err := rc.se.ExecuteInternal(ctx, sql.AddSQL, sql.AddArgs...); err != nil {
1481-
return errors.Trace(err)
1482-
}
1483-
w.Inc()
1484-
if err := w.Wait(ctx); err != nil {
1543+
if err := indexSession.ExecuteInternal(ectx, sql.AddSQL, sql.AddArgs...); err != nil {
14851544
return errors.Trace(err)
14861545
}
1546+
w.Increment()
14871547
return nil
1488-
}(sql); err != nil {
1489-
return errors.Trace(err)
1490-
}
1548+
})
1549+
}
1550+
if err := eg.Wait(); err != nil {
1551+
return errors.Trace(err)
14911552
}
14921553

14931554
return nil

0 commit comments

Comments
 (0)