diff --git a/br/pkg/checksum/BUILD.bazel b/br/pkg/checksum/BUILD.bazel index bbc0da5b44c1d..0ca4089ead916 100644 --- a/br/pkg/checksum/BUILD.bazel +++ b/br/pkg/checksum/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", "@com_github_pingcap_tipb//go-tipb", + "@com_github_tikv_client_go_v2//util", "@org_uber_go_zap//:zap", ], ) @@ -27,11 +28,13 @@ go_test( name = "checksum_test", timeout = "short", srcs = [ + "executor_nokit_test.go", "executor_test.go", "main_test.go", ], embed = [":checksum"], flaky = True, + shard_count = 3, deps = [ "//br/pkg/metautil", "//br/pkg/mock", @@ -44,6 +47,7 @@ go_test( "//pkg/testkit/testsetup", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//util", "@org_uber_go_goleak//:goleak", ], ) diff --git a/br/pkg/checksum/executor.go b/br/pkg/checksum/executor.go index 4e994e7e1a3d7..b39c198e13644 100644 --- a/br/pkg/checksum/executor.go +++ b/br/pkg/checksum/executor.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/ranger" "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -35,8 +36,8 @@ type ExecutorBuilder struct { oldKeyspace []byte newKeyspace []byte - resourceGroupName string - explicitRequestSourceType string + resourceGroupName string + requestSource util.RequestSource } // NewExecutorBuilder returns a new executor builder. @@ -82,8 +83,13 @@ func (builder *ExecutorBuilder) SetResourceGroupName(name string) *ExecutorBuild return builder } +func (builder *ExecutorBuilder) SetRequestSource(reqSource util.RequestSource) *ExecutorBuilder { + builder.requestSource = reqSource + return builder +} + func (builder *ExecutorBuilder) SetExplicitRequestSourceType(name string) *ExecutorBuilder { - builder.explicitRequestSourceType = name + builder.requestSource.ExplicitRequestSourceType = name return builder } @@ -97,7 +103,7 @@ func (builder *ExecutorBuilder) Build() (*Executor, error) { builder.oldKeyspace, builder.newKeyspace, builder.resourceGroupName, - builder.explicitRequestSourceType, + builder.requestSource, ) if err != nil { return nil, errors.Trace(err) @@ -112,7 +118,8 @@ func buildChecksumRequest( concurrency uint, oldKeyspace []byte, newKeyspace []byte, - resourceGroupName, explicitRequestSourceType string, + resourceGroupName string, + requestSource util.RequestSource, ) ([]*kv.Request, error) { var partDefs []model.PartitionDefinition if part := newTable.Partition; part != nil { @@ -125,7 +132,7 @@ func buildChecksumRequest( oldTableID = oldTable.Info.ID } rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency, - oldKeyspace, newKeyspace, resourceGroupName, explicitRequestSourceType) + oldKeyspace, newKeyspace, resourceGroupName, requestSource) if err != nil { return nil, errors.Trace(err) } @@ -140,7 +147,7 @@ func buildChecksumRequest( } } rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency, - oldKeyspace, newKeyspace, resourceGroupName, explicitRequestSourceType) + oldKeyspace, newKeyspace, resourceGroupName, requestSource) if err != nil { return nil, errors.Trace(err) } @@ -159,11 +166,12 @@ func buildRequest( concurrency uint, oldKeyspace []byte, newKeyspace []byte, - resourceGroupName, explicitRequestSourceType string, + resourceGroupName string, + requestSource util.RequestSource, ) ([]*kv.Request, error) { reqs := make([]*kv.Request, 0) req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency, - oldKeyspace, newKeyspace, resourceGroupName, explicitRequestSourceType) + oldKeyspace, newKeyspace, resourceGroupName, requestSource) if err != nil { return nil, errors.Trace(err) } @@ -193,7 +201,7 @@ func buildRequest( } req, err = buildIndexRequest( tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency, - oldKeyspace, newKeyspace, resourceGroupName, explicitRequestSourceType) + oldKeyspace, newKeyspace, resourceGroupName, requestSource) if err != nil { return nil, errors.Trace(err) } @@ -212,7 +220,8 @@ func buildTableRequest( concurrency uint, oldKeyspace []byte, newKeyspace []byte, - resourceGroupName, explicitRequestSourceType string, + resourceGroupName string, + requestSource util.RequestSource, ) (*kv.Request, error) { var rule *tipb.ChecksumRewriteRule if oldTable != nil { @@ -243,7 +252,7 @@ func buildTableRequest( SetChecksumRequest(checksum). SetConcurrency(int(concurrency)). SetResourceGroupName(resourceGroupName). - SetExplicitRequestSourceType(explicitRequestSourceType). + SetRequestSource(requestSource). Build() } @@ -256,7 +265,8 @@ func buildIndexRequest( concurrency uint, oldKeyspace []byte, newKeyspace []byte, - resourceGroupName, ExplicitRequestSourceType string, + resourceGroupName string, + requestSource util.RequestSource, ) (*kv.Request, error) { var rule *tipb.ChecksumRewriteRule if oldIndexInfo != nil { @@ -281,7 +291,7 @@ func buildIndexRequest( SetChecksumRequest(checksum). SetConcurrency(int(concurrency)). SetResourceGroupName(resourceGroupName). - SetExplicitRequestSourceType(ExplicitRequestSourceType). + SetRequestSource(requestSource). Build() } diff --git a/br/pkg/checksum/executor_nokit_test.go b/br/pkg/checksum/executor_nokit_test.go new file mode 100644 index 0000000000000..68a964aa36ca5 --- /dev/null +++ b/br/pkg/checksum/executor_nokit_test.go @@ -0,0 +1,34 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checksum + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" +) + +func TestBuilderRequestSource(t *testing.T) { + b := NewExecutorBuilder(nil, 0) + require.Equal(t, util.RequestSource{}, b.requestSource) + + b.SetExplicitRequestSourceType("aaa") + require.EqualValues(t, util.RequestSource{ExplicitRequestSourceType: "aaa"}, b.requestSource) + + reqSource := util.RequestSource{RequestSourceInternal: true, RequestSourceType: "type", ExplicitRequestSourceType: "bbb"} + b.SetRequestSource(reqSource) + require.EqualValues(t, reqSource, b.requestSource) +} diff --git a/br/pkg/checksum/main_test.go b/br/pkg/checksum/main_test.go index d68585d6a152b..2aab3587c728a 100644 --- a/br/pkg/checksum/main_test.go +++ b/br/pkg/checksum/main_test.go @@ -28,6 +28,7 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), goleak.IgnoreTopFunction("github.com/klauspost/compress/zstd.(*blockDec).startDecoder"), goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), } testsetup.SetupForCommonTest() goleak.VerifyTestMain(m, opts...) diff --git a/pkg/distsql/request_builder.go b/pkg/distsql/request_builder.go index 139e6a90192bf..4560e067e556d 100644 --- a/pkg/distsql/request_builder.go +++ b/pkg/distsql/request_builder.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/util" ) // RequestBuilder is used to build a "kv.Request". @@ -397,6 +398,12 @@ func (builder *RequestBuilder) SetResourceGroupName(name string) *RequestBuilder return builder } +// SetRequestSource sets the request source. +func (builder *RequestBuilder) SetRequestSource(reqSource util.RequestSource) *RequestBuilder { + builder.RequestSource = reqSource + return builder +} + // SetExplicitRequestSourceType sets the explicit request source type. func (builder *RequestBuilder) SetExplicitRequestSourceType(sourceType string) *RequestBuilder { builder.RequestSource.ExplicitRequestSourceType = sourceType diff --git a/pkg/lightning/backend/local/checksum.go b/pkg/lightning/backend/local/checksum.go index f25138ba9c264..1fa8fc9a24b8b 100644 --- a/pkg/lightning/backend/local/checksum.go +++ b/pkg/lightning/backend/local/checksum.go @@ -38,6 +38,7 @@ import ( "github.com/pingcap/tipb/go-tipb" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" + tikvutil "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" pderrs "github.com/tikv/pd/client/errs" "go.uber.org/atomic" @@ -279,12 +280,12 @@ func updateGCLifeTime(ctx context.Context, db *sql.DB, gcLifeTime string) error // TiKVChecksumManager is a manager that can compute checksum of a table using TiKV. type TiKVChecksumManager struct { - client kv.Client - manager *gcTTLManager - distSQLScanConcurrency uint - backoffWeight int - resourceGroupName string - explicitRequestSourceType string + client kv.Client + manager *gcTTLManager + distSQLScanConcurrency uint + backoffWeight int + resourceGroupName string + requestSource tikvutil.RequestSource } var _ ChecksumManager = &TiKVChecksumManager{} @@ -292,12 +293,14 @@ var _ ChecksumManager = &TiKVChecksumManager{} // NewTiKVChecksumManager return a new tikv checksum manager func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int, resourceGroupName, explicitRequestSourceType string) *TiKVChecksumManager { return &TiKVChecksumManager{ - client: client, - manager: newGCTTLManager(pdClient, lightningServicePrefix), - distSQLScanConcurrency: distSQLScanConcurrency, - backoffWeight: backoffWeight, - resourceGroupName: resourceGroupName, - explicitRequestSourceType: explicitRequestSourceType, + client: client, + manager: newGCTTLManager(pdClient, lightningServicePrefix), + distSQLScanConcurrency: distSQLScanConcurrency, + backoffWeight: backoffWeight, + resourceGroupName: resourceGroupName, + requestSource: tikvutil.RequestSource{ + ExplicitRequestSourceType: explicitRequestSourceType, + }, } } @@ -305,12 +308,15 @@ func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon func NewTiKVChecksumManagerForImportInto(store kv.Storage, taskID int64, distSQLScanConcurrency uint, backoffWeight int, resourceGroupName string) *TiKVChecksumManager { prefix := fmt.Sprintf("%s-%d", importIntoServicePrefix, taskID) return &TiKVChecksumManager{ - client: store.GetClient(), - manager: newGCTTLManager(store.(kv.StorageWithPD).GetPDClient(), prefix), - distSQLScanConcurrency: distSQLScanConcurrency, - backoffWeight: backoffWeight, - resourceGroupName: resourceGroupName, - explicitRequestSourceType: importIntoServicePrefix, + client: store.GetClient(), + manager: newGCTTLManager(store.(kv.StorageWithPD).GetPDClient(), prefix), + distSQLScanConcurrency: distSQLScanConcurrency, + backoffWeight: backoffWeight, + resourceGroupName: resourceGroupName, + requestSource: tikvutil.RequestSource{ + RequestSourceInternal: true, + RequestSourceType: kv.InternalImportInto, + }, } } @@ -319,7 +325,7 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo SetConcurrency(e.distSQLScanConcurrency). SetBackoffWeight(e.backoffWeight). SetResourceGroupName(e.resourceGroupName). - SetExplicitRequestSourceType(e.explicitRequestSourceType). + SetRequestSource(e.requestSource). Build() if err != nil { return nil, errors.Trace(err)