Skip to content

Commit daea732

Browse files
Leavrth, zeminzhou
authored and committed
br: concurrently repairing indexes (pingcap#59159)
close pingcap#59158
1 parent 5653601 commit daea732

File tree

6 files changed

+358
-27
lines changed

6 files changed

+358
-27
lines changed

br/pkg/checkpoint/log_restore.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,9 @@ type CheckpointIngestIndexRepairSQL struct {
304304
IndexName string `json:"index-name"`
305305
AddSQL string `json:"add-sql"`
306306
AddArgs []any `json:"add-args"`
307+
308+
OldIndexIDFound bool `json:"-"`
309+
IndexRepaired bool `json:"-"`
307310
}
308311

309312
type CheckpointIngestIndexRepairSQLs struct {

br/pkg/glue/progressing.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import (
77
"fmt"
88
"io"
99
"os"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/fatih/color"
14+
"github.com/pingcap/log"
1315
"github.com/pingcap/tidb/br/pkg/utils"
1416
"github.com/vbauerster/mpb/v7"
1517
"github.com/vbauerster/mpb/v7/decor"
18+
"go.uber.org/zap"
1619
"golang.org/x/term"
1720
)
1821

@@ -188,3 +191,80 @@ func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
188191
color.RedString("ABORTED"))),
189192
)
190193
}
194+
195+
// ProgressBar is a handle to a single progress item obtained from
// MultiProgress.AddTextBar.
//
// Increment advances the item by one step. Done tells the implementation the
// caller is finished with the item, whether or not every step was reported.
type ProgressBar interface {
196+
Increment()
197+
Done()
198+
}
199+
200+
// MultiProgress manages several progress items that are reported together.
//
// AddTextBar registers a new item; the string is its title and the int64 is
// the total number of steps expected. Wait is called once all items have been
// added and finished.
// NOTE(review): whether Wait blocks depends on the implementation — the
// no-op implementation returns immediately.
type MultiProgress interface {
201+
AddTextBar(string, int64) ProgressBar
202+
Wait()
203+
}
204+
205+
func (ops ConsoleOperations) StartMultiProgress() MultiProgress {
206+
if !ops.OutputIsTTY() {
207+
return &NopMultiProgress{}
208+
}
209+
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
210+
return &TerminalMultiProgress{
211+
progress: pb,
212+
}
213+
}
214+
215+
// NopMultiProgress is the MultiProgress returned by StartMultiProgress when
// the output is not a TTY. It renders nothing; its bars only write log entries.
type NopMultiProgress struct{}
216+
217+
// LogBar is the ProgressBar handed out by NopMultiProgress.AddTextBar.
// name is the title written to the log; total is the number of remaining
// steps and is decremented atomically by Increment.
type LogBar struct {
218+
name string
219+
total int64
220+
}
221+
222+
func (nmp *NopMultiProgress) AddTextBar(name string, total int64) ProgressBar {
223+
log.Info("progress start", zap.String("name", name))
224+
return &LogBar{
225+
name: name,
226+
total: total,
227+
}
228+
}
229+
230+
// Wait returns immediately: NopMultiProgress renders nothing, so there is
// nothing to wait for.
func (nmp *NopMultiProgress) Wait() {}
231+
232+
func (lb *LogBar) Increment() {
233+
if atomic.AddInt64(&lb.total, -1) <= 0 {
234+
log.Info("progress done", zap.String("name", lb.name))
235+
}
236+
}
237+
238+
// Done is a no-op: a LogBar only holds a name and a counter, so there is
// nothing to release.
func (lb *LogBar) Done() {}
239+
240+
// TerminalBar adapts an *mpb.Bar to the ProgressBar interface.
type TerminalBar struct {
241+
bar *mpb.Bar
242+
}
243+
244+
// Increment advances the wrapped mpb bar by one step.
func (tb *TerminalBar) Increment() {
245+
tb.bar.Increment()
246+
}
247+
248+
// Done aborts the wrapped bar without dropping it and then waits on the bar.
// NOTE(review): per the mpb API, Abort should be ignored for a bar that has
// already completed, so a fully incremented bar keeps its "done" rendering
// while an unfinished one shows the abort decorator — confirm against the
// mpb version in use.
func (tb *TerminalBar) Done() {
249+
// false: do not remove the bar's line from the output.
tb.bar.Abort(false)
250+
// Wait on the bar before returning.
tb.bar.Wait()
251+
}
252+
253+
// TerminalMultiProgress is the MultiProgress returned by StartMultiProgress
// when the output is a TTY. progress is the mpb container that owns every bar
// created through AddTextBar.
type TerminalMultiProgress struct {
254+
progress *mpb.Progress
255+
}
256+
257+
func (tmp *TerminalMultiProgress) AddTextBar(name string, total int64) ProgressBar {
258+
bar := tmp.progress.New(total,
259+
mpb.NopStyle(),
260+
mpb.PrependDecorators(decor.Name(name)),
261+
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
262+
color.RedString("ABORTED"),
263+
)),
264+
)
265+
return &TerminalBar{bar: bar}
266+
}
267+
268+
// Wait delegates to the mpb container's Wait.
// NOTE(review): presumably this blocks until every bar has completed or been
// aborted — confirm against the mpb documentation.
func (tmp *TerminalMultiProgress) Wait() {
269+
tmp.progress.Wait()
270+
}

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,13 @@ go_test(
9595
flaky = True,
9696
shard_count = 50,
9797
deps = [
98+
"//br/pkg/checkpoint",
9899
"//br/pkg/errors",
99100
"//br/pkg/glue",
100101
"//br/pkg/gluetidb",
101102
"//br/pkg/mock",
102103
"//br/pkg/restore",
104+
"//br/pkg/restore/ingestrec",
103105
"//br/pkg/restore/internal/import_client",
104106
"//br/pkg/restore/internal/rawkv",
105107
"//br/pkg/restore/split",
@@ -111,6 +113,8 @@ go_test(
111113
"//br/pkg/utiltest",
112114
"//pkg/domain",
113115
"//pkg/kv",
116+
"//pkg/meta/model",
117+
"//pkg/parser/ast",
114118
"//pkg/planner/core/resolve",
115119
"//pkg/session",
116120
"//pkg/sessionctx",

br/pkg/restore/log_client/client.go

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ const maxSplitKeysOnce = 10240
8484
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
8585
const rawKVBatchCount = 64
8686

87+
// defaultRepairIndexSessionCount is the number of sessions (and the worker pool size) used for
// repairing ingest indexes. Currently only one TiDB node executes adding index jobs
88+
// at the same time and the add-index job concurrency is about min(10, `TiDB CPUs / 4`).
89+
const defaultRepairIndexSessionCount uint = 10
90+
8791
// LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration,
8892
// including concurrency management, checkpoint handling, and file importing for efficient log processing.
8993
type LogRestoreManager struct {
@@ -456,16 +460,48 @@ func (rc *LogClient) CleanUpKVFiles(
456460
return rc.logRestoreManager.fileImporter.ClearFiles(ctx, rc.pdClient, "v1")
457461
}
458462

459-
// Init create db connection and domain for storage.
460-
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
461-
var err error
462-
rc.unsafeSession, err = g.CreateSession(store)
463+
func createSession(ctx context.Context, g glue.Glue, store kv.Storage) (glue.Session, error) {
464+
unsafeSession, err := g.CreateSession(store)
463465
if err != nil {
464-
return errors.Trace(err)
466+
return nil, errors.Trace(err)
465467
}
466-
467468
// Set SQL mode to None for avoiding SQL compatibility problem
468-
err = rc.unsafeSession.Execute(ctx, "set @@sql_mode=''")
469+
err = unsafeSession.Execute(ctx, "set @@sql_mode=''")
470+
if err != nil {
471+
return nil, errors.Trace(err)
472+
}
473+
return unsafeSession, nil
474+
}
475+
476+
func createSessions(ctx context.Context, g glue.Glue, store kv.Storage, count uint) (createdUnsafeSessions []glue.Session, createErr error) {
477+
unsafeSessions := make([]glue.Session, 0, count)
478+
defer func() {
479+
if createErr != nil {
480+
closeSessions(unsafeSessions)
481+
}
482+
}()
483+
for range count {
484+
unsafeSession, err := createSession(ctx, g, store)
485+
if err != nil {
486+
return nil, errors.Trace(err)
487+
}
488+
unsafeSessions = append(unsafeSessions, unsafeSession)
489+
}
490+
return unsafeSessions, nil
491+
}
492+
493+
func closeSessions(sessions []glue.Session) {
494+
for _, session := range sessions {
495+
if session != nil {
496+
session.Close()
497+
}
498+
}
499+
}
500+
501+
// Init creates the db connection and domain for storage.
502+
func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error {
503+
var err error
504+
rc.unsafeSession, err = createSession(ctx, g, store)
469505
if err != nil {
470506
return errors.Trace(err)
471507
}
@@ -1765,39 +1801,60 @@ func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *inge
17651801

17661802
info := rc.dom.InfoSchema()
17671803
console := glue.GetConsole(g)
1768-
NEXTSQL:
1769-
for _, sql := range sqls {
1770-
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
1771-
1804+
for i, sql := range sqls {
17721805
tableInfo, err := info.TableByName(ctx, sql.SchemaName, sql.TableName)
17731806
if err != nil {
17741807
return errors.Trace(err)
17751808
}
1776-
oldIndexIDFound := false
1809+
sqls[i].OldIndexIDFound = false
1810+
sqls[i].IndexRepaired = false
17771811
if fromCheckpoint {
17781812
for _, idx := range tableInfo.Indices() {
17791813
indexInfo := idx.Meta()
17801814
if indexInfo.ID == sql.IndexID {
17811815
// the original index id is not dropped
1782-
oldIndexIDFound = true
1816+
sqls[i].OldIndexIDFound = true
17831817
break
17841818
}
17851819
// what if index's state is not public?
17861820
if indexInfo.Name.O == sql.IndexName {
1821+
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
17871822
// find the same name index, but not the same index id,
17881823
// which means the repaired index id is created
17891824
if _, err := fmt.Fprintf(console.Out(), "%s ... %s\n", progressTitle, color.HiGreenString("SKIPPED DUE TO CHECKPOINT MODE")); err != nil {
17901825
return errors.Trace(err)
17911826
}
1792-
continue NEXTSQL
1827+
sqls[i].IndexRepaired = true
1828+
break
17931829
}
17941830
}
17951831
}
1832+
}
17961833

1797-
if err := func(sql checkpoint.CheckpointIngestIndexRepairSQL) error {
1798-
w := console.StartProgressBar(progressTitle, glue.OnlyOneTask)
1799-
defer w.Close()
1834+
sessionCount := defaultRepairIndexSessionCount
1835+
indexSessions, err := createSessions(ctx, g, rc.dom.Store(), sessionCount)
1836+
if err != nil {
1837+
return errors.Trace(err)
1838+
}
1839+
defer func() {
1840+
closeSessions(indexSessions)
1841+
}()
1842+
workerpool := tidbutil.NewWorkerPool(sessionCount, "repair ingest index")
1843+
eg, ectx := errgroup.WithContext(ctx)
1844+
mp := console.StartMultiProgress()
1845+
for _, sql := range sqls {
1846+
if sql.IndexRepaired {
1847+
continue
1848+
}
1849+
if ectx.Err() != nil {
1850+
break
1851+
}
1852+
progressTitle := fmt.Sprintf("repair ingest index %s for table %s.%s", sql.IndexName, sql.SchemaName, sql.TableName)
1853+
w := mp.AddTextBar(progressTitle, 1)
1854+
workerpool.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
1855+
defer w.Done()
18001856

1857+
indexSession := indexSessions[id%uint64(len(indexSessions))]
18011858
// TODO: When the TiDB supports the DROP and CREATE the same name index in one SQL,
18021859
// the checkpoint for ingest recorder can be removed and directly use the SQL:
18031860
// ALTER TABLE db.tbl DROP INDEX `i_1`, ADD INDEX `i_1` ...
@@ -1808,8 +1865,8 @@ NEXTSQL:
18081865
// restored metakv and then skips repairing it.
18091866

18101867
// only when first execution or old index id is not dropped
1811-
if !fromCheckpoint || oldIndexIDFound {
1812-
if err := rc.unsafeSession.ExecuteInternal(ctx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
1868+
if !fromCheckpoint || sql.OldIndexIDFound {
1869+
if err := indexSession.ExecuteInternal(ectx, alterTableDropIndexSQL, sql.SchemaName.O, sql.TableName.O, sql.IndexName); err != nil {
18131870
return errors.Trace(err)
18141871
}
18151872
}
@@ -1819,17 +1876,15 @@ NEXTSQL:
18191876
}
18201877
})
18211878
// create the repaired index when first execution or not found it
1822-
if err := rc.unsafeSession.ExecuteInternal(ctx, sql.AddSQL, sql.AddArgs...); err != nil {
1823-
return errors.Trace(err)
1824-
}
1825-
w.Inc()
1826-
if err := w.Wait(ctx); err != nil {
1879+
if err := indexSession.ExecuteInternal(ectx, sql.AddSQL, sql.AddArgs...); err != nil {
18271880
return errors.Trace(err)
18281881
}
1882+
w.Increment()
18291883
return nil
1830-
}(sql); err != nil {
1831-
return errors.Trace(err)
1832-
}
1884+
})
1885+
}
1886+
if err := eg.Wait(); err != nil {
1887+
return errors.Trace(err)
18331888
}
18341889

18351890
return nil

0 commit comments

Comments
 (0)