Skip to content

Commit 23d41f5

Browse files
authored
ttl: rollback the scan trasnaction even if the BEGIN failed. (#58943) (#58949)
close #58900
1 parent fbe4137 commit 23d41f5

File tree

4 files changed

+128
-8
lines changed

4 files changed

+128
-8
lines changed

pkg/ttl/session/session.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{
107107

108108
// RunInTxn executes the specified function in a txn
109109
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
110+
success := false
111+
defer func() {
112+
// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
113+
// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
114+
if !success {
115+
// For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled.
116+
// Using another timeout context to avoid that this behavior will be changed in the future.
117+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
118+
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
119+
terror.Log(rollbackErr)
120+
cancel()
121+
}
122+
}()
123+
110124
tracer := metrics.PhaseTracerFromCtx(ctx)
111125
defer tracer.EnterPhase(tracer.Phase())
112126

@@ -125,14 +139,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
125139
}
126140
tracer.EnterPhase(metrics.PhaseOther)
127141

128-
success := false
129-
defer func() {
130-
if !success {
131-
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
132-
terror.Log(rollbackErr)
133-
}
134-
}()
135-
136142
if err = fn(); err != nil {
137143
return err
138144
}

pkg/ttl/ttlworker/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ go_test(
6060
"del_test.go",
6161
"job_manager_integration_test.go",
6262
"job_manager_test.go",
63+
"scan_integration_test.go",
6364
"scan_test.go",
6465
"session_test.go",
6566
"task_manager_integration_test.go",
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2022 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ttlworker_test
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"math/rand"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"github.com/pingcap/tidb/pkg/parser/model"
26+
"github.com/pingcap/tidb/pkg/testkit"
27+
"github.com/pingcap/tidb/pkg/ttl/cache"
28+
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
29+
"github.com/stretchr/testify/require"
30+
)
31+
32+
func TestCancelWhileScan(t *testing.T) {
33+
store, dom := testkit.CreateMockStoreAndDomain(t)
34+
tk := testkit.NewTestKit(t, store)
35+
36+
tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
37+
testTable, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
38+
require.NoError(t, err)
39+
for i := 0; i < 10000; i++ {
40+
tk.MustExec(fmt.Sprintf("insert into test.t values (%d, NOW() - INTERVAL 24 HOUR)", i))
41+
}
42+
testPhysicalTableCache, err := cache.NewPhysicalTable(model.NewCIStr("test"), testTable.Meta(), model.NewCIStr(""))
43+
require.NoError(t, err)
44+
45+
delCh := make(chan *ttlworker.TTLDeleteTask)
46+
wg := &sync.WaitGroup{}
47+
wg.Add(1)
48+
go func() {
49+
defer wg.Done()
50+
for range delCh {
51+
// do nothing
52+
}
53+
}()
54+
55+
testStart := time.Now()
56+
testDuration := time.Second
57+
for time.Since(testStart) < testDuration {
58+
ctx, cancel := context.WithCancel(context.Background())
59+
ttlTask := ttlworker.NewTTLScanTask(ctx, testPhysicalTableCache, &cache.TTLTask{
60+
JobID: "test",
61+
TableID: 1,
62+
ScanID: 1,
63+
ScanRangeStart: nil,
64+
ScanRangeEnd: nil,
65+
ExpireTime: time.Now().Add(-12 * time.Hour),
66+
OwnerID: "test",
67+
OwnerAddr: "test",
68+
OwnerHBTime: time.Now(),
69+
Status: cache.TaskStatusRunning,
70+
StatusUpdateTime: time.Now(),
71+
State: &cache.TTLTaskState{},
72+
CreatedTime: time.Now(),
73+
})
74+
75+
wg := &sync.WaitGroup{}
76+
wg.Add(1)
77+
go func() {
78+
defer wg.Done()
79+
80+
ttlTask.DoScan(ctx, delCh, dom.SysSessionPool())
81+
}()
82+
83+
// randomly sleep for a while and cancel the scan
84+
time.Sleep(time.Duration(rand.Int() % int(time.Millisecond*10)))
85+
startCancel := time.Now()
86+
cancel()
87+
88+
wg.Wait()
89+
// make sure the scan is canceled in time
90+
require.Less(t, time.Since(startCancel), time.Second)
91+
}
92+
93+
close(delCh)
94+
wg.Wait()
95+
}

pkg/ttl/ttlworker/scan_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,3 +494,21 @@ func TestScanTaskCancelStmt(t *testing.T) {
494494
task.ctx, cancel = context.WithCancel(context.Background())
495495
testCancel(context.Background(), cancel)
496496
}
497+
498+
// NewTTLScanTask creates a new TTL scan task for test.
499+
func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask {
500+
return &ttlScanTask{
501+
ctx: ctx,
502+
tbl: tbl,
503+
TTLTask: ttlTask,
504+
statistics: &ttlStatistics{},
505+
}
506+
}
507+
508+
// DoScan is an exported version of `doScan` for test.
509+
func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool sessionPool) *ttlScanTaskExecResult {
510+
return t.doScan(ctx, delCh, sessPool)
511+
}
512+
513+
// TTLDeleteTask is an exported version of `ttlDeleteTask` for test.
514+
type TTLDeleteTask = ttlDeleteTask

0 commit comments

Comments
 (0)