Skip to content

Commit b847ccf

Browse files
YangKeaozeminzhou
authored andcommitted
ttl: rollback the scan trasnaction even if the BEGIN failed. (pingcap#58943)
close pingcap#58900
1 parent 343e2b9 commit b847ccf

File tree

4 files changed

+132
-8
lines changed

4 files changed

+132
-8
lines changed

pkg/ttl/session/session.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...any) ([]ch
111111

112112
// RunInTxn executes the specified function in a txn
113113
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
114+
success := false
115+
defer func() {
116+
// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
117+
// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
118+
if !success {
119+
// For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled.
120+
// Using another timeout context to avoid that this behavior will be changed in the future.
121+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
122+
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
123+
terror.Log(rollbackErr)
124+
cancel()
125+
}
126+
}()
127+
114128
tracer := metrics.PhaseTracerFromCtx(ctx)
115129
defer tracer.EnterPhase(tracer.Phase())
116130

@@ -129,14 +143,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
129143
}
130144
tracer.EnterPhase(metrics.PhaseOther)
131145

132-
success := false
133-
defer func() {
134-
if !success {
135-
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
136-
terror.Log(rollbackErr)
137-
}
138-
}()
139-
140146
if err = fn(); err != nil {
141147
return err
142148
}

pkg/ttl/ttlworker/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ go_test(
6161
"del_test.go",
6262
"job_manager_integration_test.go",
6363
"job_manager_test.go",
64+
"scan_integration_test.go",
6465
"scan_test.go",
6566
"session_integration_test.go",
6667
"session_test.go",
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2022 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ttlworker_test
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"math/rand/v2"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"github.com/pingcap/tidb/pkg/parser/ast"
26+
"github.com/pingcap/tidb/pkg/testkit"
27+
"github.com/pingcap/tidb/pkg/testkit/testflag"
28+
"github.com/pingcap/tidb/pkg/ttl/cache"
29+
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
30+
"github.com/stretchr/testify/require"
31+
)
32+
33+
func TestCancelWhileScan(t *testing.T) {
34+
store, dom := testkit.CreateMockStoreAndDomain(t)
35+
tk := testkit.NewTestKit(t, store)
36+
37+
tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
38+
testTable, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
39+
require.NoError(t, err)
40+
for i := 0; i < 10000; i++ {
41+
tk.MustExec(fmt.Sprintf("insert into test.t values (%d, NOW() - INTERVAL 24 HOUR)", i))
42+
}
43+
testPhysicalTableCache, err := cache.NewPhysicalTable(ast.NewCIStr("test"), testTable.Meta(), ast.NewCIStr(""))
44+
require.NoError(t, err)
45+
46+
delCh := make(chan *ttlworker.TTLDeleteTask)
47+
wg := &sync.WaitGroup{}
48+
wg.Add(1)
49+
go func() {
50+
defer wg.Done()
51+
for range delCh {
52+
// do nothing
53+
}
54+
}()
55+
56+
testStart := time.Now()
57+
testDuration := time.Second
58+
if testflag.Long() {
59+
testDuration = time.Minute
60+
}
61+
for time.Since(testStart) < testDuration {
62+
ctx, cancel := context.WithCancel(context.Background())
63+
ttlTask := ttlworker.NewTTLScanTask(ctx, testPhysicalTableCache, &cache.TTLTask{
64+
JobID: "test",
65+
TableID: 1,
66+
ScanID: 1,
67+
ScanRangeStart: nil,
68+
ScanRangeEnd: nil,
69+
ExpireTime: time.Now().Add(-12 * time.Hour),
70+
OwnerID: "test",
71+
OwnerAddr: "test",
72+
OwnerHBTime: time.Now(),
73+
Status: cache.TaskStatusRunning,
74+
StatusUpdateTime: time.Now(),
75+
State: &cache.TTLTaskState{},
76+
CreatedTime: time.Now(),
77+
})
78+
79+
wg := &sync.WaitGroup{}
80+
wg.Add(1)
81+
go func() {
82+
defer wg.Done()
83+
84+
ttlTask.DoScan(ctx, delCh, dom.SysSessionPool())
85+
}()
86+
87+
// randomly sleep for a while and cancel the scan
88+
time.Sleep(time.Duration(rand.Int() % int(time.Millisecond*10)))
89+
startCancel := time.Now()
90+
cancel()
91+
92+
wg.Wait()
93+
// make sure the scan is canceled in time
94+
require.Less(t, time.Since(startCancel), time.Second)
95+
}
96+
97+
close(delCh)
98+
wg.Wait()
99+
}

pkg/ttl/ttlworker/scan_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,3 +554,21 @@ func TestScanTaskCancelStmt(t *testing.T) {
554554
task.ctx, cancel = context.WithCancel(context.Background())
555555
testCancel(context.Background(), cancel)
556556
}
557+
558+
// NewTTLScanTask creates a new TTL scan task for test.
559+
func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask {
560+
return &ttlScanTask{
561+
ctx: ctx,
562+
tbl: tbl,
563+
TTLTask: ttlTask,
564+
statistics: &ttlStatistics{},
565+
}
566+
}
567+
568+
// DoScan is an exported version of `doScan` for test.
569+
func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
570+
return t.doScan(ctx, delCh, sessPool)
571+
}
572+
573+
// TTLDeleteTask is an exported version of `ttlDeleteTask` for test.
574+
type TTLDeleteTask = ttlDeleteTask

0 commit comments

Comments
 (0)