
Commit 30c2ca0

store: remove stores that have no region before balance (#52314) (#53573)
close #52313
1 parent ffc8ca3 commit 30c2ca0

3 files changed: +127 -35 lines changed
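The core of this change: before balanceBatchCopTask distributes regions across TiFlash stores, the store list is first reduced to the stores that actually host at least one region of the current query, so stores with no region never take part in the balancing. The helper added by this commit, getAllUsedTiFlashStores (see the diff below), does exactly this on *tikv.Store values; what follows is a minimal standalone sketch of the same filtering step with simplified, illustrative types — store and filterUsedStores are not part of the TiDB code base.

package main

import "fmt"

// store is a stand-in for *tikv.Store in this sketch; only the ID matters here.
type store struct{ id uint64 }

// filterUsedStores keeps only the stores whose ID appears in usedIDs, mirroring
// what getAllUsedTiFlashStores does in this commit before the balance step.
func filterUsedStores(all []store, usedIDs map[uint64]struct{}) []store {
	used := make([]store, 0, len(usedIDs))
	for _, s := range all {
		if _, ok := usedIDs[s.id]; ok {
			used = append(used, s)
		}
	}
	return used
}

func main() {
	all := []store{{id: 1}, {id: 2}, {id: 3}}
	// Store 1 hosts no region of this query, so it must not take part in balancing.
	usedIDs := map[uint64]struct{}{2: {}, 3: {}}
	fmt.Println(filterUsedStores(all, usedIDs)) // prints [{2} {3}]
}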

pkg/store/copr/BUILD.bazel

Lines changed: 2 additions & 1 deletion

@@ -82,7 +82,7 @@ go_test(
     embed = [":copr"],
     flaky = True,
     race = "on",
-    shard_count = 29,
+    shard_count = 30,
     deps = [
         "//pkg/kv",
         "//pkg/store/driver/backoff",
@@ -92,6 +92,7 @@ go_test(
         "//pkg/util/trxevents",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_kvproto//pkg/coprocessor",
+        "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_kvproto//pkg/mpp",
         "@com_github_stathat_consistent//:consistent",
         "@com_github_stretchr_testify//require",

pkg/store/copr/batch_coprocessor.go

Lines changed: 85 additions & 32 deletions
@@ -304,7 +304,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
 //
 // The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
 // If balanceWithContinuity is true, the second balance strategy is enable.
-func balanceBatchCopTask(ctx context.Context, aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
+func balanceBatchCopTask(aliveStores []*tikv.Store, originalTasks []*batchCopTask, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
 	if len(originalTasks) == 0 {
 		log.Info("Batch cop task balancer got an empty task set.")
 		return originalTasks
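The ctx parameter is dropped from balanceBatchCopTask (nothing in the function appears to use it after this change), so every call site loses its leading argument: the call in buildBatchCopTasksCore further down, and the two calls in TestBalanceBatchCopTaskWithEmptyTaskSet.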
@@ -819,12 +819,24 @@ func filterAllStoresAccordingToTiFlashReplicaRead(allStores []uint64, aliveStore
 	return
 }
 
+func getAllUsedTiFlashStores(allTiFlashStores []*tikv.Store, allUsedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
+	allUsedTiFlashStores := make([]*tikv.Store, 0, len(allUsedTiFlashStoresMap))
+	for _, store := range allTiFlashStores {
+		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
+		if ok {
+			allUsedTiFlashStores = append(allUsedTiFlashStores, store)
+		}
+	}
+	return allUsedTiFlashStores
+}
+
 // getAliveStoresAndStoreIDs gets alive TiFlash stores and their IDs.
 // If tiflashReplicaReadPolicy is not all_replicas, it will also return the IDs of the alive TiFlash stores in TiDB zone.
-func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
+func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, allUsedTiFlashStoresMap map[uint64]struct{}, ttl time.Duration, store *kvStore, tiflashReplicaReadPolicy tiflash.ReplicaRead, tidbZone string) (aliveStores *aliveStoresBundle) {
 	aliveStores = new(aliveStoresBundle)
 	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
-	aliveStores.storesInAllZones = filterAliveStores(ctx, allTiFlashStores, ttl, store)
+	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
+	aliveStores.storesInAllZones = filterAliveStores(ctx, allUsedTiFlashStores, ttl, store)
 
 	if !tiflashReplicaReadPolicy.IsAllReplicas() {
 		aliveStores.storeIDsInTiDBZone = make(map[uint64]struct{}, len(aliveStores.storesInAllZones))
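The new allUsedTiFlashStoresMap parameter narrows the candidate set before filterAliveStores runs: only stores that host at least one region of the current query are considered, so only those can end up in aliveStores and, later, in the balance step. This is the core of the fix for #52313.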
@@ -849,11 +861,28 @@ func getAliveStoresAndStoreIDs(ctx context.Context, cache *RegionCache, ttl time
 // 1. tiflash_replica_read policy
 // 2. whether the store is alive
 // After filtering, it will build the RegionInfo.
-func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer, task *copTask, rpcCtx *tikv.RPCContext, aliveStores *aliveStoresBundle, isTiDBLabelZoneSet bool, tiflashReplicaReadPolicy tiflash.ReplicaRead, regionInfoNeedsReloadOnSendFail []RegionInfo, regionsInOtherZones []uint64, maxRemoteReadCountAllowed int, tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
+func filterAccessibleStoresAndBuildRegionInfo(
+	cache *RegionCache,
+	allStores []uint64,
+	bo *Backoffer,
+	task *copTask,
+	rpcCtx *tikv.RPCContext,
+	aliveStores *aliveStoresBundle,
+	tiflashReplicaReadPolicy tiflash.ReplicaRead,
+	regionInfoNeedsReloadOnSendFail []RegionInfo,
+	regionsInOtherZones []uint64,
+	maxRemoteReadCountAllowed int,
+	tidbZone string) (regionInfo RegionInfo, _ []RegionInfo, _ []uint64, err error) {
 	needCrossZoneAccess := false
-	allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
 	allStores, needCrossZoneAccess = filterAllStoresAccordingToTiFlashReplicaRead(allStores, aliveStores, tiflashReplicaReadPolicy)
-	regionInfo = RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores, PartitionIndex: task.partitionIndex}
+
+	regionInfo = RegionInfo{
+		Region:         task.region,
+		Meta:           rpcCtx.Meta,
+		Ranges:         task.ranges,
+		AllStores:      allStores,
+		PartitionIndex: task.partitionIndex}
+
 	if needCrossZoneAccess {
 		regionsInOtherZones = append(regionsInOtherZones, task.region.GetID())
 		regionInfoNeedsReloadOnSendFail = append(regionInfoNeedsReloadOnSendFail, regionInfo)
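Two related signature changes here: allStores is now passed in by the caller (collected once per task in buildBatchCopTasksCore, see below) instead of being re-fetched via cache.GetAllValidTiFlashStores inside this function, and the isTiDBLabelZoneSet flag is dropped because the caller already normalizes tiflashReplicaReadPolicy to AllReplicas when the zone label is not set.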
@@ -862,7 +891,9 @@ func filterAccessibleStoresAndBuildRegionInfo(cache *RegionCache, bo *Backoffer,
 		for i := 0; i < 3 && i < len(regionsInOtherZones); i++ {
 			regionIDErrMsg += fmt.Sprintf("%d, ", regionsInOtherZones[i])
 		}
-		err = errors.Errorf("no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc", len(regionsInOtherZones), tidbZone, regionIDErrMsg)
+		err = errors.Errorf(
+			"no less than %d region(s) can not be accessed by TiFlash in the zone [%s]: %setc",
+			len(regionsInOtherZones), tidbZone, regionIDErrMsg)
 		// We need to reload the region cache here to avoid the failure throughout the region cache refresh TTL.
 		cache.OnSendFailForBatchRegions(bo, rpcCtx.Store, regionInfoNeedsReloadOnSendFail, true, err)
 		return regionInfo, nil, nil, err
@@ -895,10 +926,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 	if !isTiDBLabelZoneSet {
 		tiflashReplicaReadPolicy = tiflash.AllReplicas
 	}
-	aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, ttl, store, tiflashReplicaReadPolicy, tidbZone)
-	if tiflashReplicaReadPolicy.IsClosestReplicas() {
-		maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
-	}
+
 	for {
 		var tasks []*copTask
 		rangesLen = 0
@@ -919,17 +947,16 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 			}
 		}
 
-		var batchTasks []*batchCopTask
-		var regionIDsInOtherZones []uint64
-		var regionInfosNeedReloadOnSendFail []RegionInfo
-		storeTaskMap := make(map[string]*batchCopTask)
+		rpcCtxs := make([]*tikv.RPCContext, 0, len(tasks))
+		usedTiFlashStores := make([][]uint64, 0, len(tasks))
+		usedTiFlashStoresMap := make(map[uint64]struct{}, 0)
 		needRetry := false
-		storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
 		for _, task := range tasks {
 			rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
+
 			// When rpcCtx is nil, it's not only attributed to the miss region, but also
 			// some TiFlash stores crash and can't be recovered.
 			// That is not an error that can be easily recovered, so we regard this error
@@ -941,36 +968,62 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 				// Then `splitRegion` will reloads these regions.
 				continue
 			}
+
+			allStores, _ := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store, tikv.LabelFilterNoTiFlashWriteNode)
+			for _, storeID := range allStores {
+				usedTiFlashStoresMap[storeID] = struct{}{}
+			}
+			rpcCtxs = append(rpcCtxs, rpcCtx)
+			usedTiFlashStores = append(usedTiFlashStores, allStores)
+		}
+
+		if needRetry {
+			// As mentioned above, nil rpcCtx is always attributed to failed stores.
+			// It's equal to long poll the store but get no response. Here we'd better use
+			// TiFlash error to trigger the TiKV fallback mechanism.
+			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			continue
+		}
+
+		aliveStores = getAliveStoresAndStoreIDs(bo.GetCtx(), cache, usedTiFlashStoresMap, ttl, store, tiflashReplicaReadPolicy, tidbZone)
+		if tiflashReplicaReadPolicy.IsClosestReplicas() {
+			if len(aliveStores.storeIDsInTiDBZone) == 0 {
+				return nil, errors.Errorf("There is no region in tidb zone(%s)", tidbZone)
+			}
+			maxRemoteReadCountAllowed = len(aliveStores.storeIDsInTiDBZone) * tiflash.MaxRemoteReadCountPerNodeForClosestReplicas
+		}
+
+		var batchTasks []*batchCopTask
+		var regionIDsInOtherZones []uint64
+		var regionInfosNeedReloadOnSendFail []RegionInfo
+		storeTaskMap := make(map[string]*batchCopTask)
+		storeIDsUnionSetForAllTasks := make(map[uint64]struct{})
+		for idx, task := range tasks {
+			var err error
 			var regionInfo RegionInfo
-			regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, bo, task, rpcCtx, aliveStores, isTiDBLabelZoneSet, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
+			regionInfo, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, err = filterAccessibleStoresAndBuildRegionInfo(cache, usedTiFlashStores[idx], bo, task, rpcCtxs[idx], aliveStores, tiflashReplicaReadPolicy, regionInfosNeedReloadOnSendFail, regionIDsInOtherZones, maxRemoteReadCountAllowed, tidbZone)
 			if err != nil {
 				return nil, err
 			}
-			if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
+			if batchCop, ok := storeTaskMap[rpcCtxs[idx].Addr]; ok {
 				batchCop.regionInfos = append(batchCop.regionInfos, regionInfo)
 			} else {
 				batchTask := &batchCopTask{
-					storeAddr:   rpcCtx.Addr,
+					storeAddr:   rpcCtxs[idx].Addr,
 					cmdType:     cmdType,
-					ctx:         rpcCtx,
+					ctx:         rpcCtxs[idx],
 					regionInfos: []RegionInfo{regionInfo},
 				}
-				storeTaskMap[rpcCtx.Addr] = batchTask
+				storeTaskMap[rpcCtxs[idx].Addr] = batchTask
 			}
 			for _, storeID := range regionInfo.AllStores {
 				storeIDsUnionSetForAllTasks[storeID] = struct{}{}
 			}
 		}
-		if needRetry {
-			// As mentioned above, nil rpcCtx is always attributed to failed stores.
-			// It's equal to long poll the store but get no response. Here we'd better use
-			// TiFlash error to trigger the TiKV fallback mechanism.
-			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-			continue
-		}
+
 		if len(regionIDsInOtherZones) != 0 {
 			warningMsg := fmt.Sprintf("total %d region(s) can not be accessed by TiFlash in the zone [%s]:", len(regionIDsInOtherZones), tidbZone)
 			regionIDErrMsg := ""
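buildBatchCopTasksCore is reorganized into two passes per attempt: the first loop over tasks resolves each RPC context and records every TiFlash store that hosts one of the task's regions (usedTiFlashStoresMap / usedTiFlashStores); the needRetry backoff check moves ahead of the task-building work, and only after it passes are alive stores computed from that used-store set; the second loop then builds a batchCopTask per store address from the pre-collected rpcCtxs. A new guard also returns an error when closest_replicas is in effect but no alive store in the TiDB zone holds any region of the query.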
@@ -998,7 +1051,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
 			storesUnionSetForAllTasks = append(storesUnionSetForAllTasks, store)
 		}
 	}
-	batchTasks = balanceBatchCopTask(bo.GetCtx(), storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
+	batchTasks = balanceBatchCopTask(storesUnionSetForAllTasks, batchTasks, balanceWithContinuity, balanceContinuousRegionCount)
 	balanceElapsed := time.Since(balanceStart)
 	if log.GetLevel() <= zap.DebugLevel {
 		msg := "After region balance:"

pkg/store/copr/batch_coprocessor_test.go

Lines changed: 40 additions & 2 deletions
@@ -23,12 +23,15 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/pkg/kv"
 	"github.com/pingcap/tidb/pkg/store/driver/backoff"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/stathat/consistent"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/testutils"
 	"github.com/tikv/client-go/v2/tikv"
+	"github.com/tikv/client-go/v2/tikvrpc"
 	"go.uber.org/zap"
 )

@@ -125,13 +128,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
 func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
 	{
 		var nilTaskSet []*batchCopTask
-		nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, 0)
+		nilResult := balanceBatchCopTask(nil, nilTaskSet, false, 0)
 		require.True(t, nilResult == nil)
 	}
 
 	{
 		emptyTaskSet := make([]*batchCopTask, 0)
-		emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, 0)
+		emptyResult := balanceBatchCopTask(nil, emptyTaskSet, false, 0)
 		require.True(t, emptyResult != nil)
 		require.True(t, len(emptyResult) == 0)
 	}
@@ -282,3 +285,38 @@ func TestTopoFetcherBackoff(t *testing.T) {
 	require.GreaterOrEqual(t, dura, 30*time.Second)
 	require.LessOrEqual(t, dura, 50*time.Second)
 }
+
+func TestGetAllUsedTiFlashStores(t *testing.T) {
+	mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
+	require.NoError(t, err)
+	defer func() {
+		pdClient.Close()
+		err = mockClient.Close()
+		require.NoError(t, err)
+	}()
+
+	pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
+	defer pdCli.Close()
+
+	cache := NewRegionCache(tikv.NewRegionCache(pdCli))
+	defer cache.Close()
+
+	label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
+	label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}
+
+	cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
+	cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
+	cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
+
+	allUsedTiFlashStoresMap := make(map[uint64]struct{})
+	allUsedTiFlashStoresMap[2] = struct{}{}
+	allUsedTiFlashStoresMap[3] = struct{}{}
+	allTiFlashStores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
+	require.Equal(t, 3, len(allTiFlashStores))
+	allUsedTiFlashStores := getAllUsedTiFlashStores(allTiFlashStores, allUsedTiFlashStoresMap)
+	require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
+	for _, store := range allUsedTiFlashStores {
+		_, ok := allUsedTiFlashStoresMap[store.StoreID()]
+		require.True(t, ok)
+	}
+}
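To exercise just the new test locally, something like go test ./pkg/store/copr/ -run TestGetAllUsedTiFlashStores (or the corresponding Bazel test target, now sharded 30 ways) should be sufficient; the test runs against the mock PD/TiKV cluster from client-go's testutils, so no real TiFlash store is needed.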
