
Commit 0062953

snap_restore: resend recover_region while there are TiKV restarts (#45361) (#45722)
close #45206
1 parent 015241b commit 0062953

7 files changed: +373 -34 lines changed

br/pkg/conn/conn.go

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ type Mgr struct {
 }

 func GetAllTiKVStoresWithRetry(ctx context.Context,
-    pdClient pd.Client,
+    pdClient util.StoreMeta,
     storeBehavior util.StoreBehavior,
 ) ([]*metapb.Store, error) {
     stores := make([]*metapb.Store, 0)

br/pkg/conn/util/util.go

Lines changed: 10 additions & 1 deletion
@@ -28,11 +28,20 @@ const (
     TiFlashOnly StoreBehavior = 2
 )

+// StoreMeta is the required interface for a watcher.
+// It is striped from pd.Client.
+type StoreMeta interface {
+    // GetAllStores gets all stores from pd.
+    // The store may expire later. Caller is responsible for caching and taking care
+    // of store change.
+    GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error)
+}
+
 // GetAllTiKVStores returns all TiKV stores registered to the PD client. The
 // stores must not be a tombstone and must never contain a label `engine=tiflash`.
 func GetAllTiKVStores(
     ctx context.Context,
-    pdClient pd.Client,
+    pdClient StoreMeta,
     storeBehavior StoreBehavior,
 ) ([]*metapb.Store, error) {
     // get all live stores.
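Note: narrowing pdClient from pd.Client to the new util.StoreMeta interface means a caller only has to supply GetAllStores. A minimal sketch, not part of this commit (the package and type names below are hypothetical; only the GetAllStores signature comes from the diff above), of a stub that satisfies the interface:

    // Package storemetasketch is a hypothetical example package.
    package storemetasketch

    import (
        "context"

        "github.com/pingcap/kvproto/pkg/metapb"
        pd "github.com/tikv/pd/client"
    )

    // fakeStoreMeta serves a fixed store list instead of querying PD.
    type fakeStoreMeta struct {
        stores []*metapb.Store
    }

    // GetAllStores satisfies util.StoreMeta by returning the canned list and
    // ignoring the PD query options.
    func (f fakeStoreMeta) GetAllStores(_ context.Context, _ ...pd.GetStoreOption) ([]*metapb.Store, error) {
        return f.stores, nil
    }

Such a stub can stand in for a full pd.Client in tests of GetAllTiKVStores, GetAllTiKVStoresWithRetry, or the storewatch.Watcher introduced below.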

br/pkg/restore/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ go_library(
         "//br/pkg/summary",
         "//br/pkg/utils",
         "//br/pkg/utils/iter",
+        "//br/pkg/utils/storewatch",
         "//br/pkg/version",
         "//config",
         "//ddl",

br/pkg/restore/data.go

Lines changed: 85 additions & 32 deletions
@@ -4,6 +4,7 @@ package restore
 import (
     "context"
     "io"
+    "time"

     "github.com/pingcap/errors"
     "github.com/pingcap/kvproto/pkg/metapb"
@@ -12,7 +13,9 @@ import (
     "github.com/pingcap/tidb/br/pkg/common"
     "github.com/pingcap/tidb/br/pkg/conn"
     "github.com/pingcap/tidb/br/pkg/glue"
+    "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/utils"
+    "github.com/pingcap/tidb/br/pkg/utils/storewatch"
     "github.com/pingcap/tidb/ddl"
     "github.com/pingcap/tidb/util/mathutil"
     tikvstore "github.com/tikv/client-go/v2/kv"
@@ -48,6 +51,9 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
         return totalRegions, errors.Trace(err)
     }

+    // Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode.
+    // This wathcher will retrigger `RecoveryRegions` for those stores.
+    recovery.SpawnTiKVShutDownWatchers(ctx)
     if err := recovery.RecoverRegions(ctx); err != nil {
         return totalRegions, errors.Trace(err)
     }
@@ -213,6 +219,39 @@ func (recovery *Recovery) GetTotalRegions() int {
     return len(regions)
 }

+func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
+    storeAddr := getStoreAddress(recovery.allStores, storeID)
+    recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr)
+    if err != nil {
+        log.Error("create tikv client failed", zap.Uint64("store id", storeID))
+        return errors.Trace(err)
+    }
+    defer conn.Close()
+    log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID))
+    stream, err := recoveryClient.RecoverRegion(ctx)
+    if err != nil {
+        log.Error("create recover region failed", zap.Uint64("store id", storeID))
+        return errors.Trace(err)
+    }
+
+    // for a TiKV, send the stream
+    for _, s := range plan {
+        if err = stream.Send(s); err != nil {
+            log.Error("send recover region failed", zap.Error(err))
+            return errors.Trace(err)
+        }
+    }
+
+    reply, err := stream.CloseAndRecv()
+    if err != nil {
+        log.Error("close the stream failed")
+        return errors.Trace(err)
+    }
+    recovery.progress.Inc()
+    log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
+    return nil
+}
+
 // RecoverRegions send the recovery plan to recovery region (force leader etc)
 // only tikvs have regions whose have to recover be sent
 func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
@@ -224,46 +263,60 @@ func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
         if err := ectx.Err(); err != nil {
             break
         }
+        storeId := storeId
+        plan := plan

-        storeAddr := getStoreAddress(recovery.allStores, storeId)
-        recoveryPlan := plan
-        recoveryStoreId := storeId
         workers.ApplyOnErrorGroup(eg, func() error {
-            recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
-            if err != nil {
-                log.Error("create tikv client failed", zap.Uint64("store id", recoveryStoreId))
-                return errors.Trace(err)
-            }
-            defer conn.Close()
-            log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", recoveryStoreId))
-            stream, err := recoveryClient.RecoverRegion(ectx)
-            if err != nil {
-                log.Error("create recover region failed", zap.Uint64("store id", recoveryStoreId))
-                return errors.Trace(err)
-            }
-
-            // for a TiKV, send the stream
-            for _, s := range recoveryPlan {
-                if err = stream.Send(s); err != nil {
-                    log.Error("send recover region failed", zap.Error(err))
-                    return errors.Trace(err)
-                }
-            }
-
-            reply, err := stream.CloseAndRecv()
-            if err != nil {
-                log.Error("close the stream failed")
-                return errors.Trace(err)
-            }
-            recovery.progress.Inc()
-            log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
-            return nil
+            return recovery.RecoverRegionOfStore(ectx, storeId, plan)
         })
     }
     // Wait for all TiKV instances force leader and wait apply to last log.
     return eg.Wait()
 }

+func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
+    rebootStores := map[uint64]struct{}{}
+    cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
+        log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId()))
+        rebootStores[s.Id] = struct{}{}
+    }), storewatch.WithOnDisconnect(func(s *metapb.Store) {
+        log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress()))
+    }), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
+        log.Info("Start to observing the state of store.", zap.Uint64("id", s.GetId()))
+    }))
+    watcher := storewatch.New(recovery.mgr.PDClient(), cb)
+    tick := time.NewTicker(30 * time.Second)
+    mainLoop := func() {
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case <-tick.C:
+                err := watcher.Step(ctx)
+                if err != nil {
+                    log.Warn("Failed to step watcher.", logutil.ShortError(err))
+                }
+                for id := range rebootStores {
+                    plan, ok := recovery.RecoveryPlan[id]
+                    if !ok {
+                        log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id))
+                        continue
+                    }
+                    err := recovery.RecoverRegionOfStore(ctx, id, plan)
+                    if err != nil {
+                        log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err))
+                        continue
+                    }
+                    log.Info("Succeed to reload the leader in store.", zap.Uint64("id", id))
+                    delete(rebootStores, id)
+                }
+            }
+        }
+    }

+    go mainLoop()
+}
+
 // WaitApply send wait apply to all tikv ensure all region peer apply log into the last
 func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
     eg, ectx := errgroup.WithContext(ctx)
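The comment added to RecoverData above states the intent: a TiKV that restarts during recovery may come back with no leaders, so its recovery plan has to be re-sent. SpawnTiKVShutDownWatchers implements this as a ticker-driven loop that keeps a pending set of rebooted stores and only drops a store once a resend succeeds. Stripped of the BR-specific types, the pattern looks roughly like the sketch below (resendUntilDone, detect, and resend are illustrative names, not part of the patch):

    package recoversketch

    import (
        "context"
        "log"
        "time"
    )

    // resendUntilDone is an illustrative skeleton of the watcher loop:
    // detect reports IDs of stores that restarted, resend re-sends their plan,
    // and an ID stays in the pending set until a resend succeeds.
    // The loop exits when ctx is cancelled.
    func resendUntilDone(ctx context.Context, detect func(context.Context) []uint64, resend func(context.Context, uint64) error) {
        pending := map[uint64]struct{}{}
        tick := time.NewTicker(30 * time.Second)
        defer tick.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-tick.C:
                for _, id := range detect(ctx) {
                    pending[id] = struct{}{}
                }
                for id := range pending {
                    if err := resend(ctx, id); err != nil {
                        log.Printf("resend to store %d failed, retrying next tick: %v", id, err)
                        continue
                    }
                    delete(pending, id)
                }
            }
        }
    }

Keeping a store in the pending set until its resend succeeds mirrors the loop in SpawnTiKVShutDownWatchers, which retries on the next tick rather than giving up after one failed attempt.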

br/pkg/utils/storewatch/BUILD.bazel

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "storewatch",
+    srcs = ["watching.go"],
+    importpath = "github.com/pingcap/tidb/br/pkg/utils/storewatch",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//br/pkg/conn",
+        "//br/pkg/conn/util",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_kvproto//pkg/metapb",
+    ],
+)
+
+go_test(
+    name = "storewatch_test",
+    timeout = "short",
+    srcs = ["watching_test.go"],
+    flaky = True,
+    shard_count = 3,
+    deps = [
+        ":storewatch",
+        "//br/pkg/conn/util",
+        "@com_github_pingcap_kvproto//pkg/metapb",
+        "@com_github_stretchr_testify//require",
+        "@com_github_tikv_pd_client//:client",
+    ],
+)

br/pkg/utils/storewatch/watching.go

Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
+// package storewatch provides a `Watcher` type which allows
+// the user to listen the events of lifetime of stores.
+package storewatch
+
+import (
+    "context"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/tidb/br/pkg/conn"
+    "github.com/pingcap/tidb/br/pkg/conn/util"
+)
+
+// Callback will be called the supported event triggered.
+type Callback interface {
+    OnNewStoreRegistered(store *metapb.Store)
+    OnDisconnect(store *metapb.Store)
+    OnReboot(store *metapb.Store)
+}
+
+// DynCallback is a function based callback set.
+type DynCallback struct {
+    onNewStoreRegistered func(*metapb.Store)
+    onDisconnect         func(*metapb.Store)
+    onReboot             func(*metapb.Store)
+}
+
+// OnNewStoreRegistered will be called once new region added to be watched.
+func (cb *DynCallback) OnNewStoreRegistered(store *metapb.Store) {
+    if cb.onNewStoreRegistered != nil {
+        cb.onNewStoreRegistered(store)
+    }
+}
+
+// OnDisconnect will be called once the store is disconnected.
+func (cb *DynCallback) OnDisconnect(store *metapb.Store) {
+    if cb.onDisconnect != nil {
+        cb.onDisconnect(store)
+    }
+}
+
+// OnReboot will be called once the store is rebooted.
+func (cb *DynCallback) OnReboot(store *metapb.Store) {
+    if cb.onReboot != nil {
+        cb.onReboot(store)
+    }
+}
+
+// DynCallbackOpt is the option for DynCallback.
+type DynCallbackOpt func(*DynCallback)
+
+// WithOnNewStoreRegistered adds a hook to the callback.
+func WithOnNewStoreRegistered(f func(*metapb.Store)) DynCallbackOpt {
+    return func(cb *DynCallback) {
+        cb.onNewStoreRegistered = f
+    }
+}
+
+// WithOnDisconnect adds a hook to the callback.
+func WithOnDisconnect(f func(*metapb.Store)) DynCallbackOpt {
+    return func(cb *DynCallback) {
+        cb.onDisconnect = f
+    }
+}
+
+// WithOnReboot adds a hook to the callback.
+func WithOnReboot(f func(*metapb.Store)) DynCallbackOpt {
+    return func(cb *DynCallback) {
+        cb.onReboot = f
+    }
+}
+
+// MakeCallback creates a callback with the given options.
+// Allowed options: WithOnNewStoreRegistered, WithOnDisconnect, WithOnReboot.
+func MakeCallback(opts ...DynCallbackOpt) Callback {
+    cb := &DynCallback{}
+    for _, opt := range opts {
+        opt(cb)
+    }
+    return cb
+}
+
+// Watcher watches the lifetime of stores.
+// generally it should be advanced by calling the `Step` call.
+type Watcher struct {
+    cli util.StoreMeta
+    cb  Callback
+
+    lastStores map[uint64]*metapb.Store
+}
+
+func New(cli util.StoreMeta, cb Callback) *Watcher {
+    return &Watcher{
+        cli:        cli,
+        cb:         cb,
+        lastStores: make(map[uint64]*metapb.Store),
+    }
+}
+
+func (w *Watcher) Step(ctx context.Context) error {
+    liveStores, err := conn.GetAllTiKVStoresWithRetry(ctx, w.cli, util.SkipTiFlash)
+    if err != nil {
+        return errors.Annotate(err, "failed to update store list")
+    }
+    recorded := map[uint64]struct{}{}
+    for _, store := range liveStores {
+        w.updateStore(store)
+        recorded[store.GetId()] = struct{}{}
+    }
+    w.retain(recorded)
+    return nil
+}
+
+// updateStore updates the current store. and call the hooks needed.
+func (w *Watcher) updateStore(newStore *metapb.Store) {
+    lastStore, ok := w.lastStores[newStore.GetId()]
+    w.lastStores[newStore.GetId()] = newStore
+    if !ok {
+        w.cb.OnNewStoreRegistered(newStore)
+        return
+    }
+    if lastStore.GetState() == metapb.StoreState_Up && newStore.GetState() == metapb.StoreState_Offline {
+        w.cb.OnDisconnect(newStore)
+    }
+    if lastStore.StartTimestamp != newStore.StartTimestamp {
+        w.cb.OnReboot(newStore)
+    }
+}
+
+func (w *Watcher) retain(storeSet map[uint64]struct{}) {
+    for id := range w.lastStores {
+        if _, ok := storeSet[id]; !ok {
+            delete(w.lastStores, id)
+        }
+    }
+}
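To recap how the package fits together: a caller builds a Callback (typically via MakeCallback with the With... options), wraps any util.StoreMeta in a Watcher via New, and advances it by calling Step. OnNewStoreRegistered fires the first time a store is seen, OnDisconnect when a store goes from Up to Offline between two steps, and OnReboot when its StartTimestamp changes. A minimal usage sketch (the 30-second interval and the logging are arbitrary choices here; cli may be a pd.Client or any other util.StoreMeta implementation):

    package storewatchusage

    import (
        "context"
        "log"
        "time"

        "github.com/pingcap/kvproto/pkg/metapb"
        "github.com/pingcap/tidb/br/pkg/conn/util"
        "github.com/pingcap/tidb/br/pkg/utils/storewatch"
    )

    // watchStores polls the store list and logs lifecycle events until ctx ends.
    func watchStores(ctx context.Context, cli util.StoreMeta) {
        cb := storewatch.MakeCallback(
            storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) { log.Printf("watching store %d", s.GetId()) }),
            storewatch.WithOnDisconnect(func(s *metapb.Store) { log.Printf("store %d went offline", s.GetId()) }),
            storewatch.WithOnReboot(func(s *metapb.Store) { log.Printf("store %d restarted", s.GetId()) }),
        )
        w := storewatch.New(cli, cb)
        tick := time.NewTicker(30 * time.Second)
        defer tick.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-tick.C:
                if err := w.Step(ctx); err != nil {
                    log.Printf("step failed: %v", err)
                }
            }
        }
    }

This is essentially the shape of SpawnTiKVShutDownWatchers in br/pkg/restore/data.go above, minus the recovery-plan resend.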
