From 4ea266a3245c90ed515c03e4e74276185183f23a Mon Sep 17 00:00:00 2001
From: YangKeao
Date: Wed, 15 Jan 2025 17:34:43 +0800
Subject: [PATCH] This is an automated cherry-pick of #58943

Signed-off-by: ti-chi-bot
---
NOTE(review): this patch creates pkg/ttl/ttlworker/BUILD.bazel, but every Go
file it touches lives under ttl/ttlworker/ or ttl/session/ (no pkg/ prefix).
The new BUILD file lists sources such as scan.go, scan_test.go and
scan_integration_test.go, none of which this patch places in
pkg/ttl/ttlworker/. This looks like an unresolved cherry-pick conflict.
TODO confirm against the target branch whether the change should instead be
applied to a BUILD.bazel under ttl/ttlworker/, and whether the
github.com/pingcap/tidb/pkg/... import paths used by the new test file exist
on that branch.

 pkg/ttl/ttlworker/BUILD.bazel          | 122 +++++++++++++++++++++++++
 ttl/session/session.go                 |  22 +++--
 ttl/ttlworker/scan_integration_test.go |  99 ++++++++++++++++++++
 ttl/ttlworker/scan_test.go             |  18 ++++
 4 files changed, 253 insertions(+), 8 deletions(-)
 create mode 100644 pkg/ttl/ttlworker/BUILD.bazel
 create mode 100644 ttl/ttlworker/scan_integration_test.go

diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel
new file mode 100644
index 0000000000000..aa5a14bc7875f
--- /dev/null
+++ b/pkg/ttl/ttlworker/BUILD.bazel
@@ -0,0 +1,122 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "ttlworker",
+    srcs = [
+        "config.go",
+        "del.go",
+        "job.go",
+        "job_manager.go",
+        "scan.go",
+        "session.go",
+        "task_manager.go",
+        "timer.go",
+        "timer_sync.go",
+        "worker.go",
+    ],
+    importpath = "github.com/pingcap/tidb/pkg/ttl/ttlworker",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/infoschema",
+        "//pkg/infoschema/context",
+        "//pkg/kv",
+        "//pkg/meta/model",
+        "//pkg/metrics",
+        "//pkg/parser/ast",
+        "//pkg/parser/terror",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/variable",
+        "//pkg/store/driver/error",
+        "//pkg/timer/api",
+        "//pkg/timer/runtime",
+        "//pkg/timer/tablestore",
+        "//pkg/ttl/cache",
+        "//pkg/ttl/client",
+        "//pkg/ttl/metrics",
+        "//pkg/ttl/session",
+        "//pkg/ttl/sqlbuilder",
+        "//pkg/types",
+        "//pkg/util",
+        "//pkg/util/chunk",
+        "//pkg/util/intest",
+        "//pkg/util/logutil",
+        "//pkg/util/sqlexec",
+        "//pkg/util/timeutil",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_tikv_client_go_v2//tikv",
+        "@com_github_tikv_client_go_v2//tikvrpc",
+        "@io_etcd_go_etcd_client_v3//:client",
+        "@org_golang_x_exp//maps",
+        "@org_golang_x_time//rate",
+        "@org_uber_go_multierr//:multierr",
+        "@org_uber_go_zap//:zap",
+    ],
+)
+
+go_test(
+    name = "ttlworker_test",
+    timeout = "moderate",
+    srcs = [
+        "del_test.go",
+        "job_manager_integration_test.go",
+        "job_manager_test.go",
+        "scan_integration_test.go",
+        "scan_test.go",
+        "session_integration_test.go",
+        "session_test.go",
+        "task_manager_integration_test.go",
+        "task_manager_test.go",
+        "timer_sync_test.go",
+        "timer_test.go",
+    ],
+    embed = [":ttlworker"],
+    flaky = True,
+    race = "on",
+    shard_count = 50,
+    deps = [
+        "//pkg/domain",
+        "//pkg/infoschema",
+        "//pkg/infoschema/context",
+        "//pkg/kv",
+        "//pkg/meta/model",
+        "//pkg/metrics",
+        "//pkg/parser/ast",
+        "//pkg/parser/mysql",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/variable",
+        "//pkg/statistics",
+        "//pkg/store/mockstore",
+        "//pkg/testkit",
+        "//pkg/testkit/testfailpoint",
+        "//pkg/testkit/testflag",
+        "//pkg/timer/api",
+        "//pkg/timer/tablestore",
+        "//pkg/ttl/cache",
+        "//pkg/ttl/client",
+        "//pkg/ttl/metrics",
+        "//pkg/ttl/session",
+        "//pkg/types",
+        "//pkg/util",
+        "//pkg/util/chunk",
+        "//pkg/util/logutil",
+        "//pkg/util/mock",
+        "//pkg/util/skip",
+        "//pkg/util/sqlexec",
+        "//pkg/util/timeutil",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_ngaut_pools//:pools",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_prometheus_client_model//go",
+        "@com_github_stretchr_testify//assert",
+        "@com_github_stretchr_testify//mock",
+        "@com_github_stretchr_testify//require",
+        "@com_github_tikv_client_go_v2//testutils",
+        "@com_github_tikv_client_go_v2//tikv",
+        "@com_github_tikv_client_go_v2//tikvrpc",
+        "@org_golang_x_time//rate",
+        "@org_uber_go_atomic//:atomic",
+        "@org_uber_go_zap//:zap",
+    ],
+)
diff --git a/ttl/session/session.go b/ttl/session/session.go
index 7a54f125ea7a1..5ee948f8fc6f3 100644
--- a/ttl/session/session.go
+++ b/ttl/session/session.go
@@ -108,6 +108,20 @@ func (s *session) ExecuteSQL(ctx
context.Context, sql string, args ...interface{
 
 // RunInTxn executes the specified function in a txn
 func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
+	success := false
+	defer func() {
+		// Always try to `ROLLBACK` the transaction even if only the `BEGIN` fails. If the `BEGIN` is killed
+		// after it runs the first `Next`, the transaction is already active and needs to be `ROLLBACK`ed.
+		if !success {
+			// For now, "ROLLBACK" succeeds even when the caller's context is already cancelled. Use a separate
+			// timeout context anyway so the rollback keeps working if that behavior changes in the future.
+			rollbackCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+			defer cancel()
+			_, rollbackErr := s.ExecuteSQL(rollbackCtx, "ROLLBACK")
+			terror.Log(rollbackErr)
+		}
+	}()
+
 	tracer := metrics.PhaseTracerFromCtx(ctx)
 	defer tracer.EnterPhase(tracer.Phase())
 
@@ -126,14 +140,6 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode
 	}
 	tracer.EnterPhase(metrics.PhaseOther)
 
-	success := false
-	defer func() {
-		if !success {
-			_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
-			terror.Log(rollbackErr)
-		}
-	}()
-
 	if err = fn(); err != nil {
 		return err
 	}
diff --git a/ttl/ttlworker/scan_integration_test.go b/ttl/ttlworker/scan_integration_test.go
new file mode 100644
index 0000000000000..181ac298d13f9
--- /dev/null
+++ b/ttl/ttlworker/scan_integration_test.go
@@ -0,0 +1,99 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker_test
+
+import (
+	"context"
+	"fmt"
+	"math/rand/v2"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tidb/pkg/parser/ast"
+	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/testkit/testflag"
+	"github.com/pingcap/tidb/pkg/ttl/cache"
+	"github.com/pingcap/tidb/pkg/ttl/ttlworker"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCancelWhileScan(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+
+	tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
+	testTable, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
+	require.NoError(t, err)
+	for i := 0; i < 10000; i++ {
+		tk.MustExec(fmt.Sprintf("insert into test.t values (%d, NOW() - INTERVAL 24 HOUR)", i))
+	}
+	testPhysicalTableCache, err := cache.NewPhysicalTable(ast.NewCIStr("test"), testTable.Meta(), ast.NewCIStr(""))
+	require.NoError(t, err)
+
+	delCh := make(chan *ttlworker.TTLDeleteTask)
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for range delCh {
+			// drain delete tasks so the scan never blocks on sending to delCh
+		}
+	}()
+
+	testStart := time.Now()
+	testDuration := time.Second
+	if testflag.Long() {
+		testDuration = time.Minute
+	}
+	for time.Since(testStart) < testDuration {
+		ctx, cancel := context.WithCancel(context.Background())
+		ttlTask := ttlworker.NewTTLScanTask(ctx, testPhysicalTableCache, &cache.TTLTask{
+			JobID:            "test",
+			TableID:          1,
+			ScanID:           1,
+			ScanRangeStart:   nil,
+			ScanRangeEnd:     nil,
+			ExpireTime:       time.Now().Add(-12 * time.Hour),
+			OwnerID:          "test",
+			OwnerAddr:        "test",
+			OwnerHBTime:      time.Now(),
+			Status:           cache.TaskStatusRunning,
+			StatusUpdateTime: time.Now(),
+			State:            &cache.TTLTaskState{},
+			CreatedTime:      time.Now(),
+		})
+
+		scanWG := &sync.WaitGroup{}
+		scanWG.Add(1)
+		go func() {
+			defer scanWG.Done()
+
+			ttlTask.DoScan(ctx, delCh, dom.SysSessionPool())
+		}()
+
+		// sleep for a random duration below 10ms, then cancel the scan
+		time.Sleep(rand.N(10 * time.Millisecond))
+		startCancel := time.Now()
+		cancel()
+
+		scanWG.Wait()
+		// make sure the scan is canceled in time
+		require.Less(t, time.Since(startCancel), time.Second)
+	}
+
+	close(delCh)
+	wg.Wait()
+}
diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go
index 9f196ec568a9a..2b3782f1bb77e 100644
--- a/ttl/ttlworker/scan_test.go
+++ b/ttl/ttlworker/scan_test.go
@@ -494,3 +494,21 @@ func TestScanTaskCancelStmt(t *testing.T) {
 	task.ctx, cancel = context.WithCancel(context.Background())
 	testCancel(context.Background(), cancel)
 }
+
+// NewTTLScanTask creates a new TTL scan task for test.
+func NewTTLScanTask(ctx context.Context, tbl *cache.PhysicalTable, ttlTask *cache.TTLTask) *ttlScanTask {
+	return &ttlScanTask{
+		ctx:        ctx,
+		tbl:        tbl,
+		TTLTask:    ttlTask,
+		statistics: &ttlStatistics{},
+	}
+}
+
+// DoScan is an exported version of `doScan` for test.
+func (t *ttlScanTask) DoScan(ctx context.Context, delCh chan<- *TTLDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
+	return t.doScan(ctx, delCh, sessPool)
+}
+
+// TTLDeleteTask is an exported version of `ttlDeleteTask` for test.
+type TTLDeleteTask = ttlDeleteTask