Skip to content

Commit eabda06

Browse files
authored
importinto: disable switching to 'import mode' when global sort (#60363) (#62028)
close #60361
1 parent 61d7f3c commit eabda06

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

pkg/executor/importer/import.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,30 @@ const (
7979
// 0 means no limit
8080
unlimitedWriteSpeed = config.ByteSize(0)
8181

82-
characterSetOption = "character_set"
83-
fieldsTerminatedByOption = "fields_terminated_by"
84-
fieldsEnclosedByOption = "fields_enclosed_by"
85-
fieldsEscapedByOption = "fields_escaped_by"
86-
fieldsDefinedNullByOption = "fields_defined_null_by"
87-
linesTerminatedByOption = "lines_terminated_by"
88-
skipRowsOption = "skip_rows"
89-
splitFileOption = "split_file"
90-
diskQuotaOption = "disk_quota"
91-
threadOption = "thread"
92-
maxWriteSpeedOption = "max_write_speed"
93-
checksumTableOption = "checksum_table"
94-
recordErrorsOption = "record_errors"
95-
detachedOption = "detached"
82+
characterSetOption = "character_set"
83+
fieldsTerminatedByOption = "fields_terminated_by"
84+
fieldsEnclosedByOption = "fields_enclosed_by"
85+
fieldsEscapedByOption = "fields_escaped_by"
86+
fieldsDefinedNullByOption = "fields_defined_null_by"
87+
linesTerminatedByOption = "lines_terminated_by"
88+
skipRowsOption = "skip_rows"
89+
splitFileOption = "split_file"
90+
diskQuotaOption = "disk_quota"
91+
threadOption = "thread"
92+
maxWriteSpeedOption = "max_write_speed"
93+
checksumTableOption = "checksum_table"
94+
recordErrorsOption = "record_errors"
95+
detachedOption = "detached"
96+
// if 'import mode' enabled, TiKV will:
97+
// - set level0_stop_writes_trigger = max(old, 1 << 30)
98+
// - set level0_slowdown_writes_trigger = max(old, 1 << 30)
99+
// - set soft_pending_compaction_bytes_limit = 0,
100+
// - set hard_pending_compaction_bytes_limit = 0,
101+
// - will not trigger flow control when SST count in L0 is large
102+
// - will not trigger region split, it might cause some region became
103+
// very large and be a hotspot, might cause latency spike.
104+
//
105+
// default false for local sort, true for global sort.
96106
disableTiKVImportModeOption = "disable_tikv_import_mode"
97107
cloudStorageURIOption = "cloud_storage_uri"
98108
disablePrecheckOption = "disable_precheck"
@@ -780,6 +790,9 @@ func (p *Plan) adjustOptions(targetNodeCPUCnt int) {
780790
zap.Int("before", p.ThreadCnt), zap.Int("after", limit))
781791
p.ThreadCnt = limit
782792
}
793+
if p.IsGlobalSort() {
794+
p.DisableTiKVImportMode = true
795+
}
783796
}
784797

785798
func (p *Plan) initParameters(plan *plannercore.ImportInto) error {

pkg/executor/importer/import_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,17 @@ func TestAdjustOptions(t *testing.T) {
189189
plan.adjustOptions(16)
190190
require.Equal(t, 16, plan.ThreadCnt)
191191
require.Equal(t, config.ByteSize(10), plan.MaxWriteSpeed) // not adjusted
192+
require.False(t, plan.DisableTiKVImportMode)
192193

193194
plan.ThreadCnt = 100000000
194195
plan.DataSourceType = DataSourceTypeQuery
195196
plan.adjustOptions(16)
196197
require.Equal(t, 32, plan.ThreadCnt)
198+
require.False(t, plan.DisableTiKVImportMode)
199+
200+
plan.CloudStorageURI = "s3://bucket/path"
201+
plan.adjustOptions(16)
202+
require.True(t, plan.DisableTiKVImportMode)
197203
}
198204

199205
func TestAdjustDiskQuota(t *testing.T) {

tests/realtikvtest/importintotest3/from_server_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,5 @@ func (s *mockGCSSuite) TestImportFromServer() {
7373
var taskMeta importinto.TaskMeta
7474
require.NoError(s.T(), json.Unmarshal(task.Meta, &taskMeta))
7575
require.Len(s.T(), taskMeta.ChunkMap, 2)
76+
require.False(s.T(), taskMeta.Plan.DisableTiKVImportMode)
7677
}

tests/realtikvtest/importintotest4/global_sort_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
9696
taskMeta := importinto.TaskMeta{}
9797
s.NoError(json.Unmarshal(task.Meta, &taskMeta))
9898
urlEqual(s.T(), redactedSortStorageURI, taskMeta.Plan.CloudStorageURI)
99+
require.True(s.T(), taskMeta.Plan.DisableTiKVImportMode)
99100

100101
// merge-sort data kv
101102
s.tk.MustExec("truncate table t")

0 commit comments

Comments
 (0)