6 changes: 6 additions & 0 deletions br/pkg/backup/client.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"
@@ -1225,10 +1226,15 @@ func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *m
var rangeAscendErr error
progressRange.Res.Ascend(func(i btree.Item) bool {
r := i.(*rtree.Range)
cfCount := make(map[string]int)
for _, f := range r.Files {
cfCount[f.Cf] += 1
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
}
for cf, count := range cfCount {
summary.CollectInt(fmt.Sprintf("%s CF files", cf), count)
}
// we need to keep the files in order after we support multi_ingest sst.
// default_sst and write_sst need to be together.
if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil {
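The hunk above tallies backup SST files per column family and reports one counter per CF through summary.CollectInt. A minimal standalone sketch of the same counting pattern, using a hypothetical file struct and plain fmt output instead of the real summary package:

package main

import "fmt"

// file is a stand-in for the backup file metadata; only the Cf field matters here.
type file struct {
	Cf string
}

func main() {
	files := []file{{Cf: "default"}, {Cf: "write"}, {Cf: "write"}}

	// Count files per column family, mirroring the cfCount map in collectRangeFiles.
	cfCount := make(map[string]int)
	for _, f := range files {
		cfCount[f.Cf]++
	}

	// One counter key per CF, e.g. "default CF files" and "write CF files".
	for cf, count := range cfCount {
		fmt.Printf("%s: %d\n", fmt.Sprintf("%s CF files", cf), count)
	}
}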
14 changes: 14 additions & 0 deletions br/pkg/glue/glue.go
@@ -82,3 +82,17 @@ type Progress interface {
// called.
Close()
}

// WithProgress executes some logic with the progress, and closes it once the execution is done.
func WithProgress(
ctx context.Context,
g Glue,
cmdName string,
total int64,
redirectLog bool,
cc func(p Progress) error,
) error {
p := g.StartProgress(ctx, cmdName, total, redirectLog)
defer p.Close()
return cc(p)
}
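A hedged sketch of how a caller might use the new WithProgress helper instead of pairing StartProgress with a manual Close; the "Restore Files" command name, the tables slice, and the restoreOne helper are illustrative assumptions, not part of this change:

func restoreWithProgress(ctx context.Context, g glue.Glue, tables []string) error {
	return glue.WithProgress(ctx, g, "Restore Files", int64(len(tables)), false, func(p glue.Progress) error {
		for _, t := range tables {
			// restoreOne is a hypothetical per-table action.
			if err := restoreOne(ctx, t); err != nil {
				return err
			}
			p.Inc() // advance the bar; WithProgress closes it when this callback returns
		}
		return nil
	})
}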
25 changes: 22 additions & 3 deletions br/pkg/restore/snap_client/client.go
@@ -66,8 +66,9 @@ const (
strictPlacementPolicyMode = "STRICT"
ignorePlacementPolicyMode = "IGNORE"

defaultDDLConcurrency = 16
maxSplitKeysOnce = 10240
defaultDDLConcurrency = 16
maxSplitKeysOnce = 10240
resetSpeedLimitRetryTimes = 3
)

const minBatchDdlSize = 1
@@ -948,7 +949,25 @@ func (rc *SnapClient) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error
return nil
}

func (rc *SnapClient) ResetSpeedLimit(ctx context.Context) error {
func (rc *SnapClient) resetSpeedLimit(ctx context.Context) {
var resetErr error
// TODO: in the future we may need a mechanism to set the speed limit with a TTL, like what we do in switch mode.
for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ {
resetErr = rc.resetSpeedLimitInternal(ctx)
if resetErr != nil {
log.Warn("failed to reset speed limit, retry it",
zap.Int("retry time", retry), logutil.ShortError(resetErr))
time.Sleep(time.Duration(retry+3) * time.Second)
continue
}
break
}
if resetErr != nil {
log.Error("failed to reset speed limit, please reset it manually", zap.Error(resetErr))
}
}

func (rc *SnapClient) resetSpeedLimitInternal(ctx context.Context) error {
rc.hasSpeedLimited = false
err := rc.setSpeedLimit(ctx, 0)
if err != nil {
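resetSpeedLimit retries the reset up to resetSpeedLimitRetryTimes times, sleeping (retry+3) seconds between attempts, and only logs the final failure instead of returning it. A minimal standalone sketch of that retry shape, with a generic op callback standing in for resetSpeedLimitInternal:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryResetSketch mirrors the loop in resetSpeedLimit: a fixed number of attempts,
// a growing sleep between them, and the last error reported if all attempts fail.
func retryResetSketch(ctx context.Context, attempts int, op func(context.Context) error) {
	var lastErr error
	for retry := 0; retry < attempts; retry++ {
		lastErr = op(ctx)
		if lastErr == nil {
			return
		}
		fmt.Printf("attempt %d failed: %v, retrying\n", retry, lastErr)
		time.Sleep(time.Duration(retry+3) * time.Second)
	}
	fmt.Printf("giving up, please reset the speed limit manually: %v\n", lastErr)
}

func main() {
	retryResetSketch(context.Background(), 3, func(context.Context) error {
		return errors.New("store unreachable") // always fails, to exercise the giving-up path
	})
}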
122 changes: 122 additions & 0 deletions br/pkg/restore/snap_client/pipeline_items.go
@@ -33,6 +33,7 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/engine"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
@@ -106,6 +107,125 @@ func defaultOutputTableChan() chan *CreatedTable {
return make(chan *CreatedTable, defaultChannelSize)
}

// Exhaust drains all remaining errors in the channel into a slice of errors.
func Exhaust(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh will NEVER be closed (it has multiple senders),
// so we just consume the current backlog of this channel, then return.
return out
}
}
}

type PipelineContext struct {
// pipeline item switch
Checksum bool
LoadStats bool
WaitTiflashReady bool

// pipeline item configuration
LogProgress bool
ChecksumConcurrency uint
StatsConcurrency uint

// pipeline item tool client
KvClient kv.Client
ExtStorage storage.ExternalStorage
Glue glue.Glue
}

// RestorePipeline does some work in the pipeline, such as checksum, loading stats, and waiting for TiFlash to be ready.
func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) {
// We make a bigger errCh so we won't block when multiple parts fail.
errCh := make(chan error, 32)
postHandleCh := afterTableRestoredCh(ctx, createdTables)
progressLen := int64(0)
if plCtx.Checksum {
progressLen += int64(len(createdTables))
}
progressLen += int64(len(createdTables)) // for pipeline item - update stats meta
if plCtx.WaitTiflashReady {
progressLen += int64(len(createdTables))
}

// Redirect to log if there is no log file to avoid unreadable output.
updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress)
defer updateCh.Close()
// pipeline checksum
if plCtx.Checksum {
postHandleCh = rc.GoValidateChecksum(
ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency)
}

// pipeline update meta and load stats
postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats)

// pipeline wait Tiflash synced
if plCtx.WaitTiflashReady {
postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh)
}

finish := dropToBlackhole(ctx, postHandleCh, errCh)

select {
case err = <-errCh:
err = multierr.Append(err, multierr.Combine(Exhaust(errCh)...))
case <-finish:
}

return errors.Trace(err)
}

func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable {
outCh := make(chan *CreatedTable)

go func() {
defer close(outCh)

for _, createdTable := range createdTables {
select {
case <-ctx.Done():
return
default:
outCh <- createdTable
}
}
}()
return outCh
}

// dropToBlackhole drops all incoming tables into a black hole,
// i.e. it does no further processing on them; it just drains the channel so the pipeline can finish.
func dropToBlackhole(
ctx context.Context,
inCh <-chan *CreatedTable,
errCh chan<- error,
) <-chan struct{} {
outCh := make(chan struct{}, 1)
go func() {
defer func() {
close(outCh)
}()
for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case _, ok := <-inCh:
if !ok {
return
}
}
}
}()
return outCh
}

func concurrentHandleTablesCh(
ctx context.Context,
inCh <-chan *CreatedTable,
@@ -183,6 +303,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
s storage.ExternalStorage,
inCh <-chan *CreatedTable,
errCh chan<- error,
updateCh glue.Progress,
statsConcurrency uint,
loadStats bool,
) chan *CreatedTable {
Expand Down Expand Up @@ -233,6 +354,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
}
}
updateCh.Inc()
return nil
}, func() {
log.Info("all stats updated")
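A hedged sketch of how a restore command might drive the new RestorePipeline entry point; the snapclient package alias, the concrete field values, and the rc, createdTables, kvClient, extStorage, and g variables are assumptions for illustration:

plCtx := snapclient.PipelineContext{
	Checksum:            true,
	LoadStats:           true,
	WaitTiflashReady:    true,
	LogProgress:         false,
	ChecksumConcurrency: 4,
	StatsConcurrency:    4,
	KvClient:            kvClient,
	ExtStorage:          extStorage,
	Glue:                g,
}
// The pipeline chains checksum, stats update, and the TiFlash wait, and reports
// per-table progress through the "Restore Pipeline" bar started by RestorePipeline.
if err := rc.RestorePipeline(ctx, plCtx, createdTables); err != nil {
	return errors.Trace(err)
}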