Skip to content

Commit 42532a3

Browse files
committed
Merge remote-tracking branch 'hyy/0905-2' into impl-puller-prior-queue
2 parents e9b2e2d + 346f8ab commit 42532a3

File tree

7 files changed

+101
-30
lines changed

7 files changed

+101
-30
lines changed

logservice/logpuller/region_event_handler.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,21 @@ func (h *regionEventHandler) GetArea(path SubscriptionID, dest *subscribedSpan)
128128
}
129129

130130
func (h *regionEventHandler) GetTimestamp(event regionEvent) dynstream.Timestamp {
131-
if event.entries != nil && event.entries.Entries != nil && len(event.entries.Entries.GetEntries()) > 0 {
132-
entries := event.entries.Entries.GetEntries()
133-
switch entries[0].Type {
134-
case cdcpb.Event_INITIALIZED:
135-
return dynstream.Timestamp(event.state.region.resolvedTs())
136-
case cdcpb.Event_COMMITTED,
137-
cdcpb.Event_PREWRITE,
138-
cdcpb.Event_COMMIT,
139-
cdcpb.Event_ROLLBACK:
140-
return dynstream.Timestamp(entries[0].CommitTs)
141-
default:
142-
log.Warn("unknown event entries", zap.Any("event", event.entries))
143-
return 0
131+
if event.entries != nil && event.entries.Entries != nil {
132+
for _, entry := range event.entries.Entries.GetEntries() {
133+
switch entry.Type {
134+
case cdcpb.Event_INITIALIZED:
135+
return dynstream.Timestamp(event.state.region.resolvedTs())
136+
case cdcpb.Event_COMMITTED,
137+
cdcpb.Event_PREWRITE,
138+
cdcpb.Event_COMMIT,
139+
cdcpb.Event_ROLLBACK:
140+
return dynstream.Timestamp(entry.CommitTs)
141+
default:
142+
// ignore other event types
143+
}
144144
}
145+
return 0
145146
} else {
146147
return dynstream.Timestamp(event.resolvedTs)
147148
}

logservice/logpuller/subscription_client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,7 @@ func (s *subscriptionClient) handleErrors(ctx context.Context) error {
749749
for {
750750
select {
751751
case <-ctx.Done():
752+
log.Info("subscription client handle errors exit")
752753
return ctx.Err()
753754
case errInfo := <-s.errCache.errCh:
754755
if err := s.doHandleError(ctx, errInfo); err != nil {
@@ -1086,7 +1087,11 @@ func (e *errCache) dispatch(ctx context.Context) error {
10861087
errInfo := e.cache[0]
10871088
e.cache = e.cache[1:]
10881089
e.Unlock()
1089-
e.errCh <- errInfo
1090+
select {
1091+
case <-ctx.Done():
1092+
log.Info("subscription client dispatch err cache done")
1093+
case e.errCh <- errInfo:
1094+
}
10901095
}
10911096
for {
10921097
select {

logservice/logpuller/subscription_client_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,60 @@ func TestSubscriptionWithFailedTiKV(t *testing.T) {
204204
require.True(t, false, "reconnection not succeed in 5 second")
205205
}
206206
}
207+
208+
// TestErrCacheDispatchWithFullChannelAndCanceledContext tests that when errCh is full
209+
// and context is canceled, the dispatch method doesn't get stuck.
210+
func TestErrCacheDispatchWithFullChannelAndCanceledContext(t *testing.T) {
211+
// Create errCache with a small errCh to easily fill it up
212+
errCache := &errCache{
213+
cache: make([]regionErrorInfo, 0, 10),
214+
errCh: make(chan regionErrorInfo, 2), // Small buffer to easily fill
215+
notify: make(chan struct{}, 10),
216+
}
217+
218+
// Create a mock regionErrorInfo
219+
mockErrInfo := regionErrorInfo{
220+
regionInfo: regionInfo{
221+
verID: tikv.NewRegionVerID(1, 1, 1),
222+
span: heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")},
223+
},
224+
err: errors.New("test error"),
225+
}
226+
227+
// Fill up the errCh channel to make it full
228+
errCache.errCh <- mockErrInfo
229+
errCache.errCh <- mockErrInfo
230+
231+
// Add some errors to the cache
232+
for i := 0; i < 5; i++ {
233+
errCache.add(mockErrInfo)
234+
}
235+
236+
// Create a context that will be canceled
237+
ctx, cancel := context.WithCancel(context.Background())
238+
239+
// Channel to signal when dispatch returns
240+
dispatchDone := make(chan error, 1)
241+
242+
// Start dispatch in a goroutine
243+
go func() {
244+
err := errCache.dispatch(ctx)
245+
dispatchDone <- err
246+
}()
247+
248+
// Give dispatch some time to start and potentially get stuck
249+
time.Sleep(50 * time.Millisecond)
250+
251+
// Cancel the context
252+
cancel()
253+
254+
// Wait for dispatch to return with a timeout
255+
select {
256+
case err := <-dispatchDone:
257+
// Verify that dispatch returned with context.Canceled error
258+
require.Equal(t, context.Canceled, err)
259+
case <-time.After(5 * time.Second):
260+
// If we timeout here, it means dispatch is stuck
261+
t.Fatal("dispatch method is stuck and didn't return after context cancellation")
262+
}
263+
}

maintainer/operator/operator_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ func (oc *Controller) GetMinCheckpointTs() uint64 {
203203
if op.BlockTsForward() {
204204
spanReplication := oc.spanController.GetTaskByID(op.ID())
205205
if spanReplication == nil {
206-
// for test
207-
log.Panic("span replication is nil", zap.String("operator", op.String()))
206+
log.Info("span replication is nil", zap.String("operator", op.String()))
207+
continue
208208
}
209209
if spanReplication.GetStatus().CheckpointTs < minCheckpointTs {
210210
minCheckpointTs = spanReplication.GetStatus().CheckpointTs

maintainer/replica/default_span_split_checker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (s *defaultSpanSplitChecker) UpdateStatus(replica *SpanReplication) {
128128

129129
// check traffic first
130130
if status.GetStatus().EventSizePerSecond != 0 {
131-
if status.GetStatus().EventSizePerSecond < float32(s.writeThreshold) {
131+
if s.writeThreshold == 0 || status.GetStatus().EventSizePerSecond < float32(s.writeThreshold) {
132132
status.trafficScore = 0
133133
} else {
134134
status.trafficScore++
@@ -147,7 +147,7 @@ func (s *defaultSpanSplitChecker) UpdateStatus(replica *SpanReplication) {
147147
status.regionCheckTime = time.Now()
148148
}
149149

150-
log.Info("default span split checker: update status", zap.String("changefeed", s.changefeedID.Name()), zap.String("replica", replica.ID.String()), zap.Int("trafficScore", status.trafficScore), zap.Int("regionCount", status.regionCount))
150+
log.Debug("default span split checker: update status", zap.String("changefeed", s.changefeedID.Name()), zap.String("replica", replica.ID.String()), zap.Int("trafficScore", status.trafficScore), zap.Int("regionCount", status.regionCount))
151151

152152
if status.trafficScore >= trafficScoreThreshold || (s.regionThreshold > 0 && status.regionCount >= s.regionThreshold) {
153153
if _, ok := s.splitReadyTasks[status.ID]; !ok {

server/module_http.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import (
2222
"github.com/gin-gonic/gin"
2323
"github.com/pingcap/log"
2424
"github.com/pingcap/ticdc/api"
25-
"github.com/pingcap/ticdc/pkg/common"
26-
"github.com/pingcap/ticdc/pkg/errors"
2725
"github.com/pingcap/ticdc/pkg/logger"
2826
"go.uber.org/zap"
2927
"golang.org/x/net/netutil"
@@ -43,7 +41,7 @@ type HttpServer struct {
4341

4442
// NewHttpServer creates the HTTP server.
4543
// `lis` is a listener that gives us plain-text HTTP requests.
46-
func NewHttpServer(c *server, lis net.Listener) common.SubModule {
44+
func NewHttpServer(c *server, lis net.Listener) *HttpServer {
4745
// LimitListener returns a Listener that accepts at most n simultaneous
4846
// connections from the provided Listener. Connections that exceed the
4947
// limit will wait in a queue and no new goroutines will be created until
@@ -72,17 +70,24 @@ func NewHttpServer(c *server, lis net.Listener) common.SubModule {
7270
}
7371
}
7472

75-
func (s *HttpServer) Run(ctx context.Context) error {
73+
func (s *HttpServer) Run(ctx context.Context) {
7674
log.Info("http server is running", zap.String("addr", s.listener.Addr().String()))
77-
err := s.server.Serve(s.listener)
78-
if err != nil {
79-
log.Error("http server error", zap.Error(err))
80-
}
81-
return errors.WrapError(errors.ErrServeHTTP, err)
75+
go func() {
76+
err := s.server.Serve(s.listener)
77+
if err != nil {
78+
log.Error("http server error", zap.Error(err))
79+
}
80+
}()
8281
}
8382

84-
func (s *HttpServer) Close(ctx context.Context) error {
85-
return s.server.Shutdown(ctx)
83+
func (s *HttpServer) Close() {
84+
log.Info("http server is closing")
85+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
86+
defer cancel()
87+
err := s.server.Shutdown(ctx)
88+
if err != nil {
89+
log.Warn("close http server failed", zap.Error(err))
90+
}
8691
}
8792

8893
func (s *HttpServer) Name() string {

server/server.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ func (c *server) initialize(ctx context.Context) error {
173173
c.subCommonModules = []common.SubModule{
174174
nodeManager,
175175
NewElector(c),
176-
NewHttpServer(c, c.tcpServer.HTTP1Listener()),
177176
NewGrpcServer(c.tcpServer.GrpcListener()),
178177
}
179178

@@ -232,6 +231,10 @@ func (c *server) setPreServices(ctx context.Context) error {
232231
appctx.SetService(appctx.DispatcherOrchestrator, dispatcherOrchestrator)
233232
c.preServices = append(c.preServices, dispatcherOrchestrator)
234233

234+
httpServer := NewHttpServer(c, c.tcpServer.HTTP1Listener())
235+
httpServer.Run(ctx)
236+
c.preServices = append(c.preServices, httpServer)
237+
235238
log.Info("pre services all set", zap.Any("preServicesNum", len(c.preServices)))
236239
return nil
237240
}

0 commit comments

Comments
 (0)