Skip to content

Commit 4ea266a

Browse files
YangKeaoti-chi-bot
authored andcommitted
This is an automated cherry-pick of #58943
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 272589d commit 4ea266a

File tree

4 files changed

+253
-8
lines changed

4 files changed

+253
-8
lines changed

pkg/ttl/ttlworker/BUILD.bazel

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "ttlworker",
5+
srcs = [
6+
"config.go",
7+
"del.go",
8+
"job.go",
9+
"job_manager.go",
10+
"scan.go",
11+
"session.go",
12+
"task_manager.go",
13+
"timer.go",
14+
"timer_sync.go",
15+
"worker.go",
16+
],
17+
importpath = "github.com/pingcap/tidb/pkg/ttl/ttlworker",
18+
visibility = ["//visibility:public"],
19+
deps = [
20+
"//pkg/infoschema",
21+
"//pkg/infoschema/context",
22+
"//pkg/kv",
23+
"//pkg/meta/model",
24+
"//pkg/metrics",
25+
"//pkg/parser/ast",
26+
"//pkg/parser/terror",
27+
"//pkg/sessionctx",
28+
"//pkg/sessionctx/variable",
29+
"//pkg/store/driver/error",
30+
"//pkg/timer/api",
31+
"//pkg/timer/runtime",
32+
"//pkg/timer/tablestore",
33+
"//pkg/ttl/cache",
34+
"//pkg/ttl/client",
35+
"//pkg/ttl/metrics",
36+
"//pkg/ttl/session",
37+
"//pkg/ttl/sqlbuilder",
38+
"//pkg/types",
39+
"//pkg/util",
40+
"//pkg/util/chunk",
41+
"//pkg/util/intest",
42+
"//pkg/util/logutil",
43+
"//pkg/util/sqlexec",
44+
"//pkg/util/timeutil",
45+
"@com_github_pingcap_errors//:errors",
46+
"@com_github_pingcap_failpoint//:failpoint",
47+
"@com_github_tikv_client_go_v2//tikv",
48+
"@com_github_tikv_client_go_v2//tikvrpc",
49+
"@io_etcd_go_etcd_client_v3//:client",
50+
"@org_golang_x_exp//maps",
51+
"@org_golang_x_time//rate",
52+
"@org_uber_go_multierr//:multierr",
53+
"@org_uber_go_zap//:zap",
54+
],
55+
)
56+
57+
go_test(
58+
name = "ttlworker_test",
59+
timeout = "moderate",
60+
srcs = [
61+
"del_test.go",
62+
"job_manager_integration_test.go",
63+
"job_manager_test.go",
64+
"scan_integration_test.go",
65+
"scan_test.go",
66+
"session_integration_test.go",
67+
"session_test.go",
68+
"task_manager_integration_test.go",
69+
"task_manager_test.go",
70+
"timer_sync_test.go",
71+
"timer_test.go",
72+
],
73+
embed = [":ttlworker"],
74+
flaky = True,
75+
race = "on",
76+
shard_count = 50,
77+
deps = [
78+
"//pkg/domain",
79+
"//pkg/infoschema",
80+
"//pkg/infoschema/context",
81+
"//pkg/kv",
82+
"//pkg/meta/model",
83+
"//pkg/metrics",
84+
"//pkg/parser/ast",
85+
"//pkg/parser/mysql",
86+
"//pkg/sessionctx",
87+
"//pkg/sessionctx/variable",
88+
"//pkg/statistics",
89+
"//pkg/store/mockstore",
90+
"//pkg/testkit",
91+
"//pkg/testkit/testfailpoint",
92+
"//pkg/testkit/testflag",
93+
"//pkg/timer/api",
94+
"//pkg/timer/tablestore",
95+
"//pkg/ttl/cache",
96+
"//pkg/ttl/client",
97+
"//pkg/ttl/metrics",
98+
"//pkg/ttl/session",
99+
"//pkg/types",
100+
"//pkg/util",
101+
"//pkg/util/chunk",
102+
"//pkg/util/logutil",
103+
"//pkg/util/mock",
104+
"//pkg/util/skip",
105+
"//pkg/util/sqlexec",
106+
"//pkg/util/timeutil",
107+
"@com_github_google_uuid//:uuid",
108+
"@com_github_ngaut_pools//:pools",
109+
"@com_github_pingcap_errors//:errors",
110+
"@com_github_pingcap_failpoint//:failpoint",
111+
"@com_github_prometheus_client_model//go",
112+
"@com_github_stretchr_testify//assert",
113+
"@com_github_stretchr_testify//mock",
114+
"@com_github_stretchr_testify//require",
115+
"@com_github_tikv_client_go_v2//testutils",
116+
"@com_github_tikv_client_go_v2//tikv",
117+
"@com_github_tikv_client_go_v2//tikvrpc",
118+
"@org_golang_x_time//rate",
119+
"@org_uber_go_atomic//:atomic",
120+
"@org_uber_go_zap//:zap",
121+
],
122+
)

ttl/session/session.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{
108108

109109
// RunInTxn executes the specified function in a txn
110110
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
111+
success := false
112+
defer func() {
113+
// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
114+
// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
115+
if !success {
116+
// For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled.
117+
// Using another timeout context to avoid that this behavior will be changed in the future.
118+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
119+
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
120+
terror.Log(rollbackErr)
121+
cancel()
122+
}
123+
}()
124+
111125
tracer := metrics.PhaseTracerFromCtx(ctx)
112126
defer tracer.EnterPhase(tracer.Phase())
113127

@@ -126,14 +140,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
126140
}
127141
tracer.EnterPhase(metrics.PhaseOther)
128142

129-
success := false
130-
defer func() {
131-
if !success {
132-
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
133-
terror.Log(rollbackErr)
134-
}
135-
}()
136-
137143
if err = fn(); err != nil {
138144
return err
139145
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2022 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package ttlworker_test
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"math/rand/v2"
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"github.com/pingcap/tidb/pkg/parser/ast"
26+
"github.com/pingcap/tidb/pkg/testkit"
27+
"github.com/pingcap/tidb/pkg/testkit/testflag"
28+
"github.com/pingcap/tidb/pkg/ttl/cache"
29+
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
30+
"github.com/stretchr/testify/require"
31+
)
32+
33+
func TestCancelWhileScan(t *testing.T) {
34+
store, dom := testkit.CreateMockStoreAndDomain(t)
35+
tk := testkit.NewTestKit(t, store)
36+
37+
tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
38+
testTable, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
39+
require.NoError(t, err)
40+
for i := 0; i < 10000; i++ {
41+
tk.MustExec(fmt.Sprintf("insert into test.t values (%d, NOW() - INTERVAL 24 HOUR)", i))
42+
}
43+
testPhysicalTableCache, err := cache.NewPhysicalTable(ast.NewCIStr("test"), testTable.Meta(), ast.NewCIStr(""))
44+
require.NoError(t, err)
45+
46+
delCh := make(chan *ttlworker.TTLDeleteTask)
47+
wg := &sync.WaitGroup{}
48+
wg.Add(1)
49+
go func() {
50+
defer wg.Done()
51+
for range delCh {
52+
// do nothing
53+
}
54+
}()
55+
56+
testStart := time.Now()
57+
testDuration := time.Second
58+
if testflag.Long() {
59+
testDuration = time.Minute
60+
}
61+
for time.Since(testStart) < testDuration {
62+
ctx, cancel := context.WithCancel(context.Background())
63+
ttlTask := ttlworker.NewTTLScanTask(ctx, testPhysicalTableCache, &cache.TTLTask{
64+
JobID: "test",
65+
TableID: 1,
66+
ScanID: 1,
67+
ScanRangeStart: nil,
68+
ScanRangeEnd: nil,
69+
ExpireTime: time.Now().Add(-12 * time.Hour),
70+
OwnerID: "test",
71+
OwnerAddr: "test",
72+
OwnerHBTime: time.Now(),
73+
Status: cache.TaskStatusRunning,
74+
StatusUpdateTime: time.Now(),
75+
State: &cache.TTLTaskState{},
76+
CreatedTime: time.Now(),
77+
})
78+
79+
wg := &sync.WaitGroup{}
80+
wg.Add(1)
81+
go func() {
82+
defer wg.Done()
83+
84+
ttlTask.DoScan(ctx, delCh, dom.SysSessionPool())
85+
}()
86+
87+
// randomly sleep for a while and cancel the scan
88+
time.Sleep(time.Duration(rand.Int() % int(time.Millisecond*10)))
89+
startCancel := time.Now()
90+
cancel()
91+
92+
wg.Wait()
93+
// make sure the scan is canceled in time
94+
require.Less(t, time.Since(startCancel), time.Second)
95+
}
96+
97+
close(delCh)
98+
wg.Wait()
99+
}

ttl/ttlworker/scan_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,3 +494,21 @@ func TestScanTaskCancelStmt(t *testing.T) {
494494
task.ctx, cancel = context.WithCancel(context.Background())
495495
testCancel(context.Background(), cancel)
496496
}
497+
498+
// NewTTLScanTask creates a new TTL scan task for test.
499+
func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask {
500+
return &ttlScanTask{
501+
ctx: ctx,
502+
tbl: tbl,
503+
TTLTask: ttlTask,
504+
statistics: &ttlStatistics{},
505+
}
506+
}
507+
508+
// DoScan is an exported version of `doScan` for test.
509+
func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
510+
return t.doScan(ctx, delCh, sessPool)
511+
}
512+
513+
// TTLDeleteTask is an exported version of `ttlDeleteTask` for test.
514+
type TTLDeleteTask = ttlDeleteTask

0 commit comments

Comments
 (0)