Skip to content

Commit eeccd2e

Browse files
authored
ddl: get scatter variable from executor session context (#61331)
close #61321
1 parent f60f68a commit eeccd2e

File tree

9 files changed

+249
-20
lines changed

9 files changed

+249
-20
lines changed

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,12 @@ bazel_flashbacktest: failpoint-enable bazel_ci_simple_prepare
783783
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=$(REAL_TIKV_TEST_TAGS) \
784784
-- //tests/realtikvtest/flashbacktest/...
785785

786+
# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
787+
.PHONY: bazel_ddltest
788+
bazel_ddltest: failpoint-enable bazel_ci_simple_prepare
789+
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=$(REAL_TIKV_TEST_TAGS) \
790+
-- //tests/realtikvtest/ddltest/...
791+
786792
.PHONY: bazel_lint
787793
bazel_lint: bazel_prepare
788794
bazel build //... --//build:with_nogo_flag=$(NOGO_FLAG)

pkg/ddl/executor.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,9 @@ func (e *executor) createTableWithInfoJob(
11281128
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
11291129
InvolvingSchemaInfo: involvingSchemas,
11301130
SQLMode: ctx.GetSessionVars().SQLMode,
1131+
SessionVars: make(map[string]string),
11311132
}
1133+
job.AddSessionVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
11321134
args := &model.CreateTableArgs{
11331135
TableInfo: tbInfo,
11341136
OnExistReplace: cfg.OnExist == OnExistReplace,
@@ -1160,13 +1162,14 @@ func (e *executor) createTableWithInfoPost(
11601162
ctx sessionctx.Context,
11611163
tbInfo *model.TableInfo,
11621164
schemaID int64,
1165+
scatterScope string,
11631166
) error {
11641167
var err error
11651168
var partitions []model.PartitionDefinition
11661169
if pi := tbInfo.GetPartitionInfo(); pi != nil {
11671170
partitions = pi.Definitions
11681171
}
1169-
preSplitAndScatter(ctx, e.store, tbInfo, partitions)
1172+
preSplitAndScatter(ctx, e.store, tbInfo, partitions, scatterScope)
11701173
if tbInfo.AutoIncID > 1 {
11711174
// Default tableAutoIncID base is 0.
11721175
// If the first ID is expected to greater than 1, we need to do rebase.
@@ -1221,7 +1224,11 @@ func (e *executor) CreateTableWithInfo(
12211224
err = nil
12221225
}
12231226
} else {
1224-
err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID)
1227+
var scatterScope string
1228+
if val, ok := jobW.GetSessionVars(vardef.TiDBScatterRegion); ok {
1229+
scatterScope = val
1230+
}
1231+
err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID, scatterScope)
12251232
}
12261233

12271234
return errors.Trace(err)
@@ -1245,7 +1252,9 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
12451252
BinlogInfo: &model.HistoryInfo{},
12461253
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
12471254
SQLMode: ctx.GetSessionVars().SQLMode,
1255+
SessionVars: make(map[string]string),
12481256
}
1257+
job.AddSessionVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
12491258

12501259
var err error
12511260

@@ -1311,9 +1320,12 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
13111320
}
13121321
return errors.Trace(err)
13131322
}
1314-
1323+
var scatterScope string
1324+
if val, ok := jobW.GetSessionVars(vardef.TiDBScatterRegion); ok {
1325+
scatterScope = val
1326+
}
13151327
for _, tblArgs := range args.Tables {
1316-
if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID); err != nil {
1328+
if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID, scatterScope); err != nil {
13171329
return errors.Trace(err)
13181330
}
13191331
}
@@ -1376,24 +1388,16 @@ func (e *executor) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy
13761388

13771389
// preSplitAndScatter performs pre-split and scatter of the table's regions.
13781390
// If `pi` is not nil, will only split region for `pi`, this is used when add partition.
1379-
func preSplitAndScatter(ctx sessionctx.Context, store kv.Storage, tbInfo *model.TableInfo, parts []model.PartitionDefinition) {
1391+
func preSplitAndScatter(ctx sessionctx.Context, store kv.Storage, tbInfo *model.TableInfo, parts []model.PartitionDefinition, scatterScope string) {
1392+
failpoint.InjectCall("preSplitAndScatter", scatterScope)
13801393
if tbInfo.TempTableType != model.TempTableNone {
13811394
return
13821395
}
13831396
sp, ok := store.(kv.SplittableStore)
13841397
if !ok || atomic.LoadUint32(&EnableSplitTableRegion) == 0 {
13851398
return
13861399
}
1387-
var (
1388-
preSplit func()
1389-
scatterScope string
1390-
)
1391-
val, ok := ctx.GetSessionVars().GetSystemVar(vardef.TiDBScatterRegion)
1392-
if !ok {
1393-
logutil.DDLLogger().Warn("get system variable met problem, won't scatter region")
1394-
} else {
1395-
scatterScope = val
1396-
}
1400+
var preSplit func()
13971401
if len(parts) > 0 {
13981402
preSplit = func() { splitPartitionTableRegion(ctx, sp, tbInfo, parts, scatterScope) }
13991403
} else {
@@ -2286,7 +2290,9 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s
22862290
BinlogInfo: &model.HistoryInfo{},
22872291
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
22882292
SQLMode: ctx.GetSessionVars().SQLMode,
2293+
SessionVars: make(map[string]string),
22892294
}
2295+
job.AddSessionVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
22902296
args := &model.TablePartitionArgs{
22912297
PartInfo: partInfo,
22922298
}
@@ -2797,7 +2803,9 @@ func (e *executor) TruncateTablePartition(ctx sessionctx.Context, ident ast.Iden
27972803
BinlogInfo: &model.HistoryInfo{},
27982804
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
27992805
SQLMode: ctx.GetSessionVars().SQLMode,
2806+
SessionVars: make(map[string]string),
28002807
}
2808+
job.AddSessionVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
28012809
args := &model.TruncateTableArgs{
28022810
OldPartitionIDs: pids,
28032811
// job submitter will fill new partition IDs.
@@ -4299,7 +4307,9 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
42994307
BinlogInfo: &model.HistoryInfo{},
43004308
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
43014309
SQLMode: ctx.GetSessionVars().SQLMode,
4310+
SessionVars: make(map[string]string),
43024311
}
4312+
job.AddSessionVars(vardef.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
43034313
args := &model.TruncateTableArgs{
43044314
FKCheck: fkCheck,
43054315
OldPartitionIDs: oldPartitionIDs,
@@ -7080,3 +7090,14 @@ func (e *executor) RefreshMeta(sctx sessionctx.Context, args *model.RefreshMetaA
70807090
err := e.doDDLJob2(sctx, job, args)
70817091
return errors.Trace(err)
70827092
}
7093+
7094+
func getScatterScopeFromSessionctx(sctx sessionctx.Context) string {
7095+
var scatterScope string
7096+
val, ok := sctx.GetSessionVars().GetSystemVar(vardef.TiDBScatterRegion)
7097+
if !ok {
7098+
logutil.DDLLogger().Info("won't scatter region since system variable didn't set")
7099+
} else {
7100+
scatterScope = val
7101+
}
7102+
return scatterScope
7103+
}

pkg/ddl/partition.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,11 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
217217
}
218218
// For normal and replica finished table, move the `addingDefinitions` into `Definitions`.
219219
updatePartitionInfo(tblInfo)
220-
221-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions)
220+
var scatterScope string
221+
if val, ok := job.GetSessionVars(vardef.TiDBScatterRegion); ok {
222+
scatterScope = val
223+
}
224+
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions, scatterScope)
222225

223226
tblInfo.Partition.DDLState = model.StateNone
224227
tblInfo.Partition.DDLAction = model.ActionNone
@@ -2551,7 +2554,11 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
25512554
if err != nil {
25522555
return ver, errors.Trace(err)
25532556
}
2554-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newDefinitions)
2557+
var scatterScope string
2558+
if val, ok := job.GetSessionVars(vardef.TiDBScatterRegion); ok {
2559+
scatterScope = val
2560+
}
2561+
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newDefinitions, scatterScope)
25552562
failpoint.Inject("truncatePartFail1", func(val failpoint.Value) {
25562563
if val.(bool) {
25572564
job.ErrorCount += vardef.GetDDLErrorCountLimit() / 2

pkg/ddl/table.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,11 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
581581
if pi := tblInfo.GetPartitionInfo(); pi != nil {
582582
partitions = tblInfo.GetPartitionInfo().Definitions
583583
}
584-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, partitions)
584+
var scatterScope string
585+
if val, ok := job.GetSessionVars(vardef.TiDBScatterRegion); ok {
586+
scatterScope = val
587+
}
588+
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, partitions, scatterScope)
585589

586590
ver, err = updateSchemaVersion(jobCtx, job)
587591
if err != nil {

pkg/meta/model/job.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,9 @@ type Job struct {
389389

390390
// SQLMode for executing DDL query.
391391
SQLMode mysql.SQLMode `json:"sql_mode"`
392+
393+
// SessionVars store session variables
394+
SessionVars map[string]string `json:"session_vars,omitempty"`
392395
}
393396

394397
// FinishTableJob is called when a job is finished.
@@ -693,6 +696,17 @@ func (job *Job) InFinalState() bool {
693696
return job.State == JobStateSynced || job.State == JobStateCancelled || job.State == JobStatePaused
694697
}
695698

699+
// AddSessionVars add a session variable in DDL job.
700+
func (job *Job) AddSessionVars(name, value string) {
701+
job.SessionVars[name] = value
702+
}
703+
704+
// GetSessionVars get a session variable in DDL job.
705+
func (job *Job) GetSessionVars(name string) (string, bool) {
706+
value, ok := job.SessionVars[name]
707+
return value, ok
708+
}
709+
696710
// MayNeedReorg indicates that this job may need to reorganize the data.
697711
func (job *Job) MayNeedReorg() bool {
698712
switch job.Type {

pkg/meta/model/job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func TestJobSize(t *testing.T) {
161161
- SubJob.FromProxyJob()
162162
- SubJob.ToProxyJob()
163163
`
164-
require.Equal(t, 400, int(unsafe.Sizeof(Job{})), msg)
164+
require.Equal(t, 408, int(unsafe.Sizeof(Job{})), msg)
165165
}
166166

167167
func TestBackfillMetaCodec(t *testing.T) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_test")
2+
3+
go_test(
4+
name = "ddltest_test",
5+
timeout = "short",
6+
srcs = [
7+
"main_test.go",
8+
"scatter_region_test.go",
9+
],
10+
flaky = True,
11+
deps = [
12+
"//pkg/config",
13+
"//pkg/ddl",
14+
"//pkg/testkit",
15+
"//pkg/testkit/testfailpoint",
16+
"//tests/realtikvtest",
17+
"//tests/realtikvtest/testutils",
18+
"@com_github_stretchr_testify//require",
19+
],
20+
)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ddltest
16+
17+
import (
18+
"testing"
19+
20+
"github.com/pingcap/tidb/pkg/config"
21+
"github.com/pingcap/tidb/tests/realtikvtest"
22+
"github.com/pingcap/tidb/tests/realtikvtest/testutils"
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
config.UpdateGlobal(func(conf *config.Config) {
27+
conf.Store = config.StoreTypeTiKV
28+
})
29+
testutils.UpdateTiDBConfig()
30+
realtikvtest.RunTestMain(m)
31+
}

0 commit comments

Comments
 (0)