Skip to content

Commit 72e5323

Browse files
authored
lightning: a way to estimate parquet file size (pingcap#46984) (pingcap#48908)
close pingcap#46980
1 parent 5c1e065 commit 72e5323

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

br/pkg/lightning/mydump/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ go_test(
8484
"@com_github_pingcap_errors//:errors",
8585
"@com_github_stretchr_testify//assert",
8686
"@com_github_stretchr_testify//require",
87+
"@com_github_xitongsys_parquet_go//parquet",
8788
"@com_github_xitongsys_parquet_go//writer",
8889
"@com_github_xitongsys_parquet_go_source//local",
8990
"@org_uber_go_goleak//:goleak",

br/pkg/lightning/mydump/loader.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ import (
3232
)
3333

3434
// sampleCompressedFileSize represents how many bytes need to be sampled for compressed files
35-
const sampleCompressedFileSize = 4 * 1024
35+
const (
36+
sampleCompressedFileSize = 4 * 1024
37+
maxSampleParquetDataSize = 8 * 1024
38+
maxSampleParquetRowCount = 500
39+
)
3640

3741
// MDDatabaseMeta contains some parsed metadata for a database in the source by MyDumper Loader.
3842
type MDDatabaseMeta struct {
@@ -484,7 +488,7 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
484488
s.tableSchemas = append(s.tableSchemas, info)
485489
case SourceTypeViewSchema:
486490
s.viewSchemas = append(s.viewSchemas, info)
487-
case SourceTypeSQL, SourceTypeCSV, SourceTypeParquet:
491+
case SourceTypeSQL, SourceTypeCSV:
488492
if info.FileMeta.Compression != CompressionNone {
489493
compressRatio, err2 := SampleFileCompressRatio(ctx, info.FileMeta, s.loader.GetStore())
490494
if err2 != nil {
@@ -495,6 +499,15 @@ func (s *mdLoaderSetup) constructFileInfo(ctx context.Context, path string, size
495499
}
496500
}
497501
s.tableDatas = append(s.tableDatas, info)
502+
case SourceTypeParquet:
503+
parquetDataSize, err2 := SampleParquetDataSize(ctx, info.FileMeta, s.loader.GetStore())
504+
if err2 != nil {
505+
logger.Error("fail to sample parquet data size", zap.String("category", "loader"),
506+
zap.String("schema", res.Schema), zap.String("table", res.Name), zap.Stringer("type", res.Type), zap.Error(err2))
507+
} else {
508+
info.FileMeta.RealSize = parquetDataSize
509+
}
510+
s.tableDatas = append(s.tableDatas, info)
498511
}
499512

500513
logger.Debug("file route result", zap.String("schema", res.Schema),
@@ -768,3 +781,47 @@ func SampleFileCompressRatio(ctx context.Context, fileMeta SourceFileMeta, store
768781
}
769782
return float64(tot) / float64(pos), nil
770783
}
784+
785+
// SampleParquetDataSize samples the data size of the parquet file.
786+
func SampleParquetDataSize(ctx context.Context, fileMeta SourceFileMeta, store storage.ExternalStorage) (int64, error) {
787+
totalRowCount, err := ReadParquetFileRowCountByFile(ctx, store, fileMeta)
788+
if err != nil {
789+
return 0, err
790+
}
791+
792+
reader, err := store.Open(ctx, fileMeta.Path)
793+
if err != nil {
794+
return 0, err
795+
}
796+
parser, err := NewParquetParser(ctx, store, reader, fileMeta.Path)
797+
if err != nil {
798+
//nolint: errcheck
799+
reader.Close()
800+
return 0, err
801+
}
802+
//nolint: errcheck
803+
defer parser.Close()
804+
805+
var (
806+
rowSize int64
807+
rowCount int64
808+
)
809+
for {
810+
err = parser.ReadRow()
811+
if err != nil {
812+
if errors.Cause(err) == io.EOF {
813+
break
814+
}
815+
return 0, err
816+
}
817+
lastRow := parser.LastRow()
818+
rowCount++
819+
rowSize += int64(lastRow.Length)
820+
parser.RecycleRow(lastRow)
821+
if rowSize > maxSampleParquetDataSize || rowCount > maxSampleParquetRowCount {
822+
break
823+
}
824+
}
825+
size := int64(float64(totalRowCount) / float64(rowCount) * float64(rowSize))
826+
return size, nil
827+
}

br/pkg/lightning/mydump/loader_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import (
1919
"compress/gzip"
2020
"context"
2121
"fmt"
22+
"math/rand"
2223
"os"
2324
"path/filepath"
2425
"testing"
26+
"time"
2527

2628
"github.com/pingcap/tidb/br/pkg/lightning/common"
2729
"github.com/pingcap/tidb/br/pkg/lightning/config"
@@ -32,6 +34,8 @@ import (
3234
router "github.com/pingcap/tidb/util/table-router"
3335
"github.com/stretchr/testify/assert"
3436
"github.com/stretchr/testify/require"
37+
"github.com/xitongsys/parquet-go/parquet"
38+
"github.com/xitongsys/parquet-go/writer"
3539
)
3640

3741
type testMydumpLoaderSuite struct {
@@ -1103,3 +1107,62 @@ func TestSampleFileCompressRatio(t *testing.T) {
11031107
require.NoError(t, err)
11041108
require.InDelta(t, ratio, 5000.0/float64(bf.Len()), 1e-5)
11051109
}
1110+
1111+
func TestSampleParquetDataSize(t *testing.T) {
1112+
s := newTestMydumpLoaderSuite(t)
1113+
store, err := storage.NewLocalStorage(s.sourceDir)
1114+
require.NoError(t, err)
1115+
1116+
type row struct {
1117+
ID int64 `parquet:"name=id, type=INT64"`
1118+
Key string `parquet:"name=key, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
1119+
Value string `parquet:"name=value, type=BYTE_ARRAY, encoding=PLAIN_DICTIONARY"`
1120+
}
1121+
1122+
ctx, cancel := context.WithCancel(context.Background())
1123+
defer cancel()
1124+
1125+
byteArray := make([]byte, 0, 40*1024)
1126+
bf := bytes.NewBuffer(byteArray)
1127+
pwriter, err := writer.NewParquetWriterFromWriter(bf, new(row), 4)
1128+
require.NoError(t, err)
1129+
pwriter.RowGroupSize = 128 * 1024 * 1024 //128M
1130+
pwriter.PageSize = 8 * 1024 //8K
1131+
pwriter.CompressionType = parquet.CompressionCodec_SNAPPY
1132+
seed := time.Now().Unix()
1133+
t.Logf("seed: %d", seed)
1134+
rand.Seed(seed)
1135+
totalRowSize := 0
1136+
for i := 0; i < 1000; i++ {
1137+
kl := rand.Intn(20) + 1
1138+
key := make([]byte, kl)
1139+
kl, err = rand.Read(key)
1140+
require.NoError(t, err)
1141+
vl := rand.Intn(20) + 1
1142+
value := make([]byte, vl)
1143+
vl, err = rand.Read(value)
1144+
require.NoError(t, err)
1145+
1146+
totalRowSize += kl + vl + 8
1147+
row := row{
1148+
ID: int64(i),
1149+
Key: string(key[:kl]),
1150+
Value: string(value[:vl]),
1151+
}
1152+
err = pwriter.Write(row)
1153+
require.NoError(t, err)
1154+
}
1155+
err = pwriter.WriteStop()
1156+
require.NoError(t, err)
1157+
1158+
fileName := "test_1.t1.parquet"
1159+
err = store.WriteFile(ctx, fileName, bf.Bytes())
1160+
require.NoError(t, err)
1161+
1162+
size, err := md.SampleParquetDataSize(ctx, md.SourceFileMeta{
1163+
Path: fileName,
1164+
}, store)
1165+
require.NoError(t, err)
1166+
// expected error within 10%, so delta = totalRowSize / 10
1167+
require.InDelta(t, totalRowSize, size, float64(totalRowSize)/10)
1168+
}

0 commit comments

Comments
 (0)