@@ -16,6 +16,8 @@ package checker
16
16
17
17
import (
18
18
"fmt"
19
+ "math/rand"
20
+ "time"
19
21
20
22
"github.com/pingcap/kvproto/pkg/metapb"
21
23
"github.com/pingcap/log"
@@ -61,17 +63,31 @@ var (
61
63
// Location management, mainly used for cross data center deployment.
62
64
type ReplicaChecker struct {
63
65
PauseController
66
+ << << << < HEAD
64
67
cluster schedule.Cluster
65
68
conf config.Config
66
69
regionWaitingList cache.Cache
70
+ == == == =
71
+ cluster sche.CheckerCluster
72
+ conf config.CheckerConfigProvider
73
+ pendingProcessedRegions * cache.TTLUint64
74
+ r * rand.Rand
75
+ >> >> >> > 25 dedabf5 (* : reduce rand NewSource (#8675 ))
67
76
}
68
77
69
78
// NewReplicaChecker creates a replica checker.
70
79
func NewReplicaChecker (cluster schedule.Cluster , conf config.Config , regionWaitingList cache.Cache ) * ReplicaChecker {
71
80
return & ReplicaChecker {
81
+ << << << < HEAD
72
82
cluster : cluster ,
73
83
conf : conf ,
74
84
regionWaitingList : regionWaitingList ,
85
+ == == == =
86
+ cluster : cluster ,
87
+ conf : conf ,
88
+ pendingProcessedRegions : pendingProcessedRegions ,
89
+ r : rand .New (rand .NewSource (time .Now ().UnixNano ())),
90
+ >> >> >> > 25 dedabf5 (* : reduce rand NewSource (#8675 ))
75
91
}
76
92
}
77
93
@@ -81,40 +97,40 @@ func (r *ReplicaChecker) GetType() string {
81
97
}
82
98
83
99
// Check verifies a region's replicas, creating an operator.Operator if need.
84
- func (r * ReplicaChecker ) Check (region * core.RegionInfo ) * operator.Operator {
100
+ func (c * ReplicaChecker ) Check (region * core .RegionInfo ) * operator.Operator {
85
101
replicaCheckerCounter .Inc ()
86
- if r .IsPaused () {
102
+ if c .IsPaused () {
87
103
replicaCheckerPausedCounter .Inc ()
88
104
return nil
89
105
}
90
- if op := r .checkDownPeer (region ); op != nil {
106
+ if op := c .checkDownPeer (region ); op != nil {
91
107
replicaCheckerNewOpCounter .Inc ()
92
108
op .SetPriorityLevel (constant .High )
93
109
return op
94
110
}
95
- if op := r .checkOfflinePeer (region ); op != nil {
111
+ if op := c .checkOfflinePeer (region ); op != nil {
96
112
replicaCheckerNewOpCounter .Inc ()
97
113
op .SetPriorityLevel (constant .High )
98
114
return op
99
115
}
100
- if op := r .checkMakeUpReplica (region ); op != nil {
116
+ if op := c .checkMakeUpReplica (region ); op != nil {
101
117
replicaCheckerNewOpCounter .Inc ()
102
118
op .SetPriorityLevel (constant .High )
103
119
return op
104
120
}
105
- if op := r .checkRemoveExtraReplica (region ); op != nil {
121
+ if op := c .checkRemoveExtraReplica (region ); op != nil {
106
122
replicaCheckerNewOpCounter .Inc ()
107
123
return op
108
124
}
109
- if op := r .checkLocationReplacement (region ); op != nil {
125
+ if op := c .checkLocationReplacement (region ); op != nil {
110
126
replicaCheckerNewOpCounter .Inc ()
111
127
return op
112
128
}
113
129
return nil
114
130
}
115
131
116
- func (r * ReplicaChecker ) checkDownPeer (region * core.RegionInfo ) * operator.Operator {
117
- if ! r .conf .IsRemoveDownReplicaEnabled () {
132
+ func (c * ReplicaChecker ) checkDownPeer (region * core .RegionInfo ) * operator.Operator {
133
+ if ! c .conf .IsRemoveDownReplicaEnabled () {
118
134
return nil
119
135
}
120
136
@@ -124,22 +140,22 @@ func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operat
124
140
continue
125
141
}
126
142
storeID := peer .GetStoreId ()
127
- store := r .cluster .GetStore (storeID )
143
+ store := c .cluster .GetStore (storeID )
128
144
if store == nil {
129
145
log .Warn ("lost the store, maybe you are recovering the PD cluster" , zap .Uint64 ("store-id" , storeID ))
130
146
return nil
131
147
}
132
148
// Only consider the state of the Store, not `stats.DownSeconds`.
133
- if store .DownTime () < r .conf .GetMaxStoreDownTime () {
149
+ if store .DownTime () < c .conf .GetMaxStoreDownTime () {
134
150
continue
135
151
}
136
- return r .fixPeer (region , storeID , downStatus )
152
+ return c .fixPeer (region , storeID , downStatus )
137
153
}
138
154
return nil
139
155
}
140
156
141
- func (r * ReplicaChecker ) checkOfflinePeer (region * core.RegionInfo ) * operator.Operator {
142
- if ! r .conf .IsReplaceOfflineReplicaEnabled () {
157
+ func (c * ReplicaChecker ) checkOfflinePeer (region * core .RegionInfo ) * operator.Operator {
158
+ if ! c .conf .IsReplaceOfflineReplicaEnabled () {
143
159
return nil
144
160
}
145
161
@@ -150,7 +166,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
150
166
151
167
for _, peer := range region .GetPeers () {
152
168
storeID := peer .GetStoreId ()
153
- store := r .cluster .GetStore (storeID )
169
+ store := c .cluster .GetStore (storeID )
154
170
if store == nil {
155
171
log .Warn ("lost the store, maybe you are recovering the PD cluster" , zap .Uint64 ("store-id" , storeID ))
156
172
return nil
@@ -159,71 +175,79 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
159
175
continue
160
176
}
161
177
162
- return r .fixPeer (region , storeID , offlineStatus )
178
+ return c .fixPeer (region , storeID , offlineStatus )
163
179
}
164
180
165
181
return nil
166
182
}
167
183
168
- func (r * ReplicaChecker ) checkMakeUpReplica (region * core.RegionInfo ) * operator.Operator {
169
- if ! r .conf .IsMakeUpReplicaEnabled () {
184
+ func (c * ReplicaChecker ) checkMakeUpReplica (region * core .RegionInfo ) * operator.Operator {
185
+ if ! c .conf .IsMakeUpReplicaEnabled () {
170
186
return nil
171
187
}
172
- if len (region .GetPeers ()) >= r .conf .GetMaxReplicas () {
188
+ if len (region .GetPeers ()) >= c .conf .GetMaxReplicas () {
173
189
return nil
174
190
}
175
191
log .Debug ("region has fewer than max replicas" , zap .Uint64 ("region-id" , region .GetID ()), zap .Int ("peers" , len (region .GetPeers ())))
176
- regionStores := r .cluster .GetRegionStores (region )
177
- target , filterByTempState := r .strategy (region ).SelectStoreToAdd (regionStores )
192
+ regionStores := c .cluster .GetRegionStores (region )
193
+ target , filterByTempState := c .strategy (c . r , region ).SelectStoreToAdd (regionStores )
178
194
if target == 0 {
179
195
log .Debug ("no store to add replica" , zap .Uint64 ("region-id" , region .GetID ()))
180
196
replicaCheckerNoTargetStoreCounter .Inc ()
181
197
if filterByTempState {
198
+ << << << < HEAD
182
199
r .regionWaitingList .Put (region .GetID (), nil )
200
+ == == == =
201
+ c .pendingProcessedRegions .Put (region .GetID (), nil )
202
+ >> >> >> > 25 dedabf5 (* : reduce rand NewSource (#8675 ))
183
203
}
184
204
return nil
185
205
}
186
206
newPeer := & metapb.Peer {StoreId : target }
187
- op , err := operator .CreateAddPeerOperator ("make-up-replica" , r .cluster , region , newPeer , operator .OpReplica )
207
+ op , err := operator .CreateAddPeerOperator ("make-up-replica" , c .cluster , region , newPeer , operator .OpReplica )
188
208
if err != nil {
189
209
log .Debug ("create make-up-replica operator fail" , errs .ZapError (err ))
190
210
return nil
191
211
}
192
212
return op
193
213
}
194
214
195
- func (r * ReplicaChecker ) checkRemoveExtraReplica (region * core.RegionInfo ) * operator.Operator {
196
- if ! r .conf .IsRemoveExtraReplicaEnabled () {
215
+ func (c * ReplicaChecker ) checkRemoveExtraReplica (region * core .RegionInfo ) * operator.Operator {
216
+ if ! c .conf .IsRemoveExtraReplicaEnabled () {
197
217
return nil
198
218
}
199
219
// when add learner peer, the number of peer will exceed max replicas for a while,
200
220
// just comparing the the number of voters to avoid too many cancel add operator log.
201
- if len (region .GetVoters ()) <= r .conf .GetMaxReplicas () {
221
+ if len (region .GetVoters ()) <= c .conf .GetMaxReplicas () {
202
222
return nil
203
223
}
204
224
log .Debug ("region has more than max replicas" , zap .Uint64 ("region-id" , region .GetID ()), zap .Int ("peers" , len (region .GetPeers ())))
205
- regionStores := r .cluster .GetRegionStores (region )
206
- old := r .strategy (region ).SelectStoreToRemove (regionStores )
225
+ regionStores := c .cluster .GetRegionStores (region )
226
+ old := c .strategy (c . r , region ).SelectStoreToRemove (regionStores )
207
227
if old == 0 {
208
228
replicaCheckerNoWorstPeerCounter .Inc ()
229
+ << << << < HEAD
209
230
r .regionWaitingList .Put (region .GetID (), nil )
231
+ == == == =
232
+ c .pendingProcessedRegions .Put (region .GetID (), nil )
233
+ >> >> >> > 25 dedabf5 (* : reduce rand NewSource (#8675 ))
210
234
return nil
211
235
}
212
- op , err := operator .CreateRemovePeerOperator ("remove-extra-replica" , r .cluster , operator .OpReplica , region , old )
236
+ op , err := operator .CreateRemovePeerOperator ("remove-extra-replica" , c .cluster , operator .OpReplica , region , old )
213
237
if err != nil {
214
238
replicaCheckerCreateOpFailedCounter .Inc ()
215
239
return nil
216
240
}
217
241
return op
218
242
}
219
243
220
- func (r * ReplicaChecker ) checkLocationReplacement (region * core.RegionInfo ) * operator.Operator {
221
- if ! r .conf .IsLocationReplacementEnabled () {
244
+ func (c * ReplicaChecker ) checkLocationReplacement (region * core .RegionInfo ) * operator.Operator {
245
+ if ! c .conf .IsLocationReplacementEnabled () {
222
246
return nil
223
247
}
224
248
225
- strategy := r .strategy (region )
226
- regionStores := r .cluster .GetRegionStores (region )
249
+ strategy := c .strategy (c . r , region )
250
+ regionStores := c .cluster .GetRegionStores (region )
227
251
oldStore := strategy.SelectStoreToRemove (regionStores )
228
252
if oldStore == 0 {
229
253
replicaCheckerAllRightCounter .Inc ()
@@ -237,19 +261,19 @@ func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *oper
237
261
}
238
262
239
263
newPeer := & metapb.Peer {StoreId : newStore }
240
- op , err := operator .CreateMovePeerOperator ("move-to-better-location" , r .cluster , region , operator .OpReplica , oldStore , newPeer )
264
+ op , err := operator .CreateMovePeerOperator ("move-to-better-location" , c .cluster , region , operator .OpReplica , oldStore , newPeer )
241
265
if err != nil {
242
266
replicaCheckerCreateOpFailedCounter .Inc ()
243
267
return nil
244
268
}
245
269
return op
246
270
}
247
271
248
- func (r * ReplicaChecker ) fixPeer (region * core.RegionInfo , storeID uint64 , status string ) * operator.Operator {
272
+ func (c * ReplicaChecker ) fixPeer (region * core .RegionInfo , storeID uint64 , status string ) * operator.Operator {
249
273
// Check the number of replicas first.
250
- if len (region .GetVoters ()) > r .conf .GetMaxReplicas () {
274
+ if len (region .GetVoters ()) > c .conf .GetMaxReplicas () {
251
275
removeExtra := fmt .Sprintf ("remove-extra-%s-replica" , status )
252
- op , err := operator .CreateRemovePeerOperator (removeExtra , r .cluster , operator .OpReplica , region , storeID )
276
+ op , err := operator .CreateRemovePeerOperator (removeExtra , c .cluster , operator .OpReplica , region , storeID )
253
277
if err != nil {
254
278
if status == offlineStatus {
255
279
replicaCheckerRemoveExtraOfflineFailedCounter .Inc ()
@@ -261,8 +285,8 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
261
285
return op
262
286
}
263
287
264
- regionStores := r .cluster .GetRegionStores (region )
265
- target , filterByTempState := r .strategy (region ).SelectStoreToFix (regionStores , storeID )
288
+ regionStores := c .cluster .GetRegionStores (region )
289
+ target , filterByTempState := c .strategy (c . r , region ).SelectStoreToFix (regionStores , storeID )
266
290
if target == 0 {
267
291
if status == offlineStatus {
268
292
replicaCheckerNoStoreOfflineCounter .Inc ()
@@ -271,13 +295,17 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
271
295
}
272
296
log .Debug ("no best store to add replica" , zap .Uint64 ("region-id" , region .GetID ()))
273
297
if filterByTempState {
298
+ << << << < HEAD
274
299
r .regionWaitingList .Put (region .GetID (), nil )
300
+ == == == =
301
+ c .pendingProcessedRegions .Put (region .GetID (), nil )
302
+ >> >> >> > 25 dedabf5 (* : reduce rand NewSource (#8675 ))
275
303
}
276
304
return nil
277
305
}
278
306
newPeer := & metapb.Peer {StoreId : target }
279
307
replace := fmt .Sprintf ("replace-%s-replica" , status )
280
- op , err := operator .CreateMovePeerOperator (replace , r .cluster , region , operator .OpReplica , storeID , newPeer )
308
+ op , err := operator .CreateMovePeerOperator (replace , c .cluster , region , operator .OpReplica , storeID , newPeer )
281
309
if err != nil {
282
310
if status == offlineStatus {
283
311
replicaCheckerReplaceOfflineFailedCounter .Inc ()
@@ -289,12 +317,20 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
289
317
return op
290
318
}
291
319
292
- func (r * ReplicaChecker ) strategy (region * core.RegionInfo ) * ReplicaStrategy {
320
+ func (c * ReplicaChecker ) strategy (r * rand . Rand , region * core .RegionInfo ) * ReplicaStrategy {
293
321
return & ReplicaStrategy {
322
+ << << << < HEAD
294
323
checkerName : replicaCheckerName ,
295
324
cluster : r .cluster ,
296
325
locationLabels : r .conf .GetLocationLabels (),
297
326
isolationLevel : r .conf .GetIsolationLevel (),
327
+ == == == =
328
+ checkerName : c .Name (),
329
+ cluster : c .cluster ,
330
+ locationLabels : c .conf .GetLocationLabels (),
331
+ isolationLevel : c .conf .GetIsolationLevel (),
332
+ >> >> >> > 25 dedabf5 (* : reduce rand NewSource (#8675 ))
298
333
region : region ,
334
+ r : r ,
299
335
}
300
336
}
0 commit comments