Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ var (
metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
metricFeedUnknownErrorCounter = eventFeedErrorCounter.WithLabelValues("Unknown")
metricFeedRPCCtxUnavailable = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
metricGetStoreErr = eventFeedErrorCounter.WithLabelValues("GetStoreErr")
metricStoreSendRequestErr = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
metricKvIsBusyCounter = eventFeedErrorCounter.WithLabelValues("KvIsBusy")
metricKvCongestedCounter = eventFeedErrorCounter.WithLabelValues("KvCongested")
Expand All @@ -108,6 +109,10 @@ func (e *rpcCtxUnavailableErr) Error() string {
e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer())
}

type getStoreErr struct{}

func (e *getStoreErr) Error() string { return "get store error" }

type sendRequestToStoreErr struct{}

func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }
Expand Down Expand Up @@ -739,6 +744,13 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
metricFeedRPCCtxUnavailable.Inc()
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
return nil
case *getStoreErr:
metricGetStoreErr.Inc()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
// cannot get the store the region belongs to, so we need to reload the region.
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err)
s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedTable)
return nil
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
Expand Down
100 changes: 100 additions & 0 deletions cdc/kv/shared_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/store/mockstore/mockcopr"
Expand Down Expand Up @@ -261,6 +262,105 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {
}
}

func TestGetStoreFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

events1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataServer(events1)
server1, addr1 := newMockService(ctx, t, srv1, wg)

rpcClient, cluster, pdClient, _ := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())

pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}

grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)

regionCache := tikv.NewRegionCache(pdClient)

pdClock := pdutil.NewClock4Test()

kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
require.Nil(t, err)
lockResolver := txnutil.NewLockerResolver(kvStorage, model.ChangeFeedID{})

invalidStore1 := "localhost:1"
invalidStore2 := "localhost:2"
cluster.AddStore(1, addr1)
cluster.AddStore(2, invalidStore1)
cluster.AddStore(3, invalidStore2)
cluster.Bootstrap(11, []uint64{1, 2, 3}, []uint64{4, 5, 6}, 4)

client := NewSharedClient(
model.ChangeFeedID{ID: "test"},
&config.ServerConfig{
KVClient: &config.KVClientConfig{
WorkerConcurrent: 1,
GrpcStreamConcurrent: 1,
AdvanceIntervalInMs: 10,
},
Debug: &config.DebugConfig{Puller: &config.PullerConfig{LogRegionDetails: false}},
},
false, pdClient, grpcPool, regionCache, pdClock, lockResolver,
)

defer func() {
cancel()
client.Close()
_ = kvStorage.Close()
regionCache.Close()
pdClient.Close()
srv1.wg.Wait()
server1.Stop()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
err := client.Run(ctx)
require.Equal(t, context.Canceled, errors.Cause(err))
}()

failpoint.Enable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed", `return(true)`)
subID := client.AllocSubscriptionID()
span := tablepb.Span{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")}
eventCh := make(chan MultiplexingEvent, 50)
client.Subscribe(subID, span, 1, eventCh)

makeTsEvent := func(regionID, ts, requestID uint64) *cdcpb.ChangeDataEvent {
return &cdcpb.ChangeDataEvent{
Events: []*cdcpb.Event{
{
RegionId: regionID,
RequestId: requestID,
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: ts},
},
},
}
}

checkTsEvent := func(event model.RegionFeedEvent, ts uint64) {
require.Equal(t, ts, event.Resolved.ResolvedTs)
}

events1 <- mockInitializedEvent(11, uint64(subID))
ts := oracle.GoTimeToTS(pdClock.CurrentTime())
events1 <- makeTsEvent(11, ts, uint64(subID))
select {
case <-eventCh:
require.True(t, false, "should not get event when get store failed")
case <-time.After(5 * time.Second):
}
failpoint.Disable("github.com/pingcap/tiflow/pkg/version/GetStoreFailed")
select {
case event := <-eventCh:
checkTsEvent(event.RegionFeedEvent, ts)
case <-time.After(5 * time.Second):
require.True(t, false, "reconnection not succeed in 5 second")
}
}

type mockChangeDataServer struct {
ch chan *cdcpb.ChangeDataEvent
wg sync.WaitGroup
Expand Down
46 changes: 22 additions & 24 deletions cdc/kv/shared_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/kv/sharedconn"
"github.com/pingcap/tiflow/pkg/chann"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
Expand Down Expand Up @@ -90,12 +91,26 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
if err := waitForPreFetching(); err != nil {
return err
}
if canceled := stream.run(ctx, c, r); canceled {
err := stream.run(ctx, c, r)
log.Info("event feed grpc stream exits",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", stream.streamID),
zap.Uint64("storeID", r.storeID),
zap.String("addr", r.storeAddr),
zap.Error(err))
var regionErr error
if err == nil || errors.Cause(err) == context.Canceled {
return nil
} else if cerrors.Is(err, cerrors.ErrGetAllStoresFailed) {
regionErr = &getStoreErr{}
} else {
regionErr = &sendRequestToStoreErr{}
}

for _, m := range stream.clearStates() {
for _, state := range m {
state.markStopped(&sendRequestToStoreErr{})
state.markStopped(regionErr)
sfEvent := newEventItem(nil, state, stream)
slot := hashRegionID(state.region.verID.GetID(), len(c.workers))
_ = c.workers[slot].sendEvent(ctx, sfEvent)
Expand All @@ -108,7 +123,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
// It means it's a special task for stopping the table.
continue
}
c.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{}))
c.onRegionFail(newRegionErrorInfo(region, regionErr))
}
if err := util.Hang(ctx, time.Second); err != nil {
return err
Expand All @@ -125,16 +140,7 @@ func newRequestedStream(streamID uint64) *requestedStream {
return stream
}

func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) (canceled bool) {
isCanceled := func() bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requestedStore) error {
if err := version.CheckStoreVersion(ctx, c.pd, rs.storeID); err != nil {
log.Info("event feed check store version fails",
zap.String("namespace", c.changefeed.Namespace),
Expand All @@ -143,7 +149,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return isCanceled()
return err
}

log.Info("event feed going to create grpc stream",
Expand All @@ -154,13 +160,6 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
zap.String("addr", rs.storeAddr))

defer func() {
log.Info("event feed grpc stream exits",
zap.String("namespace", c.changefeed.Namespace),
zap.String("changefeed", c.changefeed.ID),
zap.Uint64("streamID", s.streamID),
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Bool("canceled", canceled))
if s.multiplexing != nil {
s.multiplexing = nil
} else if s.tableExclusives != nil {
Expand All @@ -181,7 +180,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
zap.Uint64("storeID", rs.storeID),
zap.String("addr", rs.storeAddr),
zap.Error(err))
return isCanceled()
return err
}

if cc.Multiplexing() {
Expand Down Expand Up @@ -211,8 +210,7 @@ func (s *requestedStream) run(ctx context.Context, c *SharedClient, rs *requeste
})
}
g.Go(func() error { return s.send(gctx, c, rs) })
_ = g.Wait()
return isCanceled()
return g.Wait()
}

func (s *requestedStream) receive(
Expand Down
4 changes: 4 additions & 0 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/util/engine"
Expand Down Expand Up @@ -199,6 +200,9 @@
// CheckStoreVersion checks whether the given TiKV is compatible with this CDC.
// If storeID is 0, it checks all TiKV.
func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) error {
failpoint.Inject("GetStoreFailed", func() {
failpoint.Return(cerror.WrapError(cerror.ErrGetAllStoresFailed, fmt.Errorf("unknown store %d", storeID)))
})

Check warning on line 205 in pkg/version/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/version/check.go#L204-L205

Added lines #L204 - L205 were not covered by tests
var stores []*metapb.Store
var err error
if storeID == 0 {
Expand Down
Loading