Skip to content

Commit 27f8ff3

Browse files
authored
dxf: use global entry_size_limit when executing SQLs (#59516)
close #59506
1 parent e13fa57 commit 27f8ff3

File tree

3 files changed

+39
-4
lines changed

3 files changed

+39
-4
lines changed

pkg/disttask/framework/storage/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ go_test(
4242
embed = [":storage"],
4343
flaky = True,
4444
race = "on",
45-
shard_count = 23,
45+
shard_count = 24,
4646
deps = [
4747
"//pkg/config",
4848
"//pkg/disttask/framework/proto",

pkg/disttask/framework/storage/table_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,3 +1185,30 @@ func TestGetActiveTaskExecInfo(t *testing.T) {
11851185
require.NoError(t, err)
11861186
require.Empty(t, taskExecInfos)
11871187
}
1188+
1189+
func TestTaskManagerEntrySize(t *testing.T) {
1190+
store, tm, ctx := testutil.InitTableTest(t)
1191+
getMeta := func(l int) []byte {
1192+
meta := make([]byte, l)
1193+
for i := 0; i < l; i++ {
1194+
meta[i] = 'a'
1195+
}
1196+
return meta
1197+
}
1198+
insertSubtask := func(meta []byte) error {
1199+
return tm.WithNewSession(func(se sessionctx.Context) error {
1200+
_, err := sqlexec.ExecSQL(ctx, se.GetSQLExecutor(), `
1201+
insert into mysql.tidb_background_subtask(`+storage.InsertSubtaskColumns+`) values`+
1202+
`(%?, %?, %?, %?, %?, %?, %?, NULL, CURRENT_TIMESTAMP(), '{}', '{}')`,
1203+
1, "1", "execID", meta, proto.SubtaskStatePending, proto.Type2Int(proto.TaskTypeExample), 1)
1204+
return err
1205+
})
1206+
}
1207+
meta6m := getMeta(6 << 20)
1208+
require.ErrorContains(t, insertSubtask(meta6m), "entry too large")
1209+
tk := testkit.NewTestKit(t, store)
1210+
tk.MustExec(fmt.Sprintf("set global tidb_txn_entry_size_limit = %d", 7<<20))
1211+
require.NoError(t, insertSubtask(meta6m))
1212+
// TiKV also have a limit raftstore.raft-entry-max-size which is 8M by default,
1213+
// we won't test that param here
1214+
}

pkg/disttask/framework/storage/task_table.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,20 @@ func SetTaskManager(is *TaskManager) {
141141

142142
// WithNewSession executes the function with a new session.
143143
func (mgr *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) error {
144-
se, err := mgr.sePool.Get()
144+
v, err := mgr.sePool.Get()
145145
if err != nil {
146146
return err
147147
}
148-
defer mgr.sePool.Put(se)
149-
return fn(se.(sessionctx.Context))
148+
// when using global sort, the subtask meta might quite large as it include
149+
// filenames of all the generated kv/stat files.
150+
se := v.(sessionctx.Context)
151+
limitBak := se.GetSessionVars().TxnEntrySizeLimit
152+
defer func() {
153+
se.GetSessionVars().TxnEntrySizeLimit = limitBak
154+
mgr.sePool.Put(v)
155+
}()
156+
se.GetSessionVars().TxnEntrySizeLimit = vardef.TxnEntrySizeLimit.Load()
157+
return fn(se)
150158
}
151159

152160
// WithNewTxn executes the fn in a new transaction.

0 commit comments

Comments
 (0)