Skip to content

Commit d12a12e

Browse files
authored
lightning/importinto: parallel reading files infos from data store (pingcap#59382) (pingcap#61920)
close pingcap#56104, close pingcap#60224
1 parent 65c78b2 commit d12a12e

File tree

9 files changed

+274
-110
lines changed

9 files changed

+274
-110
lines changed

lightning/pkg/importer/precheck.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func NewPrecheckItemBuilderFromConfig(
5757
for _, o := range opts {
5858
o(builderCfg)
5959
}
60+
builderCfg.MDLoaderSetupOptions = append(builderCfg.MDLoaderSetupOptions, mydump.WithScanFileConcurrency(cfg.App.RegionConcurrency*2))
6061
targetDB, err := DBFromConfig(ctx, cfg.TiDB)
6162
if err != nil {
6263
return nil, errors.Trace(err)

lightning/pkg/server/lightning.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,10 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
529529

530530
loadTask := o.logger.Begin(zap.InfoLevel, "load data source")
531531
var mdl *mydump.MDLoader
532-
mdl, err = mydump.NewLoaderWithStore(ctx, mydump.NewLoaderCfg(taskCfg), s)
532+
mdl, err = mydump.NewLoaderWithStore(
533+
ctx, mydump.NewLoaderCfg(taskCfg), s,
534+
mydump.WithScanFileConcurrency(l.curTask.App.RegionConcurrency*2),
535+
)
533536
loadTask.End(zap.ErrorLevel, err)
534537
if err != nil {
535538
return errors.Trace(err)

pkg/executor/importer/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ go_test(
100100
embed = [":importer"],
101101
flaky = True,
102102
race = "on",
103-
shard_count = 28,
103+
shard_count = 27,
104104
deps = [
105105
"//br/pkg/errors",
106106
"//br/pkg/mock",
@@ -150,7 +150,6 @@ go_test(
150150
"@com_github_pingcap_errors//:errors",
151151
"@com_github_pingcap_failpoint//:failpoint",
152152
"@com_github_pingcap_kvproto//pkg/metapb",
153-
"@com_github_pingcap_log//:log",
154153
"@com_github_prometheus_client_golang//prometheus",
155154
"@com_github_stretchr_testify//require",
156155
"@com_github_tikv_client_go_v2//tikv",

pkg/executor/importer/import.go

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
10981098
Compression: compressTp,
10991099
Type: sourceType,
11001100
}
1101-
fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
1101+
fileMeta.RealSize = mydump.EstimateRealSizeForFile(ctx, fileMeta, s)
11021102
dataFiles = append(dataFiles, &fileMeta)
11031103
totalSize = size
11041104
} else {
@@ -1112,28 +1112,38 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
11121112
// access, else walkDir will fail
11131113
// we only support '*', in order to reuse glob library manually escape the path
11141114
escapedPath := stringutil.EscapeGlobQuestionMark(fileNameKey)
1115-
err := s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true},
1115+
1116+
allFiles := make([]mydump.RawFile, 0, 16)
1117+
if err := s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true},
11161118
func(remotePath string, size int64) error {
11171119
// we have checked in LoadDataExec.Next
11181120
//nolint: errcheck
11191121
match, _ := filepath.Match(escapedPath, remotePath)
11201122
if !match {
11211123
return nil
11221124
}
1123-
compressTp := mydump.ParseCompressionOnFileExtension(remotePath)
1125+
allFiles = append(allFiles, mydump.RawFile{Path: remotePath, Size: size})
1126+
totalSize += size
1127+
return nil
1128+
}); err != nil {
1129+
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err), "failed to walk dir")
1130+
}
1131+
1132+
var err error
1133+
if dataFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2,
1134+
func(ctx context.Context, f mydump.RawFile) (*mydump.SourceFileMeta, error) {
1135+
path, size := f.Path, f.Size
1136+
compressTp := mydump.ParseCompressionOnFileExtension(path)
11241137
fileMeta := mydump.SourceFileMeta{
1125-
Path: remotePath,
1138+
Path: path,
11261139
FileSize: size,
11271140
Compression: compressTp,
11281141
Type: sourceType,
11291142
}
1130-
fileMeta.RealSize = e.getFileRealSize(ctx, fileMeta, s)
1131-
dataFiles = append(dataFiles, &fileMeta)
1132-
totalSize += size
1133-
return nil
1134-
})
1135-
if err != nil {
1136-
return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err), "failed to walk dir")
1143+
fileMeta.RealSize = mydump.EstimateRealSizeForFile(ctx, fileMeta, s)
1144+
return &fileMeta, nil
1145+
}); err != nil {
1146+
return err
11371147
}
11381148
}
11391149

@@ -1142,19 +1152,6 @@ func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
11421152
return nil
11431153
}
11441154

1145-
func (e *LoadDataController) getFileRealSize(ctx context.Context,
1146-
fileMeta mydump.SourceFileMeta, store storage.ExternalStorage) int64 {
1147-
if fileMeta.Compression == mydump.CompressionNone {
1148-
return fileMeta.FileSize
1149-
}
1150-
compressRatio, err := mydump.SampleFileCompressRatio(ctx, fileMeta, store)
1151-
if err != nil {
1152-
e.logger.Warn("failed to get compress ratio", zap.String("file", fileMeta.Path), zap.Error(err))
1153-
return fileMeta.FileSize
1154-
}
1155-
return int64(compressRatio * float64(fileMeta.FileSize))
1156-
}
1157-
11581155
func (e *LoadDataController) getSourceType() mydump.SourceType {
11591156
switch e.Format {
11601157
case DataFormatParquet:

pkg/executor/importer/import_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,10 @@ import (
2828

2929
"github.com/pingcap/errors"
3030
"github.com/pingcap/failpoint"
31-
"github.com/pingcap/log"
3231
berrors "github.com/pingcap/tidb/br/pkg/errors"
3332
"github.com/pingcap/tidb/pkg/expression"
3433
tidbkv "github.com/pingcap/tidb/pkg/kv"
3534
"github.com/pingcap/tidb/pkg/lightning/config"
36-
"github.com/pingcap/tidb/pkg/lightning/mydump"
3735
"github.com/pingcap/tidb/pkg/parser"
3836
"github.com/pingcap/tidb/pkg/parser/ast"
3937
plannercore "github.com/pingcap/tidb/pkg/planner/core"
@@ -236,22 +234,6 @@ func TestASTArgsFromStmt(t *testing.T) {
236234
require.Equal(t, astArgs.ColumnsAndUserVars, importIntoStmt.ColumnsAndUserVars)
237235
}
238236

239-
func TestGetFileRealSize(t *testing.T) {
240-
err := failpoint.Enable("github.com/pingcap/tidb/pkg/lightning/mydump/SampleFileCompressPercentage", "return(250)")
241-
require.NoError(t, err)
242-
defer func() {
243-
_ = failpoint.Disable("github.com/pingcap/tidb/pkg/lightning/mydump/SampleFileCompressPercentage")
244-
}()
245-
fileMeta := mydump.SourceFileMeta{Compression: mydump.CompressionNone, FileSize: 100}
246-
c := &LoadDataController{logger: log.L()}
247-
require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
248-
fileMeta.Compression = mydump.CompressionGZ
249-
require.Equal(t, int64(250), c.getFileRealSize(context.Background(), fileMeta, nil))
250-
err = failpoint.Enable("github.com/pingcap/tidb/pkg/lightning/mydump/SampleFileCompressPercentage", `return("test err")`)
251-
require.NoError(t, err)
252-
require.Equal(t, int64(100), c.getFileRealSize(context.Background(), fileMeta, nil))
253-
}
254-
255237
func urlEqual(t *testing.T, expected, actual string) {
256238
urlExpected, err := url.Parse(expected)
257239
require.NoError(t, err)

pkg/lightning/mydump/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ go_test(
8282
"//pkg/util/table-filter",
8383
"//pkg/util/table-router",
8484
"@com_github_pingcap_errors//:errors",
85+
"@com_github_pingcap_failpoint//:failpoint",
8586
"@com_github_stretchr_testify//assert",
8687
"@com_github_stretchr_testify//require",
8788
"@com_github_xitongsys_parquet_go//parquet",

0 commit comments

Comments
 (0)