Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 35 additions & 31 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,33 @@ type Controller struct {
taskHandlers []*threadpool.TaskHandle
messageCenter messaging.MessageCenter

changefeedProgressReportCh chan map[common.ChangeFeedID]*changefeed.Changefeed
changefeedStateChangedCh chan *ChangefeedStateChangeEvent
changefeedProgressReportCh chan []*ChangefeedStateChangeEvent

lastPrintStatusTime time.Time

apiLock sync.RWMutex
}

type ChangeType int

const (
ChangeStateAndTs = iota
ChangeTs
ChangeState
)

type ChangefeedStateChangeEvent struct {
ChangefeedID common.ChangeFeedID
changefeedID common.ChangeFeedID
changefeed *changefeed.Changefeed
State model.FeedState
err *model.RunningError
changeType ChangeType
}

func NewController(
version int64,
selfNode *node.Info,
updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed,
stateChangedCh chan *ChangefeedStateChangeEvent,
changefeedChangeEventCh chan []*ChangefeedStateChangeEvent,
backend changefeed.Backend,
eventCh *chann.DrainableChann[*Event],
taskScheduler threadpool.ThreadPool,
Expand Down Expand Up @@ -135,8 +143,7 @@ func NewController(
nodeManager: nodeManager,
taskScheduler: taskScheduler,
backend: backend,
changefeedProgressReportCh: updatedChangefeedCh,
changefeedStateChangedCh: stateChangedCh,
changefeedProgressReportCh: changefeedChangeEventCh,
lastPrintStatusTime: time.Now(),
pdClient: pdClient,
}
Expand Down Expand Up @@ -319,19 +326,18 @@ func (c *Controller) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Coordin

// handleMaintainerStatus handle the status report from the maintainers
func (c *Controller) handleMaintainerStatus(from node.ID, statusList []*heartbeatpb.MaintainerStatus) {
changedCfs := make(map[common.ChangeFeedID]*changefeed.Changefeed, len(statusList))

events := make([]*ChangefeedStateChangeEvent, 0, len(statusList))
for _, status := range statusList {
cfID := common.NewChangefeedIDFromPB(status.ChangefeedID)
cf := c.handleSingleMaintainerStatus(from, status, cfID)
if cf != nil {
changedCfs[cfID] = cf
event := c.handleSingleMaintainerStatus(from, status, cfID)
if event != nil {
events = append(events, event)
}
}

// Try to send updated changefeeds without blocking
select {
case c.changefeedProgressReportCh <- changedCfs:
case c.changefeedProgressReportCh <- events:
default:
}
}
Expand All @@ -340,7 +346,7 @@ func (c *Controller) handleSingleMaintainerStatus(
from node.ID,
status *heartbeatpb.MaintainerStatus,
cfID common.ChangeFeedID,
) *changefeed.Changefeed {
) *ChangefeedStateChangeEvent {
// Update the operator status first
c.operatorController.UpdateOperatorStatus(cfID, from, status)

Expand All @@ -354,8 +360,8 @@ func (c *Controller) handleSingleMaintainerStatus(
return nil
}

c.updateChangefeedStatus(cf, cfID, status)
return cf
event := c.updateChangefeedStatus(cf, cfID, status)
return event
}

func (c *Controller) handleNonExistentChangefeed(
Expand Down Expand Up @@ -403,31 +409,29 @@ func (c *Controller) updateChangefeedStatus(
cf *changefeed.Changefeed,
cfID common.ChangeFeedID,
status *heartbeatpb.MaintainerStatus,
) {
) *ChangefeedStateChangeEvent {
changed, state, err := cf.UpdateStatus(status)
event := new(ChangefeedStateChangeEvent)
event.changefeedID = cfID
event.State = state
event.changefeed = cf
if !changed {
return
event.changeType = ChangeTs
return event
}

log.Info("changefeed status changed",
zap.Stringer("changefeed", cfID),
zap.String("state", string(state)),
zap.Stringer("error", err))

var mErr *model.RunningError
if err != nil {
mErr = &model.RunningError{
event.err = &model.RunningError{
Time: time.Now(),
Addr: err.Node,
Code: err.Code,
Message: err.Message,
}
}
c.changefeedStateChangedCh <- &ChangefeedStateChangeEvent{
ChangefeedID: cfID,
State: state,
err: mErr,
}
log.Info("changefeed status changed",
zap.Stringer("changefeed", cfID),
zap.String("state", string(event.State)),
zap.Stringer("error", err))
return event
}

// FinishBootstrap is called when all nodes have sent bootstrap response
Expand Down
96 changes: 50 additions & 46 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,8 @@ type coordinator struct {
// eventCh is used to receive the event from message center, basically these messages
// are from maintainer.
eventCh *chann.DrainableChann[*Event]
// changefeedProgressReportCh is used to receive the changefeed progress report from the controller
changefeedProgressReportCh chan map[common.ChangeFeedID]*changefeed.Changefeed
// changefeedStateChangedCh is used to receive the changefeed state changed event from the controller
changefeedStateChangedCh chan *ChangefeedStateChangeEvent
// changefeedChangeEventCh is used to receive the changefeed progress report from the controller
changefeedChangeEventCh chan []*ChangefeedStateChangeEvent

cancel func()
closed atomic.Bool
Expand All @@ -109,18 +107,17 @@ func New(node *node.Info,
) server.Coordinator {
mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
c := &coordinator{
version: version,
nodeInfo: node,
gcServiceID: gcServiceID,
lastTickTime: time.Now(),
gcManager: gc.NewManager(gcServiceID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
mc: mc,
changefeedProgressReportCh: make(chan map[common.ChangeFeedID]*changefeed.Changefeed, 1024),
changefeedStateChangedCh: make(chan *ChangefeedStateChangeEvent, 1024),
backend: backend,
version: version,
nodeInfo: node,
gcServiceID: gcServiceID,
lastTickTime: time.Now(),
gcManager: gc.NewManager(gcServiceID, pdClient, pdClock),
eventCh: chann.NewAutoDrainChann[*Event](),
pdClient: pdClient,
pdClock: pdClock,
mc: mc,
changefeedChangeEventCh: make(chan []*ChangefeedStateChangeEvent, 1024),
backend: backend,
}
// handle messages from message center
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)
Expand All @@ -131,8 +128,7 @@ func New(node *node.Info,
controller := NewController(
c.version,
c.nodeInfo,
c.changefeedProgressReportCh,
c.changefeedStateChangedCh,
c.changefeedChangeEventCh,
c.backend,
c.eventCh,
c.taskScheduler,
Expand Down Expand Up @@ -210,13 +206,16 @@ func (c *coordinator) run(ctx context.Context) error {
now := time.Now()
metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second))
c.lastTickTime = now
case cfs := <-c.changefeedProgressReportCh:
if err := c.saveCheckpointTs(ctx, cfs); err != nil {
case events := <-c.changefeedChangeEventCh:
if err := c.saveCheckpointTs(ctx, events); err != nil {
return errors.Trace(err)
}
case event := <-c.changefeedStateChangedCh:
if err := c.handleStateChangedEvent(ctx, event); err != nil {
return errors.Trace(err)
for _, event := range events {
if event.changeType == ChangeState || event.changeType == ChangeStateAndTs {
if err := c.handleStateChangedEvent(ctx, event); err != nil {
return errors.Trace(err)
}
}
}
}
}
Expand All @@ -238,9 +237,9 @@ func (c *coordinator) handleStateChangedEvent(
ctx context.Context,
event *ChangefeedStateChangeEvent,
) error {
cf := c.controller.getChangefeed(event.ChangefeedID)
cf := c.controller.getChangefeed(event.changefeedID)
if cf == nil {
log.Warn("changefeed not found", zap.String("changefeed", event.ChangefeedID.String()))
log.Warn("changefeed not found", zap.String("changefeed", event.changefeedID.String()))
return nil
}
cfInfo, err := cf.GetInfo().Clone()
Expand All @@ -262,22 +261,22 @@ func (c *coordinator) handleStateChangedEvent(

switch event.State {
case model.StateWarning:
c.controller.operatorController.StopChangefeed(ctx, event.ChangefeedID, false)
c.controller.updateChangefeedEpoch(ctx, event.ChangefeedID)
c.controller.moveChangefeedToSchedulingQueue(event.ChangefeedID, false, false)
c.controller.operatorController.StopChangefeed(ctx, event.changefeedID, false)
c.controller.updateChangefeedEpoch(ctx, event.changefeedID)
c.controller.moveChangefeedToSchedulingQueue(event.changefeedID, false, false)
case model.StateFailed, model.StateFinished:
c.controller.operatorController.StopChangefeed(ctx, event.ChangefeedID, false)
c.controller.operatorController.StopChangefeed(ctx, event.changefeedID, false)
case model.StateNormal:
log.Info("changefeed is resumed or created successfully, try to delete its safeguard gc safepoint",
zap.String("changefeed", event.ChangefeedID.String()))
zap.String("changefeed", event.changefeedID.String()))
// We need to clean its gc safepoint when changefeed is resumed or created
gcServiceID := c.getEnsureGCServiceID(gc.EnsureGCServiceCreating)
err := gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID)
err := gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.changefeedID)
if err != nil {
log.Warn("failed to delete create changefeed gc safepoint", zap.Error(err))
}
gcServiceID = c.getEnsureGCServiceID(gc.EnsureGCServiceResuming)
err = gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.ChangefeedID)
err = gc.UndoEnsureChangefeedStartTsSafety(ctx, c.pdClient, gcServiceID, event.changefeedID)
if err != nil {
log.Warn("failed to delete resume changefeed gc safepoint", zap.Error(err))
}
Expand All @@ -297,31 +296,39 @@ func (c *coordinator) checkStaleCheckpointTs(ctx context.Context, id common.Chan
if !errors.IsChangefeedGCFastFailErrorCode(errCode) {
state = model.StateWarning
}
event := &ChangefeedStateChangeEvent{
changefeedID: id,
State: state,
err: &model.RunningError{
Code: string(errCode),
Message: err.Error(),
},
changeType: ChangeState,
}
select {
case <-ctx.Done():
log.Warn("Failed to send state change event to stateChangedCh since context timeout, "+
"there may be a lot of state need to be handled. Try next time",
zap.String("changefeed", id.String()),
zap.Error(ctx.Err()))
return
case c.changefeedStateChangedCh <- &ChangefeedStateChangeEvent{
ChangefeedID: id,
State: state,
err: &model.RunningError{
Code: string(errCode),
Message: err.Error(),
},
}:
case c.changefeedChangeEventCh <- []*ChangefeedStateChangeEvent{event}:
}
}
}

func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.ChangeFeedID]*changefeed.Changefeed) error {
func (c *coordinator) saveCheckpointTs(ctx context.Context, events []*ChangefeedStateChangeEvent) error {
statusMap := make(map[common.ChangeFeedID]uint64)
for _, upCf := range cfs {
cfsMap := make(map[common.ChangeFeedID]*changefeed.Changefeed)
for _, event := range events {
if event.changeType == ChangeState {
continue
}
upCf := event.changefeed
reportedCheckpointTs := upCf.GetStatus().CheckpointTs
if upCf.GetLastSavedCheckPointTs() < reportedCheckpointTs {
statusMap[upCf.ID] = reportedCheckpointTs
cfsMap[upCf.ID] = upCf
c.checkStaleCheckpointTs(ctx, upCf.ID, reportedCheckpointTs)
}
}
Expand All @@ -335,10 +342,7 @@ func (c *coordinator) saveCheckpointTs(ctx context.Context, cfs map[common.Chang
}
// update the last saved checkpoint ts and send checkpointTs to maintainer
for id, cp := range statusMap {
cf, ok := cfs[id]
if !ok {
continue
}
cf := cfsMap[id]
cf.SetLastSavedCheckPointTs(cp)
if cf.IsMQSink() {
msg := cf.NewCheckpointTsMessage(cf.GetLastSavedCheckPointTs())
Expand Down
Loading