Skip to content

Commit d282dcb

Browse files
lcwangchao and ti-chi-bot
authored and committed
This is an automated cherry-pick of pingcap#56518
Signed-off-by: ti-chi-bot <[email protected]>
1 parent eb9d3bf commit d282dcb

File tree

6 files changed

+204
-3
lines changed

6 files changed

+204
-3
lines changed

ttl/session/BUILD.bazel

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ go_library(
66
importpath = "github.com/pingcap/tidb/ttl/session",
77
visibility = ["//visibility:public"],
88
deps = [
9+
<<<<<<< HEAD:ttl/session/BUILD.bazel
910
"//infoschema",
1011
"//kv",
1112
"//parser/terror",
@@ -15,6 +16,19 @@ go_library(
1516
"//ttl/metrics",
1617
"//util/chunk",
1718
"//util/sqlexec",
19+
=======
20+
"//pkg/infoschema",
21+
"//pkg/kv",
22+
"//pkg/parser/terror",
23+
"//pkg/sessionctx",
24+
"//pkg/sessionctx/variable",
25+
"//pkg/sessiontxn",
26+
"//pkg/ttl/metrics",
27+
"//pkg/util/chunk",
28+
"//pkg/util/sqlexec",
29+
"//pkg/util/sqlkiller",
30+
"//pkg/util/timeutil",
31+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/session/BUILD.bazel
1832
"@com_github_pingcap_errors//:errors",
1933
],
2034
)
@@ -28,12 +42,19 @@ go_test(
2842
"sysvar_test.go",
2943
],
3044
flaky = True,
31-
shard_count = 6,
45+
shard_count = 7,
3246
deps = [
3347
":session",
48+
<<<<<<< HEAD:ttl/session/BUILD.bazel
3449
"//sessionctx/variable",
3550
"//testkit",
3651
"//testkit/testsetup",
52+
=======
53+
"//pkg/sessionctx/variable",
54+
"//pkg/testkit",
55+
"//pkg/testkit/testsetup",
56+
"//pkg/util",
57+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/session/BUILD.bazel
3758
"@com_github_pingcap_errors//:errors",
3859
"@com_github_stretchr_testify//require",
3960
"@org_uber_go_goleak//:goleak",

ttl/session/session.go

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/pingcap/errors"
22+
<<<<<<< HEAD:ttl/session/session.go
2223
"github.com/pingcap/tidb/infoschema"
2324
"github.com/pingcap/tidb/kv"
2425
"github.com/pingcap/tidb/parser/terror"
@@ -28,6 +29,19 @@ import (
2829
"github.com/pingcap/tidb/ttl/metrics"
2930
"github.com/pingcap/tidb/util/chunk"
3031
"github.com/pingcap/tidb/util/sqlexec"
32+
=======
33+
"github.com/pingcap/tidb/pkg/infoschema"
34+
"github.com/pingcap/tidb/pkg/kv"
35+
"github.com/pingcap/tidb/pkg/parser/terror"
36+
"github.com/pingcap/tidb/pkg/sessionctx"
37+
"github.com/pingcap/tidb/pkg/sessionctx/variable"
38+
"github.com/pingcap/tidb/pkg/sessiontxn"
39+
"github.com/pingcap/tidb/pkg/ttl/metrics"
40+
"github.com/pingcap/tidb/pkg/util/chunk"
41+
"github.com/pingcap/tidb/pkg/util/sqlexec"
42+
"github.com/pingcap/tidb/pkg/util/sqlkiller"
43+
"github.com/pingcap/tidb/pkg/util/timeutil"
44+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/session/session.go
3145
)
3246

3347
// TxnMode represents using optimistic or pessimistic mode in the transaction
@@ -51,6 +65,13 @@ type Session interface {
5165
RunInTxn(ctx context.Context, fn func() error, mode TxnMode) (err error)
5266
// ResetWithGlobalTimeZone resets the session time zone to global time zone
5367
ResetWithGlobalTimeZone(ctx context.Context) error
68+
<<<<<<< HEAD:ttl/session/session.go
69+
=======
70+
// GlobalTimeZone returns the global timezone. It is used to compute expire time for TTL
71+
GlobalTimeZone(ctx context.Context) (*time.Location, error)
72+
// KillStmt kills the current statement execution
73+
KillStmt()
74+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/session/session.go
5475
// Close closes the session
5576
Close()
5677
// Now returns the current time in location specified by session var
@@ -168,6 +189,23 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
168189
return err
169190
}
170191

192+
<<<<<<< HEAD:ttl/session/session.go
193+
=======
194+
// GlobalTimeZone returns the global timezone
195+
func (s *session) GlobalTimeZone(ctx context.Context) (*time.Location, error) {
196+
str, err := s.GetSessionVars().GetGlobalSystemVar(ctx, "time_zone")
197+
if err != nil {
198+
return nil, err
199+
}
200+
return timeutil.ParseTimeZone(str)
201+
}
202+
203+
// KillStmt kills the current statement execution
204+
func (s *session) KillStmt() {
205+
s.GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
206+
}
207+
208+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/session/session.go
171209
// Close closes the session
172210
func (s *session) Close() {
173211
if s.closeFn != nil {

ttl/session/session_test.go

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,17 @@ package session_test
1717
import (
1818
"context"
1919
"testing"
20+
"time"
2021

2122
"github.com/pingcap/errors"
23+
<<<<<<< HEAD:ttl/session/session_test.go
2224
"github.com/pingcap/tidb/testkit"
2325
"github.com/pingcap/tidb/ttl/session"
26+
=======
27+
"github.com/pingcap/tidb/pkg/testkit"
28+
"github.com/pingcap/tidb/pkg/ttl/session"
29+
"github.com/pingcap/tidb/pkg/util"
30+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/session/session_test.go
2431
"github.com/stretchr/testify/require"
2532
)
2633

@@ -64,3 +71,28 @@ func TestSessionResetTimeZone(t *testing.T) {
6471
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
6572
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
6673
}
74+
75+
func TestSessionKill(t *testing.T) {
76+
store, do := testkit.CreateMockStoreAndDomain(t)
77+
tk := testkit.NewTestKit(t, store)
78+
se := session.NewSession(tk.Session(), tk.Session(), nil)
79+
sleepStmt := "select sleep(123)"
80+
wg := util.WaitGroupWrapper{}
81+
wg.Run(func() {
82+
start := time.Now()
83+
for time.Since(start) < 10*time.Second {
84+
time.Sleep(10 * time.Millisecond)
85+
processes := do.InfoSyncer().GetSessionManager().ShowProcessList()
86+
for _, proc := range processes {
87+
if proc.Info == sleepStmt {
88+
se.KillStmt()
89+
return
90+
}
91+
}
92+
}
93+
require.FailNow(t, "wait sleep stmt timeout")
94+
})
95+
// the killed sleep stmt will return "1"
96+
tk.MustQuery(sleepStmt).Check(testkit.Rows("1"))
97+
wg.Wait()
98+
}

ttl/ttlworker/scan.go

Lines changed: 40 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -110,7 +110,45 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
110110
if err != nil {
111111
return t.result(err)
112112
}
113-
defer rawSess.Close()
113+
114+
doScanFinished, setDoScanFinished := context.WithCancel(context.Background())
115+
wg := util.WaitGroupWrapper{}
116+
wg.Run(func() {
117+
select {
118+
case <-taskCtx.Done():
119+
case <-ctx.Done():
120+
case <-doScanFinished.Done():
121+
return
122+
}
123+
logger := logutil.BgLogger().With(
124+
zap.Int64("tableID", t.TableID),
125+
zap.String("table", t.tbl.Name.O),
126+
zap.String("partition", t.tbl.Partition.O),
127+
zap.String("jobID", t.JobID),
128+
zap.Int64("scanID", t.ScanID),
129+
)
130+
logger.Info("kill the running statement in scan task because the task or worker cancelled")
131+
rawSess.KillStmt()
132+
ticker := time.NewTicker(time.Minute)
133+
defer ticker.Stop()
134+
for {
135+
// There is a small probability that the kill signal will be lost when the session is idle.
136+
// So wait for a while and send the kill signal again if the scan is still running.
137+
select {
138+
case <-doScanFinished.Done():
139+
return
140+
case <-ticker.C:
141+
logger.Warn("scan task is still running after the kill signal sent, kill it again")
142+
rawSess.KillStmt()
143+
}
144+
}
145+
})
146+
147+
defer func() {
148+
setDoScanFinished()
149+
wg.Wait()
150+
rawSess.Close()
151+
}()
114152

115153
safeExpire, err := t.tbl.EvalExpireTime(taskCtx, rawSess, time.Now())
116154
if err != nil {
@@ -181,7 +219,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
181219
selectInterval := time.Since(sqlStart)
182220
if sqlErr != nil {
183221
metrics.SelectErrorDuration.Observe(selectInterval.Seconds())
184-
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil
222+
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil && t.ctx.Err() == nil
185223
logutil.BgLogger().Error("execute query for ttl scan task failed",
186224
zap.String("SQL", sql),
187225
zap.Int("retryTimes", retryTimes),

ttl/ttlworker/scan_test.go

Lines changed: 57 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,11 +21,20 @@ import (
2121
"time"
2222

2323
"github.com/pingcap/errors"
24+
<<<<<<< HEAD:ttl/ttlworker/scan_test.go
2425
"github.com/pingcap/tidb/parser/mysql"
2526
"github.com/pingcap/tidb/sessionctx/variable"
2627
"github.com/pingcap/tidb/ttl/cache"
2728
"github.com/pingcap/tidb/types"
2829
"github.com/pingcap/tidb/util/chunk"
30+
=======
31+
"github.com/pingcap/tidb/pkg/parser/mysql"
32+
"github.com/pingcap/tidb/pkg/sessionctx/variable"
33+
"github.com/pingcap/tidb/pkg/ttl/cache"
34+
"github.com/pingcap/tidb/pkg/types"
35+
"github.com/pingcap/tidb/pkg/util"
36+
"github.com/pingcap/tidb/pkg/util/chunk"
37+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/ttlworker/scan_test.go
2938
"github.com/stretchr/testify/require"
3039
)
3140

@@ -445,3 +454,51 @@ func TestScanTaskCheck(t *testing.T) {
445454
require.Equal(t, 1, len(ch))
446455
require.Equal(t, "Total Rows: 1, Success Rows: 0, Error Rows: 0", task.statistics.String())
447456
}
457+
458+
func TestScanTaskCancelStmt(t *testing.T) {
459+
task := &ttlScanTask{
460+
ctx: context.Background(),
461+
tbl: newMockTTLTbl(t, "t1"),
462+
TTLTask: &cache.TTLTask{
463+
ExpireTime: time.UnixMilli(0),
464+
ScanRangeStart: []types.Datum{types.NewIntDatum(0)},
465+
},
466+
statistics: &ttlStatistics{},
467+
}
468+
469+
testCancel := func(ctx context.Context, doCancel func()) {
470+
mockPool := newMockSessionPool(t)
471+
startExec := make(chan struct{})
472+
mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
473+
mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
474+
close(startExec)
475+
select {
476+
case <-mockPool.se.killed:
477+
return nil, errors.New("killed")
478+
case <-time.After(10 * time.Second):
479+
return nil, errors.New("timeout")
480+
}
481+
}
482+
wg := util.WaitGroupWrapper{}
483+
wg.Run(func() {
484+
select {
485+
case <-startExec:
486+
case <-time.After(10 * time.Second):
487+
require.FailNow(t, "timeout")
488+
}
489+
doCancel()
490+
})
491+
r := task.doScan(ctx, nil, mockPool)
492+
require.NotNil(t, r)
493+
require.EqualError(t, r.err, "killed")
494+
wg.Wait()
495+
}
496+
497+
// test cancel with input context
498+
ctx, cancel := context.WithCancel(context.Background())
499+
testCancel(ctx, cancel)
500+
501+
// test cancel with task context
502+
task.ctx, cancel = context.WithCancel(context.Background())
503+
testCancel(context.Background(), cancel)
504+
}

ttl/ttlworker/session_test.go

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -140,6 +140,7 @@ type mockSession struct {
140140
resetTimeZoneCalls int
141141
closed bool
142142
commitErr error
143+
killed chan struct{}
143144
}
144145

145146
func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
@@ -154,6 +155,7 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
154155
sessionInfoSchema: newMockInfoSchema(tbls...),
155156
evalExpire: time.Now(),
156157
sessionVars: sessVars,
158+
killed: make(chan struct{}),
157159
}
158160
}
159161

@@ -201,6 +203,19 @@ func (s *mockSession) ResetWithGlobalTimeZone(_ context.Context) (err error) {
201203
return nil
202204
}
203205

206+
<<<<<<< HEAD:ttl/ttlworker/session_test.go
207+
=======
208+
// GlobalTimeZone returns the global timezone
209+
func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error) {
210+
return time.Local, nil
211+
}
212+
213+
// KillStmt kills the current statement execution
214+
func (s *mockSession) KillStmt() {
215+
close(s.killed)
216+
}
217+
218+
>>>>>>> e68c26a0e67 (ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)):pkg/ttl/ttlworker/session_test.go
204219
func (s *mockSession) Close() {
205220
s.closed = true
206221
}

0 commit comments

Comments
 (0)