Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/ttl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/ttl/metrics",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
],
Expand All @@ -29,12 +30,13 @@ go_test(
"sysvar_test.go",
],
flaky = True,
shard_count = 6,
shard_count = 7,
deps = [
":session",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
Expand Down
8 changes: 8 additions & 0 deletions pkg/ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/timeutil"
)

Expand All @@ -54,6 +55,8 @@ type Session interface {
ResetWithGlobalTimeZone(ctx context.Context) error
// GlobalTimeZone returns the global timezone. It is used to compute expire time for TTL
GlobalTimeZone(ctx context.Context) (*time.Location, error)
// KillStmt kills the current statement execution
KillStmt()
// Close closes the session
Close()
// Now returns the current time in location specified by session var
Expand Down Expand Up @@ -180,6 +183,11 @@ func (s *session) GlobalTimeZone(ctx context.Context) (*time.Location, error) {
return timeutil.ParseTimeZone(str)
}

// KillStmt kills the statement that the session is currently executing. It does so
// by sending a QueryInterrupted signal through the SQLKiller stored in the session
// variables.
func (s *session) KillStmt() {
	vars := s.GetSessionVars()
	vars.SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
}

// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/ttl/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package session_test
import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -64,3 +66,28 @@ func TestSessionResetTimeZone(t *testing.T) {
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
}

// TestSessionKill verifies that Session.KillStmt interrupts a statement that is
// currently running on the underlying session.
func TestSessionKill(t *testing.T) {
	store, do := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	se := session.NewSession(tk.Session(), tk.Session(), nil)
	sleepStmt := "select sleep(123)"
	wg := util.WaitGroupWrapper{}
	wg.Run(func() {
		// Poll the process list (for at most 10 seconds) until the sleep statement
		// issued below shows up, then kill it.
		start := time.Now()
		for time.Since(start) < 10*time.Second {
			time.Sleep(10 * time.Millisecond)
			processes := do.InfoSyncer().GetSessionManager().ShowProcessList()
			for _, proc := range processes {
				if proc.Info == sleepStmt {
					se.KillStmt()
					return
				}
			}
		}
		// This closure runs concurrently with the test body, so it must not call
		// FailNow (directly or through require): FailNow is only valid on the
		// goroutine running the test function. t.Error is safe from any goroutine.
		t.Error("wait sleep stmt timeout")
	})
	// the killed sleep stmt will return "1"
	tk.MustQuery(sleepStmt).Check(testkit.Rows("1"))
	wg.Wait()
}
42 changes: 40 additions & 2 deletions pkg/ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,45 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
if err != nil {
return t.result(err)
}
defer rawSess.Close()

doScanFinished, setDoScanFinished := context.WithCancel(context.Background())
wg := util.WaitGroupWrapper{}
wg.Run(func() {
select {
case <-taskCtx.Done():
case <-ctx.Done():
case <-doScanFinished.Done():
return
}
logger := logutil.BgLogger().With(
zap.Int64("tableID", t.TableID),
zap.String("table", t.tbl.Name.O),
zap.String("partition", t.tbl.Partition.O),
zap.String("jobID", t.JobID),
zap.Int64("scanID", t.ScanID),
)
logger.Info("kill the running statement in scan task because the task or worker cancelled")
rawSess.KillStmt()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
// There is a small probability that the kill signal is lost when the session is idle.
// So wait for a while and send the kill signal again if the scan is still running.
select {
case <-doScanFinished.Done():
return
case <-ticker.C:
logger.Warn("scan task is still running after the kill signal sent, kill it again")
rawSess.KillStmt()
}
}
})

defer func() {
setDoScanFinished()
wg.Wait()
rawSess.Close()
}()

safeExpire, err := t.tbl.EvalExpireTime(taskCtx, rawSess, rawSess.Now())
if err != nil {
Expand Down Expand Up @@ -182,7 +220,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
selectInterval := time.Since(sqlStart)
if sqlErr != nil {
metrics.SelectErrorDuration.Observe(selectInterval.Seconds())
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil && t.ctx.Err() == nil
logutil.BgLogger().Error("execute query for ttl scan task failed",
zap.String("SQL", sql),
zap.Int("retryTimes", retryTimes),
Expand Down
49 changes: 49 additions & 0 deletions pkg/ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -446,3 +447,51 @@ func TestScanTaskCheck(t *testing.T) {
require.Equal(t, 1, len(ch))
require.Equal(t, "Total Rows: 1, Success Rows: 0, Error Rows: 0", task.statistics.String())
}

// TestScanTaskCancelStmt verifies that doScan kills the statement it is executing
// and returns that statement's error when either the context passed to doScan or
// the task's own context is cancelled.
func TestScanTaskCancelStmt(t *testing.T) {
	task := &ttlScanTask{
		ctx: context.Background(),
		tbl: newMockTTLTbl(t, "t1"),
		TTLTask: &cache.TTLTask{
			ExpireTime:     time.UnixMilli(0),
			ScanRangeStart: []types.Datum{types.NewIntDatum(0)},
		},
		statistics: &ttlStatistics{},
	}

	// testCancel runs task.doScan with ctx against a fresh mock session pool whose
	// SQL execution blocks until the mock session is killed. Once the execution has
	// started, doCancel is invoked; the scan is then expected to fail with "killed".
	testCancel := func(ctx context.Context, doCancel func()) {
		mockPool := newMockSessionPool(t)
		startExec := make(chan struct{})
		mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
		mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
			// Signal that the statement is now "running", then block until killed.
			close(startExec)
			select {
			case <-mockPool.se.killed:
				return nil, errors.New("killed")
			case <-time.After(10 * time.Second):
				return nil, errors.New("timeout")
			}
		}
		wg := util.WaitGroupWrapper{}
		wg.Run(func() {
			select {
			case <-startExec:
			case <-time.After(10 * time.Second):
				// This closure runs concurrently with doScan below, so it must not
				// call FailNow (directly or through require): FailNow is only valid
				// on the goroutine running the test function. Report the failure
				// with the goroutine-safe t.Error and stop without cancelling.
				t.Error("timeout")
				return
			}
			doCancel()
		})
		r := task.doScan(ctx, nil, mockPool)
		require.NotNil(t, r)
		require.EqualError(t, r.err, "killed")
		wg.Wait()
	}

	// test cancel with input context
	ctx, cancel := context.WithCancel(context.Background())
	testCancel(ctx, cancel)

	// test cancel with task context
	task.ctx, cancel = context.WithCancel(context.Background())
	testCancel(context.Background(), cancel)
}
7 changes: 7 additions & 0 deletions pkg/ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type mockSession struct {
resetTimeZoneCalls int
closed bool
commitErr error
killed chan struct{}
}

func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
Expand All @@ -168,6 +169,7 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
t: t,
sessionInfoSchema: newMockInfoSchema(tbls...),
sessionVars: sessVars,
killed: make(chan struct{}),
}
}

Expand Down Expand Up @@ -224,6 +226,11 @@ func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error)
return time.Local, nil
}

// KillStmt simulates killing the current statement by closing s.killed, which
// unblocks every receiver waiting on that channel. Calls made after the channel
// has been closed are no-ops, so a caller that re-sends the kill signal does not
// panic with "close of closed channel".
//
// NOTE: the closed-check and the close are not atomic; this guards against
// repeated sequential calls, not against concurrent callers.
func (s *mockSession) KillStmt() {
	select {
	case <-s.killed:
		// Already closed by an earlier call; nothing to do.
	default:
		close(s.killed)
	}
}

// Close marks the mock session as closed by setting s.closed to true.
func (s *mockSession) Close() {
s.closed = true
}
Expand Down