Skip to content

Commit 2ffc0f3

Browse files
authored
puller: fix retry logic when check store version failed (#11903) (#11930)
close #11766
1 parent 5969c65 commit 2ffc0f3

File tree

5 files changed

+161
-18
lines changed

5 files changed

+161
-18
lines changed

cdc/kv/client.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ var (
104104
metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
105105
metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown")
106106
metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
107+
metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr")
107108
metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
108109
metricConnectToStoreErr = eventFeedErrorCounter.WithLabelValues("ConnectToStore")
109110
)
@@ -660,8 +661,17 @@ func (s *eventFeedSession) requestRegionToStore(
660661
time.Sleep(delay)
661662
}
662663
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
663-
s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err)
664-
errInfo := newRegionErrorInfo(sri, &connectToStoreErr{})
664+
var regionErr error
665+
var scheduleReload bool
666+
if cerror.Is(err, cerror.ErrGetAllStoresFailed) {
667+
regionErr = &getStoreErr{}
668+
scheduleReload = true
669+
} else {
670+
regionErr = &connectToStoreErr{}
671+
scheduleReload = regionScheduleReload
672+
}
673+
s.client.regionCache.OnSendFail(bo, rpcCtx, scheduleReload, err)
674+
errInfo := newRegionErrorInfo(sri, regionErr)
665675
s.onRegionFail(ctx, errInfo)
666676
continue
667677
}
@@ -1492,3 +1502,7 @@ func (e *connectToStoreErr) Error() string { return "connect to store error" }
14921502
type sendRequestToStoreErr struct{}
14931503

14941504
func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }
1505+
1506+
type getStoreErr struct{}
1507+
1508+
func (e *getStoreErr) Error() string { return "get store error" }

cdc/kv/shared_client.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,9 @@ func (s *SharedClient) divideAndScheduleRegions(
464464
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
465465
regions, err := s.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.StartKey, nextSpan.EndKey, limit)
466466
if err != nil {
467+
if errors.Cause(err) == context.Canceled {
468+
return nil
469+
}
467470
log.Warn("event feed load regions failed",
468471
zap.String("namespace", s.changefeed.Namespace),
469472
zap.String("changefeed", s.changefeed.ID),
@@ -624,6 +627,13 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo)
624627
metricFeedRPCCtxUnavailable.Inc()
625628
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable)
626629
return nil
630+
case *getStoreErr:
631+
metricGetStoreErr.Inc()
632+
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
633+
// cannot get the store the region belongs to, so we need to reload the region.
634+
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err)
635+
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.requestedTable)
636+
return nil
627637
case *sendRequestToStoreErr:
628638
metricStoreSendRequestErr.Inc()
629639
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
@@ -658,6 +668,10 @@ func (s *SharedClient) resolveLock(ctx context.Context) error {
658668
}
659669

660670
doResolve := func(regionID uint64, state *regionlock.LockedRange, maxVersion uint64) {
671+
if state == nil {
672+
log.Warn("found nil state in resolve lock", zap.Uint64("regionID", regionID))
673+
return
674+
}
661675
if state.ResolvedTs.Load() > maxVersion || !state.Initialzied.Load() {
662676
return
663677
}

cdc/kv/shared_client_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020
"time"
2121

22+
"github.com/pingcap/failpoint"
2223
"github.com/pingcap/kvproto/pkg/cdcpb"
2324
"github.com/pingcap/tidb/pkg/store/mockstore/mockcopr"
2425
"github.com/pingcap/tiflow/cdc/kv/regionlock"
@@ -212,6 +213,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
212213
}
213214
}
214215

216+
func TestGetStoreFailed(t *testing.T) {
217+
ctx, cancel := context.WithCancel(context.Background())
218+
wg := &sync.WaitGroup{}
219+
220+
events1 := make(chan *cdcpb.ChangeDataEvent, 10)
221+
srv1 := newMockChangeDataServer(events1)
222+
server1, addr1 := newMockService(ctx, t, srv1, wg)
223+
224+
rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
225+
226+
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
227+
228+
grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)
229+
230+
regionCache := tikv.NewRegionCache(pdClient)
231+
232+
pdClock := pdutil.NewClock4Test()
233+
234+
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
235+
require.Nil(t, err)
236+
lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{})
237+
238+
invalidStore1 := "localhost:1"
239+
invalidStore2 := "localhost:2"
240+
cluster.AddStore(1, addr1)
241+
cluster.AddStore(2, invalidStore1)
242+
cluster.AddStore(3, invalidStore2)
243+
cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4)
244+
245+
client := NewSharedClient(
246+
model.ChangeFeedID{ID: "test"},
247+
&config.ServerConfig{
248+
KVClient: &config.KVClientConfig{
249+
WorkerConcurrent: 1,
250+
GrpcStreamConcurrent: 1,
251+
AdvanceIntervalInMs: 10,
252+
},
253+
Debug: &config.DebugConfig{Puller: &config.PullerConfig{}},
254+
},
255+
false, pdClient, grpcPool, regionCache, pdClock, lockResolver,
256+
)
257+
258+
defer func() {
259+
cancel()
260+
client.Close()
261+
_ = kvStorage.Close()
262+
regionCache.Close()
263+
pdClient.Close()
264+
srv1.wg.Wait()
265+
server1.Stop()
266+
wg.Wait()
267+
}()
268+
269+
wg.Add(1)
270+
go func() {
271+
defer wg.Done()
272+
err := client.Run(ctx)
273+
require.Equal(t, context.Canceled, errors.Cause(err))
274+
}()
275+
276+
failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`)
277+
subID := client.AllocSubscriptionID()
278+
span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")}
279+
eventCh := make(chan MultiplexingEvent, 50)
280+
client.Subscribe(subID, span, 1, eventCh)
281+
282+
makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent {
283+
return &cdcpb.ChangeDataEvent{
284+
Events: []*cdcpb.Event{
285+
{
286+
RegionId: regionID,
287+
RequestId: requestID,
288+
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts},
289+
},
290+
},
291+
}
292+
}
293+
294+
checkTsEvent := func(event model.RegionFeedEvent, ts uint64) {
295+
require.Equal(t, ts, event.Resolved.ResolvedTs)
296+
}
297+
298+
events1 <- mockInitializedEvent(11, uint64(subID))
299+
ts := oracle.GoTimeToTS(pdClock.CurrentTime())
300+
events1 <- makeTsEvent(11, ts, uint64(subID))
301+
select {
302+
case <-eventCh:
303+
require.True(t, false, "should not get event when get store failed")
304+
case <-time.After(5 * time.Second):
305+
}
306+
failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed")
307+
select {
308+
case event := <-eventCh:
309+
checkTsEvent(event.RegionFeedEvent, ts)
310+
case <-time.After(5 * time.Second):
311+
require.True(t, false, "reconnection not succeed in 5 second")
312+
}
313+
}
314+
215315
type mockChangeDataServer struct {
216316
ch chan *cdcpb.ChangeDataEvent
217317
wg sync.WaitGroup

cdc/kv/shared_stream.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/pingcap/log"
2424
"github.com/pingcap/tiflow/cdc/kv/sharedconn"
2525
"github.com/pingcap/tiflow/pkg/chann"
26+
cerrors "github.com/pingcap/tiflow/pkg/errors"
2627
"github.com/pingcap/tiflow/pkg/util"
2728
"github.com/pingcap/tiflow/pkg/version"
2829
"go.uber.org/zap"
@@ -87,12 +88,31 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
8788
if err := waitForPreFetching(); err != nil {
8889
return err
8990
}
90-
if canceled := stream.run(ctx, c, r); canceled {
91-
return nil
91+
var regionErr error
92+
if err := version.CheckStoreVersion(ctx, c.pd, r.storeID); err != nil {
93+
log.Info("event feed check store version fails",
94+
zap.String("namespace", c.changefeed.Namespace),
95+
zap.String("changefeed", c.changefeed.ID),
96+
zap.Uint64("streamID", stream.streamID),
97+
zap.Uint64("storeID", r.storeID),
98+
zap.String("addr", r.storeAddr),
99+
zap.Error(err))
100+
if errors.Cause(err) == context.Canceled {
101+
return nil
102+
} else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) {
103+
regionErr = &getStoreErr{}
104+
} else {
105+
regionErr = &sendRequestToStoreErr{}
106+
}
107+
} else {
108+
if canceled := stream.run(ctx, c, r); canceled {
109+
return nil
110+
}
111+
regionErr = &sendRequestToStoreErr{}
92112
}
93113
for _, m := range stream.clearStates() {
94114
for _, state := range m {
95-
state.markStopped(&sendRequestToStoreErr{})
115+
state.markStopped(regionErr)
96116
sfEvent := newEventItem(nil, state, stream)
97117
slot := hashRegionID(state.sri.verID.GetID(), len(c.workers))
98118
_ = c.workers[slot].sendEvent(ctx, sfEvent)
@@ -105,7 +125,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
105125
// It means it's a special task for stopping the table.
106126
continue
107127
}
108-
c.onRegionFail(newRegionErrorInfo(sri, &sendRequestToStoreErr{}))
128+
c.onRegionFail(newRegionErrorInfo(sri, regionErr))
109129
}
110130
if err := util.Hang(ctx, time.Second); err != nil {
111131
return err
@@ -132,17 +152,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
132152
}
133153
}
134154

135-
if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil {
136-
log.Info("event feed check store version fails",
137-
zap.String("namespace", c.changefeed.Namespace),
138-
zap.String("changefeed", c.changefeed.ID),
139-
zap.Uint64("streamID", s.streamID),
140-
zap.Uint64("storeID", rs.storeID),
141-
zap.String("addr", rs.storeAddr),
142-
zap.Error(err))
143-
return isCanceled()
144-
}
145-
146155
log.Info("event feed going to create grpc stream",
147156
zap.String("namespace", c.changefeed.Namespace),
148157
zap.String("changefeed", c.changefeed.ID),
@@ -339,7 +348,9 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
339348
if s.multiplexing != nil {
340349
req := &cdcpb.ChangeDataRequest{
341350
RequestId: uint64(subscriptionID),
342-
Request: &cdcpb.ChangeDataRequest_Deregister_{},
351+
Request: &cdcpb.ChangeDataRequest_Deregister_{
352+
Deregister: &cdcpb.ChangeDataRequest_Deregister{},
353+
},
343354
}
344355
if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
345356
return err

pkg/version/check.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/coreos/go-semver/semver"
2626
"github.com/pingcap/errors"
27+
"github.com/pingcap/failpoint"
2728
"github.com/pingcap/kvproto/pkg/metapb"
2829
"github.com/pingcap/log"
2930
"github.com/pingcap/tidb/pkg/util/engine"
@@ -196,6 +197,9 @@ func checkPDVersion(ctx context.Context, pdAddr string, credential *security.Cre
196197
// CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
197198
// If storeID is 0, it checks all TiKV.
198199
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
200+
failpoint.Inject("GetStoreFailed", func() {
201+
failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID)))
202+
})
199203
var stores []*metapb.Store
200204
var err error
201205
if storeID == 0 {

0 commit comments

Comments (0)