Skip to content

Commit b5c0f24

Browse files
authored
ttl: force to kill SQL in scan task when canceling TTL job/task (#56518) (#56578)
close #56511
1 parent eb9d3bf commit b5c0f24

File tree

7 files changed

+136
-4
lines changed

7 files changed

+136
-4
lines changed

ttl/session/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ go_test(
2828
"sysvar_test.go",
2929
],
3030
flaky = True,
31-
shard_count = 6,
31+
shard_count = 7,
3232
deps = [
3333
":session",
3434
"//sessionctx/variable",
3535
"//testkit",
3636
"//testkit/testsetup",
37+
"//util",
3738
"@com_github_pingcap_errors//:errors",
3839
"@com_github_stretchr_testify//require",
3940
"@org_uber_go_goleak//:goleak",

ttl/session/session.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package session
1616

1717
import (
1818
"context"
19+
"sync/atomic"
1920
"time"
2021

2122
"github.com/pingcap/errors"
@@ -51,6 +52,8 @@ type Session interface {
5152
RunInTxn(ctx context.Context, fn func() error, mode TxnMode) (err error)
5253
// ResetWithGlobalTimeZone resets the session time zone to global time zone
5354
ResetWithGlobalTimeZone(ctx context.Context) error
55+
// KillStmt kills the current statement execution
56+
KillStmt()
5457
// Close closes the session
5558
Close()
5659
// Now returns the current time in location specified by session var
@@ -168,6 +171,11 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
168171
return err
169172
}
170173

174+
// KillStmt kills the current statement execution
175+
func (s *session) KillStmt() {
176+
atomic.StoreUint32(&s.GetSessionVars().Killed, 1)
177+
}
178+
171179
// Close closes the session
172180
func (s *session) Close() {
173181
if s.closeFn != nil {

ttl/session/session_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ package session_test
1717
import (
1818
"context"
1919
"testing"
20+
"time"
2021

2122
"github.com/pingcap/errors"
2223
"github.com/pingcap/tidb/testkit"
2324
"github.com/pingcap/tidb/ttl/session"
25+
"github.com/pingcap/tidb/util"
2426
"github.com/stretchr/testify/require"
2527
)
2628

@@ -64,3 +66,28 @@ func TestSessionResetTimeZone(t *testing.T) {
6466
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
6567
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
6668
}
69+
70+
func TestSessionKill(t *testing.T) {
71+
store, do := testkit.CreateMockStoreAndDomain(t)
72+
tk := testkit.NewTestKit(t, store)
73+
se := session.NewSession(tk.Session(), tk.Session(), nil)
74+
sleepStmt := "select sleep(123)"
75+
wg := util.WaitGroupWrapper{}
76+
wg.Run(func() {
77+
start := time.Now()
78+
for time.Since(start) < 10*time.Second {
79+
time.Sleep(10 * time.Millisecond)
80+
processes := do.InfoSyncer().GetSessionManager().ShowProcessList()
81+
for _, proc := range processes {
82+
if proc.Info == sleepStmt {
83+
se.KillStmt()
84+
return
85+
}
86+
}
87+
}
88+
require.FailNow(t, "wait sleep stmt timeout")
89+
})
90+
// the killed sleep stmt will return "1"
91+
tk.MustQuery(sleepStmt).Check(testkit.Rows("1"))
92+
wg.Wait()
93+
}

ttl/ttlworker/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ go_test(
5959
embed = [":ttlworker"],
6060
flaky = True,
6161
race = "on",
62-
shard_count = 38,
62+
shard_count = 39,
6363
deps = [
6464
"//domain",
6565
"//infoschema",
@@ -78,6 +78,7 @@ go_test(
7878
"//ttl/metrics",
7979
"//ttl/session",
8080
"//types",
81+
"//util",
8182
"//util/chunk",
8283
"//util/logutil",
8384
"@com_github_google_uuid//:uuid",

ttl/ttlworker/scan.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/pingcap/tidb/ttl/metrics"
2929
"github.com/pingcap/tidb/ttl/sqlbuilder"
3030
"github.com/pingcap/tidb/types"
31+
"github.com/pingcap/tidb/util"
3132
"github.com/pingcap/tidb/util/chunk"
3233
"github.com/pingcap/tidb/util/logutil"
3334
"go.uber.org/zap"
@@ -110,7 +111,45 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
110111
if err != nil {
111112
return t.result(err)
112113
}
113-
defer rawSess.Close()
114+
115+
doScanFinished, setDoScanFinished := context.WithCancel(context.Background())
116+
wg := util.WaitGroupWrapper{}
117+
wg.Run(func() {
118+
select {
119+
case <-taskCtx.Done():
120+
case <-ctx.Done():
121+
case <-doScanFinished.Done():
122+
return
123+
}
124+
logger := logutil.BgLogger().With(
125+
zap.Int64("tableID", t.TableID),
126+
zap.String("table", t.tbl.Name.O),
127+
zap.String("partition", t.tbl.Partition.O),
128+
zap.String("jobID", t.JobID),
129+
zap.Int64("scanID", t.ScanID),
130+
)
131+
logger.Info("kill the running statement in scan task because the task or worker cancelled")
132+
rawSess.KillStmt()
133+
ticker := time.NewTicker(time.Minute)
134+
defer ticker.Stop()
135+
for {
136+
// Have a small probability that the kill signal will be lost when the session is idle.
137+
// So wait for a while and send the kill signal again if the scan is still running.
138+
select {
139+
case <-doScanFinished.Done():
140+
return
141+
case <-ticker.C:
142+
logger.Warn("scan task is still running after the kill signal sent, kill it again")
143+
rawSess.KillStmt()
144+
}
145+
}
146+
})
147+
148+
defer func() {
149+
setDoScanFinished()
150+
wg.Wait()
151+
rawSess.Close()
152+
}()
114153

115154
safeExpire, err := t.tbl.EvalExpireTime(taskCtx, rawSess, time.Now())
116155
if err != nil {
@@ -181,7 +220,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
181220
selectInterval := time.Since(sqlStart)
182221
if sqlErr != nil {
183222
metrics.SelectErrorDuration.Observe(selectInterval.Seconds())
184-
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil
223+
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil && t.ctx.Err() == nil
185224
logutil.BgLogger().Error("execute query for ttl scan task failed",
186225
zap.String("SQL", sql),
187226
zap.Int("retryTimes", retryTimes),

ttl/ttlworker/scan_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/pingcap/tidb/sessionctx/variable"
2626
"github.com/pingcap/tidb/ttl/cache"
2727
"github.com/pingcap/tidb/types"
28+
"github.com/pingcap/tidb/util"
2829
"github.com/pingcap/tidb/util/chunk"
2930
"github.com/stretchr/testify/require"
3031
)
@@ -445,3 +446,51 @@ func TestScanTaskCheck(t *testing.T) {
445446
require.Equal(t, 1, len(ch))
446447
require.Equal(t, "Total Rows: 1, Success Rows: 0, Error Rows: 0", task.statistics.String())
447448
}
449+
450+
func TestScanTaskCancelStmt(t *testing.T) {
451+
task := &ttlScanTask{
452+
ctx: context.Background(),
453+
tbl: newMockTTLTbl(t, "t1"),
454+
TTLTask: &cache.TTLTask{
455+
ExpireTime: time.UnixMilli(0),
456+
ScanRangeStart: []types.Datum{types.NewIntDatum(0)},
457+
},
458+
statistics: &ttlStatistics{},
459+
}
460+
461+
testCancel := func(ctx context.Context, doCancel func()) {
462+
mockPool := newMockSessionPool(t)
463+
startExec := make(chan struct{})
464+
mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
465+
mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
466+
close(startExec)
467+
select {
468+
case <-mockPool.se.killed:
469+
return nil, errors.New("killed")
470+
case <-time.After(10 * time.Second):
471+
return nil, errors.New("timeout")
472+
}
473+
}
474+
wg := util.WaitGroupWrapper{}
475+
wg.Run(func() {
476+
select {
477+
case <-startExec:
478+
case <-time.After(10 * time.Second):
479+
require.FailNow(t, "timeout")
480+
}
481+
doCancel()
482+
})
483+
r := task.doScan(ctx, nil, mockPool)
484+
require.NotNil(t, r)
485+
require.EqualError(t, r.err, "killed")
486+
wg.Wait()
487+
}
488+
489+
// test cancel with input context
490+
ctx, cancel := context.WithCancel(context.Background())
491+
testCancel(ctx, cancel)
492+
493+
// test cancel with task context
494+
task.ctx, cancel = context.WithCancel(context.Background())
495+
testCancel(context.Background(), cancel)
496+
}

ttl/ttlworker/session_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ type mockSession struct {
140140
resetTimeZoneCalls int
141141
closed bool
142142
commitErr error
143+
killed chan struct{}
143144
}
144145

145146
func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
@@ -154,6 +155,7 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
154155
sessionInfoSchema: newMockInfoSchema(tbls...),
155156
evalExpire: time.Now(),
156157
sessionVars: sessVars,
158+
killed: make(chan struct{}),
157159
}
158160
}
159161

@@ -201,6 +203,11 @@ func (s *mockSession) ResetWithGlobalTimeZone(_ context.Context) (err error) {
201203
return nil
202204
}
203205

206+
// KillStmt kills the current statement execution
207+
func (s *mockSession) KillStmt() {
208+
close(s.killed)
209+
}
210+
204211
func (s *mockSession) Close() {
205212
s.closed = true
206213
}

0 commit comments

Comments
 (0)