Commit 34852ed

reduce rand NewSource
Signed-off-by: Ryan Leung <[email protected]>
1 parent 76dc560 commit 34852ed

17 files changed: +94 −76 lines
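What the change does: before this commit, `filter.NewCandidates` seeded a brand-new `rand.Rand` from the wall clock on every call; now each checker builds one `*rand.Rand` when it is constructed and passes it down to the candidate filtering, so `rand.NewSource` runs once per checker instead of once per selection. A minimal sketch of the two patterns, with illustrative names (`picker` is not PD code):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Before: every selection seeds and allocates a fresh generator.
func pickReseedingEachCall(stores []uint64) uint64 {
	r := rand.New(rand.NewSource(time.Now().UnixNano())) // per-call allocation + clock read
	return stores[r.Intn(len(stores))]
}

// After: a long-lived owner seeds one generator and reuses it.
type picker struct {
	r *rand.Rand
}

func newPicker() *picker {
	return &picker{r: rand.New(rand.NewSource(time.Now().UnixNano()))} // seeded once
}

func (p *picker) pick(stores []uint64) uint64 {
	return stores[p.r.Intn(len(stores))]
}

func main() {
	stores := []uint64{1, 2, 3, 4}
	fmt.Println(pickReseedingEachCall(stores))
	p := newPicker()
	fmt.Println(p.pick(stores))
}
```

Since the checkers run for every region they inspect, moving the seeding into the constructor removes a per-region allocation and clock read; the observable scheduling behaviour should be unchanged.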

pkg/schedule/checker/replica_checker.go

Lines changed: 52 additions & 47 deletions
@@ -16,6 +16,8 @@ package checker
 
 import (
 	"fmt"
+	"math/rand"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -45,6 +47,7 @@ type ReplicaChecker struct {
 	cluster                 sche.CheckerCluster
 	conf                    config.CheckerConfigProvider
 	pendingProcessedRegions *cache.TTLUint64
+	r                       *rand.Rand
 }
 
 // NewReplicaChecker creates a replica checker.
@@ -53,6 +56,7 @@ func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigPro
 		cluster:                 cluster,
 		conf:                    conf,
 		pendingProcessedRegions: pendingProcessedRegions,
+		r:                       rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -67,40 +71,40 @@ func (*ReplicaChecker) GetType() types.CheckerSchedulerType {
 }
 
 // Check verifies a region's replicas, creating an operator.Operator if need.
-func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
+func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
 	replicaCheckerCounter.Inc()
-	if r.IsPaused() {
+	if c.IsPaused() {
 		replicaCheckerPausedCounter.Inc()
 		return nil
 	}
-	if op := r.checkDownPeer(region); op != nil {
+	if op := c.checkDownPeer(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkOfflinePeer(region); op != nil {
+	if op := c.checkOfflinePeer(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkMakeUpReplica(region); op != nil {
+	if op := c.checkMakeUpReplica(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		op.SetPriorityLevel(constant.High)
 		return op
 	}
-	if op := r.checkRemoveExtraReplica(region); op != nil {
+	if op := c.checkRemoveExtraReplica(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		return op
 	}
-	if op := r.checkLocationReplacement(region); op != nil {
+	if op := c.checkLocationReplacement(region); op != nil {
 		replicaCheckerNewOpCounter.Inc()
 		return op
 	}
 	return nil
 }
 
-func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsRemoveDownReplicaEnabled() {
+func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsRemoveDownReplicaEnabled() {
 		return nil
 	}
 
@@ -110,22 +114,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
 			continue
 		}
 		storeID := peer.GetStoreId()
-		store := r.cluster.GetStore(storeID)
+		store := c.cluster.GetStore(storeID)
 		if store == nil {
 			log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
 			return nil
 		}
 		// Only consider the state of the Store, not `stats.DownSeconds`.
-		if store.DownTime() < r.conf.GetMaxStoreDownTime() {
+		if store.DownTime() < c.conf.GetMaxStoreDownTime() {
 			continue
 		}
-		return r.fixPeer(region, storeID, downStatus)
+		return c.fixPeer(region, storeID, downStatus)
 	}
 	return nil
 }
 
-func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsReplaceOfflineReplicaEnabled() {
+func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsReplaceOfflineReplicaEnabled() {
 		return nil
 	}
 
@@ -136,7 +140,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 
 	for _, peer := range region.GetPeers() {
 		storeID := peer.GetStoreId()
-		store := r.cluster.GetStore(storeID)
+		store := c.cluster.GetStore(storeID)
 		if store == nil {
 			log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
 			return nil
@@ -145,71 +149,71 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
 			continue
 		}
 
-		return r.fixPeer(region, storeID, offlineStatus)
+		return c.fixPeer(region, storeID, offlineStatus)
 	}
 
 	return nil
 }
 
-func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsMakeUpReplicaEnabled() {
+func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsMakeUpReplicaEnabled() {
 		return nil
 	}
-	if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
+	if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
 		return nil
 	}
 	log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-	regionStores := r.cluster.GetRegionStores(region)
-	target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
+	regionStores := c.cluster.GetRegionStores(region)
+	target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
 	if target == 0 {
 		log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
 		replicaCheckerNoTargetStoreCounter.Inc()
 		if filterByTempState {
-			r.pendingProcessedRegions.Put(region.GetID(), nil)
+			c.pendingProcessedRegions.Put(region.GetID(), nil)
 		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
-	op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
+	op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
 	if err != nil {
 		log.Debug("create make-up-replica operator fail", errs.ZapError(err))
 		return nil
 	}
 	return op
 }
 
-func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsRemoveExtraReplicaEnabled() {
+func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsRemoveExtraReplicaEnabled() {
 		return nil
 	}
 	// when add learner peer, the number of peer will exceed max replicas for a while,
 	// just comparing the the number of voters to avoid too many cancel add operator log.
-	if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
+	if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
 		return nil
 	}
 	log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
-	regionStores := r.cluster.GetRegionStores(region)
-	old := r.strategy(region).SelectStoreToRemove(regionStores)
+	regionStores := c.cluster.GetRegionStores(region)
+	old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
 	if old == 0 {
 		replicaCheckerNoWorstPeerCounter.Inc()
-		r.pendingProcessedRegions.Put(region.GetID(), nil)
+		c.pendingProcessedRegions.Put(region.GetID(), nil)
 		return nil
 	}
-	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
+	op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
 	if err != nil {
 		replicaCheckerCreateOpFailedCounter.Inc()
 		return nil
 	}
 	return op
 }
 
-func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
-	if !r.conf.IsLocationReplacementEnabled() {
+func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
+	if !c.conf.IsLocationReplacementEnabled() {
 		return nil
 	}
 
-	strategy := r.strategy(region)
-	regionStores := r.cluster.GetRegionStores(region)
+	strategy := c.strategy(c.r, region)
+	regionStores := c.cluster.GetRegionStores(region)
 	oldStore := strategy.SelectStoreToRemove(regionStores)
 	if oldStore == 0 {
 		replicaCheckerAllRightCounter.Inc()
@@ -223,19 +227,19 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
 	}
 
 	newPeer := &metapb.Peer{StoreId: newStore}
-	op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
+	op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
 	if err != nil {
 		replicaCheckerCreateOpFailedCounter.Inc()
 		return nil
 	}
 	return op
 }
 
-func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
+func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
 	// Check the number of replicas first.
-	if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
+	if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
 		removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
-		op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
+		op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
 		if err != nil {
 			if status == offlineStatus {
 				replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
@@ -247,8 +251,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 		return op
 	}
 
-	regionStores := r.cluster.GetRegionStores(region)
-	target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
+	regionStores := c.cluster.GetRegionStores(region)
+	target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
 	if target == 0 {
 		if status == offlineStatus {
 			replicaCheckerNoStoreOfflineCounter.Inc()
@@ -257,13 +261,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 		}
 		log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
 		if filterByTempState {
-			r.pendingProcessedRegions.Put(region.GetID(), nil)
+			c.pendingProcessedRegions.Put(region.GetID(), nil)
 		}
 		return nil
 	}
 	newPeer := &metapb.Peer{StoreId: target}
 	replace := fmt.Sprintf("replace-%s-replica", status)
-	op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
+	op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
 	if err != nil {
 		if status == offlineStatus {
 			replicaCheckerReplaceOfflineFailedCounter.Inc()
@@ -275,12 +279,13 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 	return op
 }
 
-func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
+func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
 	return &ReplicaStrategy{
-		checkerName:    r.Name(),
-		cluster:        r.cluster,
-		locationLabels: r.conf.GetLocationLabels(),
-		isolationLevel: r.conf.GetIsolationLevel(),
+		checkerName:    c.Name(),
+		cluster:        c.cluster,
+		locationLabels: c.conf.GetLocationLabels(),
+		isolationLevel: c.conf.GetIsolationLevel(),
 		region:         region,
+		r:              r,
 	}
 }
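As the diff above shows, `ReplicaChecker.strategy` now takes the generator explicitly and every call site passes the checker's own `c.r`. A side benefit of this injection style (an observation, not something this commit adds) is that a caller can hand in a fixed-seed source and get reproducible selection, which is convenient in tests. A self-contained sketch with stand-in types:

```go
package main

import (
	"fmt"
	"math/rand"
)

// strategy stands in for ReplicaStrategy here: it only uses the source it
// is given and never seeds one itself.
type strategy struct {
	r *rand.Rand
}

// shuffle models the random tie-breaking a strategy might do among
// equally good stores.
func (s *strategy) shuffle(ids []uint64) []uint64 {
	out := append([]uint64(nil), ids...)
	s.r.Shuffle(len(out), func(i, j int) { out[i], out[j] = out[j], out[i] })
	return out
}

func main() {
	// A fixed seed makes the result deterministic run to run.
	s := &strategy{r: rand.New(rand.NewSource(42))}
	fmt.Println(s.shuffle([]uint64{1, 2, 3, 4, 5}))
}
```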

pkg/schedule/checker/replica_strategy.go

Lines changed: 5 additions & 2 deletions
@@ -15,6 +15,8 @@
 package checker
 
 import (
+	"math/rand"
+
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/core/constant"
@@ -26,6 +28,7 @@ import (
 // ReplicaStrategy collects some utilities to manipulate region peers. It
 // exists to allow replica_checker and rule_checker to reuse common logics.
 type ReplicaStrategy struct {
+	r              *rand.Rand
 	checkerName    string // replica-checker / rule-checker
 	cluster        sche.CheckerCluster
 	locationLabels []string
@@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
 
 	isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
 	strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
-	targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
+	targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
 		FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
 		KeepTheTopStores(isolationComparer, false) // greater isolation score is better
 	if targetCandidate.Len() == 0 {
@@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
 	if s.fastFailover {
 		level = constant.Urgent
 	}
-	source := filter.NewCandidates(coLocationStores).
+	source := filter.NewCandidates(s.r, coLocationStores).
 		FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
 		KeepTheTopStores(isolationComparer, true).
 		PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
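`ReplicaStrategy` itself only stores the `r` it is handed, so both checkers now funnel their selections through a generator owned by the checker. One caveat worth keeping in mind, as a general `math/rand` fact rather than a claim about how PD drives these checkers: a `*rand.Rand` from `rand.New` is not safe for concurrent use, so this design assumes each checker's generator is used from one goroutine at a time. If that assumption ever broke, the shared source would need synchronisation, for example with a hypothetical wrapper like this:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// lockedRand is a hypothetical wrapper, not part of this commit: one way to
// share a single source across goroutines, since *rand.Rand is not
// goroutine-safe on its own.
type lockedRand struct {
	mu sync.Mutex
	r  *rand.Rand
}

func newLockedRand() *lockedRand {
	return &lockedRand{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

func (l *lockedRand) Intn(n int) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.r.Intn(n)
}

func main() {
	lr := newLockedRand()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(lr.Intn(100))
		}()
	}
	wg.Wait()
}
```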

pkg/schedule/checker/rule_checker.go

Lines changed: 8 additions & 4 deletions
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"math"
+	"math/rand"
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -56,6 +57,7 @@ type RuleChecker struct {
 	pendingList        cache.Cache
 	switchWitnessCache *cache.TTLUint64
 	record             *recorder
+	r                  *rand.Rand
 }
 
 // NewRuleChecker creates a checker instance.
@@ -67,6 +69,7 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage
 		pendingList:        cache.NewDefaultCache(maxPendingListLen),
 		switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
 		record:             newRecord(),
+		r:                  rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 }
 
@@ -201,7 +204,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
 	ruleStores := c.getRuleFitStores(rf)
 	isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
 	// If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-	store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
+	store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
 	if store == 0 {
 		ruleCheckerNoStoreAddCounter.Inc()
 		c.handleFilterState(region, filterByTempState)
@@ -252,7 +255,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla
 		fastFailover = false
 	}
 	ruleStores := c.getRuleFitStores(rf)
-	store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
+	store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
 	if store == 0 {
 		ruleCheckerNoStoreReplaceCounter.Inc()
 		c.handleFilterState(region, filterByTempState)
@@ -393,7 +396,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R
 
 	isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
 	// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
-	strategy := c.strategy(region, rf.Rule, isWitness)
+	strategy := c.strategy(c.r, region, rf.Rule, isWitness)
 	ruleStores := c.getRuleFitStores(rf)
 	oldStore := strategy.SelectStoreToRemove(ruleStores)
 	if oldStore == 0 {
@@ -618,7 +621,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
 	return nil, false
 }
 
-func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
+func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
 	return &ReplicaStrategy{
 		checkerName:    c.Name(),
 		cluster:        c.cluster,
@@ -627,6 +630,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
 		region:         region,
 		extraFilters:   []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)},
 		fastFailover:   fastFailover,
+		r:              r,
 	}
 }
 

pkg/schedule/filter/candidates.go

Lines changed: 2 additions & 3 deletions
@@ -17,7 +17,6 @@ package filter
 import (
 	"math/rand"
 	"sort"
-	"time"
 
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/schedule/config"
@@ -32,8 +31,8 @@ type StoreCandidates struct {
 }
 
 // NewCandidates creates StoreCandidates with store list.
-func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
-	return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
+func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
+	return &StoreCandidates{r: r, Stores: stores}
 }
 
 // FilterSource keeps stores that can pass all source filters.