Skip to content

Commit fe5a517

Browse files
authored
importinto: disable switching to 'import mode' when global sort (#60363)
close #60361
1 parent 7e32850 commit fe5a517

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

pkg/executor/importer/import.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,30 @@ const (
8383
// 0 means no limit
8484
unlimitedWriteSpeed = config.ByteSize(0)
8585

86-
characterSetOption = "character_set"
87-
fieldsTerminatedByOption = "fields_terminated_by"
88-
fieldsEnclosedByOption = "fields_enclosed_by"
89-
fieldsEscapedByOption = "fields_escaped_by"
90-
fieldsDefinedNullByOption = "fields_defined_null_by"
91-
linesTerminatedByOption = "lines_terminated_by"
92-
skipRowsOption = "skip_rows"
93-
splitFileOption = "split_file"
94-
diskQuotaOption = "disk_quota"
95-
threadOption = "thread"
96-
maxWriteSpeedOption = "max_write_speed"
97-
checksumTableOption = "checksum_table"
98-
recordErrorsOption = "record_errors"
99-
detachedOption = "detached"
86+
characterSetOption = "character_set"
87+
fieldsTerminatedByOption = "fields_terminated_by"
88+
fieldsEnclosedByOption = "fields_enclosed_by"
89+
fieldsEscapedByOption = "fields_escaped_by"
90+
fieldsDefinedNullByOption = "fields_defined_null_by"
91+
linesTerminatedByOption = "lines_terminated_by"
92+
skipRowsOption = "skip_rows"
93+
splitFileOption = "split_file"
94+
diskQuotaOption = "disk_quota"
95+
threadOption = "thread"
96+
maxWriteSpeedOption = "max_write_speed"
97+
checksumTableOption = "checksum_table"
98+
recordErrorsOption = "record_errors"
99+
detachedOption = "detached"
100+
// if 'import mode' enabled, TiKV will:
101+
// - set level0_stop_writes_trigger = max(old, 1 << 30)
102+
// - set level0_slowdown_writes_trigger = max(old, 1 << 30)
103+
// - set soft_pending_compaction_bytes_limit = 0,
104+
// - set hard_pending_compaction_bytes_limit = 0,
105+
// - will not trigger flow control when SST count in L0 is large
106+
// - will not trigger region split, it might cause some region became
107+
// very large and be a hotspot, might cause latency spike.
108+
//
109+
// default false for local sort, true for global sort.
100110
disableTiKVImportModeOption = "disable_tikv_import_mode"
101111
cloudStorageURIOption = "cloud_storage_uri"
102112
disablePrecheckOption = "disable_precheck"
@@ -792,6 +802,9 @@ func (p *Plan) adjustOptions(targetNodeCPUCnt int) {
792802
zap.Int("before", p.ThreadCnt), zap.Int("after", limit))
793803
p.ThreadCnt = limit
794804
}
805+
if p.IsGlobalSort() {
806+
p.DisableTiKVImportMode = true
807+
}
795808
}
796809

797810
func (p *Plan) initParameters(plan *plannercore.ImportInto) error {

pkg/executor/importer/import_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,17 @@ func TestAdjustOptions(t *testing.T) {
189189
plan.adjustOptions(16)
190190
require.Equal(t, 16, plan.ThreadCnt)
191191
require.Equal(t, config.ByteSize(10), plan.MaxWriteSpeed) // not adjusted
192+
require.False(t, plan.DisableTiKVImportMode)
192193

193194
plan.ThreadCnt = 100000000
194195
plan.DataSourceType = DataSourceTypeQuery
195196
plan.adjustOptions(16)
196197
require.Equal(t, 32, plan.ThreadCnt)
198+
require.False(t, plan.DisableTiKVImportMode)
199+
200+
plan.CloudStorageURI = "s3://bucket/path"
201+
plan.adjustOptions(16)
202+
require.True(t, plan.DisableTiKVImportMode)
197203
}
198204

199205
func TestAdjustDiskQuota(t *testing.T) {

tests/realtikvtest/importintotest3/from_server_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,5 @@ func (s *mockGCSSuite) TestImportFromServer() {
7373
var taskMeta importinto.TaskMeta
7474
require.NoError(s.T(), json.Unmarshal(task.Meta, &taskMeta))
7575
require.Len(s.T(), taskMeta.ChunkMap, 2)
76+
require.False(s.T(), taskMeta.Plan.DisableTiKVImportMode)
7677
}

tests/realtikvtest/importintotest4/global_sort_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
124124
taskMeta := importinto.TaskMeta{}
125125
s.NoError(json.Unmarshal(task.Meta, &taskMeta))
126126
urlEqual(s.T(), redactedSortStorageURI, taskMeta.Plan.CloudStorageURI)
127+
require.True(s.T(), taskMeta.Plan.DisableTiKVImportMode)
127128

128129
// merge-sort data kv
129130
s.tk.MustExec("truncate table t")

0 commit comments

Comments
 (0)