Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "ttlworker",
srcs = [
"config.go",
"del.go",
"job.go",
"job_manager.go",
"scan.go",
"session.go",
"task_manager.go",
"timer.go",
"timer_sync.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/pkg/ttl/ttlworker",
visibility = ["//visibility:public"],
deps = [
"//pkg/infoschema",
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/metrics",
"//pkg/parser/ast",
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/store/driver/error",
"//pkg/timer/api",
"//pkg/timer/runtime",
"//pkg/timer/tablestore",
"//pkg/ttl/cache",
"//pkg/ttl/client",
"//pkg/ttl/metrics",
"//pkg/ttl/session",
"//pkg/ttl/sqlbuilder",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_exp//maps",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "ttlworker_test",
timeout = "moderate",
srcs = [
"del_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"scan_integration_test.go",
"scan_test.go",
"session_integration_test.go",
"session_test.go",
"task_manager_integration_test.go",
"task_manager_test.go",
"timer_sync_test.go",
"timer_test.go",
],
embed = [":ttlworker"],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//pkg/domain",
"//pkg/infoschema",
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/metrics",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/store/mockstore",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testflag",
"//pkg/timer/api",
"//pkg/timer/tablestore",
"//pkg/ttl/cache",
"//pkg/ttl/client",
"//pkg/ttl/metrics",
"//pkg/ttl/session",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/mock",
"//pkg/util/skip",
"//pkg/util/sqlexec",
"//pkg/util/timeutil",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
22 changes: 14 additions & 8 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,20 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{

// RunInTxn executes the specified function in a txn
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
success := false
defer func() {
// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
if !success {
// For now, the "ROLLBACK" can execute successfully even when the context has already been cancelled.
// Using another timeout context to avoid that this behavior will be changed in the future.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
terror.Log(rollbackErr)
cancel()
}
}()

tracer := metrics.PhaseTracerFromCtx(ctx)
defer tracer.EnterPhase(tracer.Phase())

Expand All @@ -126,14 +140,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
}
tracer.EnterPhase(metrics.PhaseOther)

success := false
defer func() {
if !success {
_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
terror.Log(rollbackErr)
}
}()

if err = fn(); err != nil {
return err
}
Expand Down
99 changes: 99 additions & 0 deletions ttl/ttlworker/scan_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker_test

import (
"context"
"fmt"
"math/rand/v2"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testflag"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/stretchr/testify/require"
)

func TestCancelWhileScan(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
testTable, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
require.NoError(t, err)
for i := 0; i < 10000; i++ {
tk.MustExec(fmt.Sprintf("insert into test.t values (%d, NOW() - INTERVAL 24 HOUR)", i))
}
testPhysicalTableCache, err := cache.NewPhysicalTable(ast.NewCIStr("test"), testTable.Meta(), ast.NewCIStr(""))
require.NoError(t, err)

delCh := make(chan *ttlworker.TTLDeleteTask)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for range delCh {
// do nothing
}
}()

testStart := time.Now()
testDuration := time.Second
if testflag.Long() {
testDuration = time.Minute
}
for time.Since(testStart) < testDuration {
ctx, cancel := context.WithCancel(context.Background())
ttlTask := ttlworker.NewTTLScanTask(ctx, testPhysicalTableCache, &cache.TTLTask{
JobID: "test",
TableID: 1,
ScanID: 1,
ScanRangeStart: nil,
ScanRangeEnd: nil,
ExpireTime: time.Now().Add(-12 * time.Hour),
OwnerID: "test",
OwnerAddr: "test",
OwnerHBTime: time.Now(),
Status: cache.TaskStatusRunning,
StatusUpdateTime: time.Now(),
State: &cache.TTLTaskState{},
CreatedTime: time.Now(),
})

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

ttlTask.DoScan(ctx, delCh, dom.SysSessionPool())
}()

// randomly sleep for a while and cancel the scan
time.Sleep(time.Duration(rand.Int() % int(time.Millisecond*10)))
startCancel := time.Now()
cancel()

wg.Wait()
// make sure the scan is canceled in time
require.Less(t, time.Since(startCancel), time.Second)
}

close(delCh)
wg.Wait()
}
18 changes: 18 additions & 0 deletions ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,21 @@ func TestScanTaskCancelStmt(t *testing.T) {
task.ctx, cancel = context.WithCancel(context.Background())
testCancel(context.Background(), cancel)
}

// NewTTLScanTask creates a new TTL scan task for test.
func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask {
return &ttlScanTask{
ctx: ctx,
tbl: tbl,
TTLTask: ttlTask,
statistics: &ttlStatistics{},
}
}

// DoScan is an exported version of `doScan` for test.
func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
return t.doScan(ctx, delCh, sessPool)
}

// TTLDeleteTask is an exported version of `ttlDeleteTask` for test.
type TTLDeleteTask = ttlDeleteTask