Skip to content

Commit 56b2329

Browse files
lance6716ti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#44333
Signed-off-by: ti-chi-bot <[email protected]>
1 parent bd94fb8 commit 56b2329

File tree

9 files changed

+158
-1
lines changed

9 files changed

+158
-1
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "mydump",
5+
srcs = [
6+
"bytes.go",
7+
"charset_convertor.go",
8+
"csv_parser.go",
9+
"loader.go",
10+
"parquet_parser.go",
11+
"parser.go",
12+
"parser_generated.go",
13+
"reader.go",
14+
"region.go",
15+
"router.go",
16+
],
17+
importpath = "github.com/pingcap/tidb/br/pkg/lightning/mydump",
18+
visibility = ["//visibility:public"],
19+
deps = [
20+
"//br/pkg/lightning/common",
21+
"//br/pkg/lightning/config",
22+
"//br/pkg/lightning/log",
23+
"//br/pkg/lightning/metric",
24+
"//br/pkg/lightning/worker",
25+
"//br/pkg/storage",
26+
"//config",
27+
"//parser/mysql",
28+
"//types",
29+
"//util/filter",
30+
"//util/mathutil",
31+
"//util/regexpr-router",
32+
"//util/slice",
33+
"//util/table-filter",
34+
"@com_github_pingcap_errors//:errors",
35+
"@com_github_spkg_bom//:bom",
36+
"@com_github_xitongsys_parquet_go//parquet",
37+
"@com_github_xitongsys_parquet_go//reader",
38+
"@com_github_xitongsys_parquet_go//source",
39+
"@org_golang_x_text//encoding",
40+
"@org_golang_x_text//encoding/simplifiedchinese",
41+
"@org_uber_go_zap//:zap",
42+
"@org_uber_go_zap//zapcore",
43+
],
44+
)
45+
46+
go_test(
47+
name = "mydump_test",
48+
timeout = "short",
49+
srcs = [
50+
"charset_convertor_test.go",
51+
"csv_parser_test.go",
52+
"loader_test.go",
53+
"main_test.go",
54+
"parquet_parser_test.go",
55+
"parser_test.go",
56+
"reader_test.go",
57+
"region_test.go",
58+
"router_test.go",
59+
],
60+
data = glob([
61+
"csv/*",
62+
"examples/*",
63+
"parquet/*",
64+
]),
65+
embed = [":mydump"],
66+
flaky = True,
67+
deps = [
68+
"//br/pkg/lightning/common",
69+
"//br/pkg/lightning/config",
70+
"//br/pkg/lightning/log",
71+
"//br/pkg/lightning/worker",
72+
"//br/pkg/mock/storage",
73+
"//br/pkg/storage",
74+
"//parser/mysql",
75+
"//testkit/testsetup",
76+
"//types",
77+
"//util/filter",
78+
"//util/table-filter",
79+
"//util/table-router",
80+
"@com_github_golang_mock//gomock",
81+
"@com_github_pingcap_errors//:errors",
82+
"@com_github_stretchr_testify//assert",
83+
"@com_github_stretchr_testify//require",
84+
"@com_github_xitongsys_parquet_go//writer",
85+
"@com_github_xitongsys_parquet_go_source//local",
86+
"@org_uber_go_goleak//:goleak",
87+
"@org_uber_go_zap//:zap",
88+
],
89+
)

br/pkg/lightning/mydump/csv_parser.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pingcap/errors"
2323
"github.com/pingcap/tidb/br/pkg/lightning/config"
2424
"github.com/pingcap/tidb/br/pkg/lightning/worker"
25+
tidbconfig "github.com/pingcap/tidb/config"
2526
"github.com/pingcap/tidb/types"
2627
"github.com/pingcap/tidb/util/mathutil"
2728
)
@@ -30,8 +31,14 @@ var (
3031
errUnterminatedQuotedField = errors.NewNoStackError("syntax error: unterminated quoted field")
3132
errDanglingBackslash = errors.NewNoStackError("syntax error: no character after backslash")
3233
errUnexpectedQuoteField = errors.NewNoStackError("syntax error: cannot have consecutive fields without separator")
34+
// LargestEntryLimit is the maximum number of bytes readUntil will buffer before returning an error.
35+
LargestEntryLimit int
3336
)
3437

38+
func init() {
39+
LargestEntryLimit = tidbconfig.MaxTxnEntrySizeLimit
40+
}
41+
3542
// CSVParser is basically a copy of encoding/csv, but special-cased for MySQL-like input.
3643
type CSVParser struct {
3744
blockParser
@@ -331,6 +338,9 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
331338
var buf []byte
332339
for {
333340
buf = append(buf, parser.buf...)
341+
if len(buf) > LargestEntryLimit {
342+
return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
343+
}
334344
parser.buf = nil
335345
if err := parser.readBlock(); err != nil || len(parser.buf) == 0 {
336346
if err == nil {
@@ -442,9 +452,18 @@ outside:
442452

443453
func (parser *CSVParser) readQuotedField() error {
444454
for {
455+
prevPos := parser.pos
445456
content, terminator, err := parser.readUntil(&parser.quoteByteSet)
446-
err = parser.replaceEOF(err, errUnterminatedQuotedField)
447457
if err != nil {
458+
if errors.Cause(err) == io.EOF {
459+
// return the position of quote to the caller.
460+
// because we return an error here, the parser won't
461+
// use the `pos` again, so it's safe to modify it here.
462+
parser.pos = prevPos - 1
463+
// store the content read so far in parser.buf so the error log can print it
464+
parser.buf = content
465+
err = parser.replaceEOF(err, errUnterminatedQuotedField)
466+
}
448467
return err
449468
}
450469
parser.recordBuffer = append(parser.recordBuffer, content...)

br/pkg/lightning/mydump/csv_parser_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package mydump_test
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/csv"
67
"fmt"
@@ -680,6 +681,29 @@ func TestConsecutiveFields(t *testing.T) {
680681
})
681682
}
682683

684+
func TestTooLargeRow(t *testing.T) {
685+
cfg := config.MydumperRuntime{
686+
CSV: config.CSVConfig{
687+
Separator: ",",
688+
Delimiter: `"`,
689+
},
690+
}
691+
var testCase bytes.Buffer
692+
testCase.WriteString("a,b,c,d")
693+
// WARN: will take up 10KB memory here.
694+
mydump.LargestEntryLimit = 10 * 1024
695+
for i := 0; i < mydump.LargestEntryLimit; i++ {
696+
testCase.WriteByte('d')
697+
}
698+
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
699+
require.NoError(t, err)
700+
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor)
701+
require.NoError(t, err)
702+
e := parser.ReadRow()
703+
require.Error(t, e)
704+
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
705+
}
706+
683707
func TestSpecialChars(t *testing.T) {
684708
cfg := config.MydumperRuntime{
685709
CSV: config.CSVConfig{Separator: ",", Delimiter: `"`},
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
create database if not exists db;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
create table test(a int primary key, b int, c int, d int);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
1,2,3,4
2+
2,10,4,5
3+
1111,",7,8

br/tests/lightning_csv/run.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,11 @@ for BACKEND in tidb local; do
4141
check_not_contains 'id:'
4242

4343
done
44+
45+
set +e
46+
run_lightning --backend local -d "tests/$TEST_NAME/errData" --log-file "$TEST_DIR/lightning-err.log" 2>/dev/null
47+
set -e
48+
# err content presented
49+
grep ",7,8" "$TEST_DIR/lightning-err.log"
50+
# pos should not set to end
51+
# -F matches the log fields literally; without it the square brackets are
# regex bracket expressions and the check passes on almost any log line.
grep -F "[\"syntax error\"] [pos=22]" "$TEST_DIR/lightning-err.log"

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ import (
4444
// Config number limitations
4545
const (
4646
MaxLogFileSize = 4096 // MB
47+
// MaxTxnEntrySizeLimit is the max value of TxnEntrySizeLimit.
48+
MaxTxnEntrySizeLimit = 120 * 1024 * 1024 // 120MB
4749
// DefTxnEntrySizeLimit is the default value of TxnEntrySizeLimit.
4850
DefTxnEntrySizeLimit = 6 * 1024 * 1024
4951
// DefTxnTotalSizeLimit is the default value of TxnTotalSizeLimit.

tidb-server/main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,18 @@ func setGlobalVars() {
612612
}
613613
plannercore.AllowCartesianProduct.Store(cfg.Performance.CrossJoin)
614614
privileges.SkipWithGrant = cfg.Security.SkipGrantTable
615+
<<<<<<< HEAD
615616
kv.TxnTotalSizeLimit = cfg.Performance.TxnTotalSizeLimit
616617
if cfg.Performance.TxnEntrySizeLimit > 120*1024*1024 {
618+
=======
619+
if cfg.Performance.TxnTotalSizeLimit == config.DefTxnTotalSizeLimit {
620+
// practically deprecate the config, let the new session memory tracker take charge of it.
621+
kv.TxnTotalSizeLimit = config.SuperLargeTxnSize
622+
} else {
623+
kv.TxnTotalSizeLimit = cfg.Performance.TxnTotalSizeLimit
624+
}
625+
if cfg.Performance.TxnEntrySizeLimit > config.MaxTxnEntrySizeLimit {
626+
>>>>>>> bb2e845f712 (lightning: fix risk of OOM (#40443) (#44333))
617627
log.Fatal("cannot set txn entry size limit larger than 120M")
618628
}
619629
kv.TxnEntrySizeLimit = cfg.Performance.TxnEntrySizeLimit

0 commit comments

Comments
 (0)