Skip to content

Commit f43a133

Browse files
authored
ddl: get scatter variable from executor session context (#61331) (#61388)
close #61321
1 parent 5a82d47 commit f43a133

File tree

9 files changed

+246
-20
lines changed

9 files changed

+246
-20
lines changed

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,12 @@ bazel_flashbacktest: failpoint-enable bazel_ci_simple_prepare
794794
-- //tests/realtikvtest/flashbacktest/...
795795
./build/jenkins_collect_coverage.sh
796796

797+
# on timeout, bazel won't print log sometimes, so we use --test_output=all to print log always
798+
.PHONY: bazel_ddltest
799+
bazel_ddltest: failpoint-enable bazel_ci_simple_prepare
800+
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=$(REAL_TIKV_TEST_TAGS) \
801+
-- //tests/realtikvtest/ddltest/...
802+
797803
.PHONY: bazel_lint
798804
bazel_lint: bazel_prepare
799805
bazel build //... --//build:with_nogo_flag=$(NOGO_FLAG)

pkg/ddl/executor.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,7 +1117,9 @@ func (e *executor) createTableWithInfoJob(
11171117
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
11181118
InvolvingSchemaInfo: involvingSchemas,
11191119
SQLMode: ctx.GetSessionVars().SQLMode,
1120+
SessionVars: make(map[string]string),
11201121
}
1122+
job.AddSessionVars(variable.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
11211123
args := &model.CreateTableArgs{
11221124
TableInfo: tbInfo,
11231125
OnExistReplace: cfg.OnExist == OnExistReplace,
@@ -1149,13 +1151,14 @@ func (e *executor) createTableWithInfoPost(
11491151
ctx sessionctx.Context,
11501152
tbInfo *model.TableInfo,
11511153
schemaID int64,
1154+
scatterScope string,
11521155
) error {
11531156
var err error
11541157
var partitions []model.PartitionDefinition
11551158
if pi := tbInfo.GetPartitionInfo(); pi != nil {
11561159
partitions = pi.Definitions
11571160
}
1158-
preSplitAndScatter(ctx, e.store, tbInfo, partitions)
1161+
preSplitAndScatter(ctx, e.store, tbInfo, partitions, scatterScope)
11591162
if tbInfo.AutoIncID > 1 {
11601163
// Default tableAutoIncID base is 0.
11611164
// If the first ID is expected to greater than 1, we need to do rebase.
@@ -1210,7 +1213,11 @@ func (e *executor) CreateTableWithInfo(
12101213
err = nil
12111214
}
12121215
} else {
1213-
err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID)
1216+
var scatterScope string
1217+
if val, ok := jobW.GetSessionVars(variable.TiDBScatterRegion); ok {
1218+
scatterScope = val
1219+
}
1220+
err = e.createTableWithInfoPost(ctx, tbInfo, jobW.SchemaID, scatterScope)
12141221
}
12151222

12161223
return errors.Trace(err)
@@ -1234,7 +1241,9 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
12341241
BinlogInfo: &model.HistoryInfo{},
12351242
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
12361243
SQLMode: ctx.GetSessionVars().SQLMode,
1244+
SessionVars: make(map[string]string),
12371245
}
1246+
job.AddSessionVars(variable.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
12381247

12391248
var err error
12401249

@@ -1300,9 +1309,12 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context,
13001309
}
13011310
return errors.Trace(err)
13021311
}
1303-
1312+
var scatterScope string
1313+
if val, ok := jobW.GetSessionVars(variable.TiDBScatterRegion); ok {
1314+
scatterScope = val
1315+
}
13041316
for _, tblArgs := range args.Tables {
1305-
if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID); err != nil {
1317+
if err = e.createTableWithInfoPost(ctx, tblArgs.TableInfo, jobW.SchemaID, scatterScope); err != nil {
13061318
return errors.Trace(err)
13071319
}
13081320
}
@@ -1365,24 +1377,16 @@ func (e *executor) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy
13651377

13661378
// preSplitAndScatter performs pre-split and scatter of the table's regions.
13671379
// If `pi` is not nil, will only split region for `pi`, this is used when add partition.
1368-
func preSplitAndScatter(ctx sessionctx.Context, store kv.Storage, tbInfo *model.TableInfo, parts []model.PartitionDefinition) {
1380+
func preSplitAndScatter(ctx sessionctx.Context, store kv.Storage, tbInfo *model.TableInfo, parts []model.PartitionDefinition, scatterScope string) {
1381+
failpoint.InjectCall("preSplitAndScatter", scatterScope)
13691382
if tbInfo.TempTableType != model.TempTableNone {
13701383
return
13711384
}
13721385
sp, ok := store.(kv.SplittableStore)
13731386
if !ok || atomic.LoadUint32(&EnableSplitTableRegion) == 0 {
13741387
return
13751388
}
1376-
var (
1377-
preSplit func()
1378-
scatterScope string
1379-
)
1380-
val, ok := ctx.GetSessionVars().GetSystemVar(variable.TiDBScatterRegion)
1381-
if !ok {
1382-
logutil.DDLLogger().Warn("get system variable met problem, won't scatter region")
1383-
} else {
1384-
scatterScope = val
1385-
}
1389+
var preSplit func()
13861390
if len(parts) > 0 {
13871391
preSplit = func() { splitPartitionTableRegion(ctx, sp, tbInfo, parts, scatterScope) }
13881392
} else {
@@ -2275,7 +2279,9 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s
22752279
BinlogInfo: &model.HistoryInfo{},
22762280
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
22772281
SQLMode: ctx.GetSessionVars().SQLMode,
2282+
SessionVars: make(map[string]string),
22782283
}
2284+
job.AddSessionVars(variable.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
22792285
args := &model.TablePartitionArgs{
22802286
PartInfo: partInfo,
22812287
}
@@ -2786,7 +2792,9 @@ func (e *executor) TruncateTablePartition(ctx sessionctx.Context, ident ast.Iden
27862792
BinlogInfo: &model.HistoryInfo{},
27872793
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
27882794
SQLMode: ctx.GetSessionVars().SQLMode,
2795+
SessionVars: make(map[string]string),
27892796
}
2797+
job.AddSessionVars(variable.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
27902798
args := &model.TruncateTableArgs{
27912799
OldPartitionIDs: pids,
27922800
// job submitter will fill new partition IDs.
@@ -4282,7 +4290,9 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
42824290
BinlogInfo: &model.HistoryInfo{},
42834291
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
42844292
SQLMode: ctx.GetSessionVars().SQLMode,
4293+
SessionVars: make(map[string]string),
42854294
}
4295+
job.AddSessionVars(variable.TiDBScatterRegion, getScatterScopeFromSessionctx(ctx))
42864296
args := &model.TruncateTableArgs{
42874297
FKCheck: fkCheck,
42884298
OldPartitionIDs: oldPartitionIDs,
@@ -6874,3 +6884,14 @@ func NewDDLReorgMeta(ctx sessionctx.Context) *model.DDLReorgMeta {
68746884
Version: model.CurrentReorgMetaVersion,
68756885
}
68766886
}
6887+
6888+
func getScatterScopeFromSessionctx(sctx sessionctx.Context) string {
6889+
var scatterScope string
6890+
val, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBScatterRegion)
6891+
if !ok {
6892+
logutil.DDLLogger().Info("won't scatter region since system variable didn't set")
6893+
} else {
6894+
scatterScope = val
6895+
}
6896+
return scatterScope
6897+
}

pkg/ddl/partition.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,11 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
216216
}
217217
// For normal and replica finished table, move the `addingDefinitions` into `Definitions`.
218218
updatePartitionInfo(tblInfo)
219-
220-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions)
219+
var scatterScope string
220+
if val, ok := job.GetSessionVars(variable.TiDBScatterRegion); ok {
221+
scatterScope = val
222+
}
223+
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions, scatterScope)
221224

222225
tblInfo.Partition.DDLState = model.StateNone
223226
tblInfo.Partition.DDLAction = model.ActionNone
@@ -2563,7 +2566,11 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
25632566
if err != nil {
25642567
return ver, errors.Trace(err)
25652568
}
2566-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newDefinitions)
2569+
var scatterScope string
2570+
if val, ok := job.GetSessionVars(variable.TiDBScatterRegion); ok {
2571+
scatterScope = val
2572+
}
2573+
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newDefinitions, scatterScope)
25672574
failpoint.Inject("truncatePartFail1", func(val failpoint.Value) {
25682575
if val.(bool) {
25692576
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2

pkg/ddl/table.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,11 @@ func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64,
581581
if pi := tblInfo.GetPartitionInfo(); pi != nil {
582582
partitions = tblInfo.GetPartitionInfo().Definitions
583583
}
584-
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, partitions)
584+
var scatterScope string
585+
if val, ok := job.GetSessionVars(variable.TiDBScatterRegion); ok {
586+
scatterScope = val
587+
}
588+
preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, partitions, scatterScope)
585589

586590
ver, err = updateSchemaVersion(jobCtx, job)
587591
if err != nil {

pkg/meta/model/job.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ type Job struct {
384384

385385
// SQLMode for executing DDL query.
386386
SQLMode mysql.SQLMode `json:"sql_mode"`
387+
388+
// SessionVars store session variables
389+
SessionVars map[string]string `json:"session_vars,omitempty"`
387390
}
388391

389392
// FinishTableJob is called when a job is finished.
@@ -784,6 +787,17 @@ func (job *Job) InFinalState() bool {
784787
return job.State == JobStateSynced || job.State == JobStateCancelled || job.State == JobStatePaused
785788
}
786789

790+
// AddSessionVars add a session variable in DDL job.
791+
func (job *Job) AddSessionVars(name, value string) {
792+
job.SessionVars[name] = value
793+
}
794+
795+
// GetSessionVars get a session variable in DDL job.
796+
func (job *Job) GetSessionVars(name string) (string, bool) {
797+
value, ok := job.SessionVars[name]
798+
return value, ok
799+
}
800+
787801
// MayNeedReorg indicates that this job may need to reorganize the data.
788802
func (job *Job) MayNeedReorg() bool {
789803
switch job.Type {

pkg/meta/model/job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func TestJobSize(t *testing.T) {
328328
- SubJob.ToProxyJob()
329329
`
330330
job := Job{}
331-
require.Equal(t, 400, int(unsafe.Sizeof(job)), msg)
331+
require.Equal(t, 408, int(unsafe.Sizeof(job)), msg)
332332
}
333333

334334
func TestBackfillMetaCodec(t *testing.T) {
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_test")
2+
3+
go_test(
4+
name = "ddltest_test",
5+
timeout = "short",
6+
srcs = [
7+
"main_test.go",
8+
"scatter_region_test.go",
9+
],
10+
flaky = True,
11+
deps = [
12+
"//pkg/config",
13+
"//pkg/ddl",
14+
"//pkg/testkit",
15+
"//pkg/testkit/testfailpoint",
16+
"//tests/realtikvtest",
17+
"@com_github_stretchr_testify//require",
18+
],
19+
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ddltest
16+
17+
import (
18+
"testing"
19+
20+
"github.com/pingcap/tidb/pkg/config"
21+
"github.com/pingcap/tidb/tests/realtikvtest"
22+
)
23+
24+
func TestMain(m *testing.M) {
25+
config.UpdateGlobal(func(conf *config.Config) {
26+
conf.Store = "tikv"
27+
})
28+
realtikvtest.RunTestMain(m)
29+
}

0 commit comments

Comments
 (0)