Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
ced66dc
init
wk989898 Aug 15, 2025
36b7588
update
wk989898 Aug 15, 2025
0edf39d
.
wk989898 Aug 15, 2025
517c7e4
fix
wk989898 Aug 15, 2025
322a2b3
fix
wk989898 Aug 15, 2025
8849ed9
fix
wk989898 Aug 15, 2025
e682c73
chore
wk989898 Aug 15, 2025
65af1da
Merge branch 'master' into redo-maintainer
wk989898 Aug 15, 2025
3d812d2
fix
wk989898 Aug 17, 2025
f890c7a
Merge branch 'master' into redo-maintainer
wk989898 Aug 17, 2025
10d8d95
fix ut
wk989898 Aug 18, 2025
02afc43
fix
wk989898 Aug 18, 2025
afdd83a
chore
wk989898 Aug 18, 2025
6cd0cdf
fix
wk989898 Aug 18, 2025
428e3f8
fix UT
wk989898 Aug 18, 2025
da4fd4f
chore
wk989898 Aug 19, 2025
1e2a9ad
chore: modify redo metrics
wk989898 Aug 20, 2025
9b96520
fix dml epoch
wk989898 Aug 20, 2025
378b10b
update
wk989898 Aug 20, 2025
5c3f1ea
debug
wk989898 Aug 20, 2025
bb9d811
add more log
wk989898 Aug 20, 2025
ad3a6c0
fix data race
wk989898 Aug 21, 2025
4f25f2d
debug
wk989898 Aug 21, 2025
08dbbb3
Merge branch 'master' into redo-maintainer
wk989898 Aug 21, 2025
8157f3b
debug
wk989898 Aug 21, 2025
7cba5d3
update
wk989898 Aug 21, 2025
703b20a
Merge branch 'master' into redo-maintainer
wk989898 Aug 21, 2025
3fb5620
update
wk989898 Aug 21, 2025
5b4d60d
Merge branch 'master' into redo-maintainer
wk989898 Aug 21, 2025
0ff98fc
update
wk989898 Aug 21, 2025
3634bb5
update
wk989898 Aug 21, 2025
0844bf8
update
wk989898 Aug 22, 2025
71fca54
fix data race
wk989898 Aug 22, 2025
a7400c5
update
wk989898 Aug 22, 2025
8b37833
chore
wk989898 Aug 22, 2025
acd4715
fix data race
wk989898 Aug 22, 2025
e029beb
debug
wk989898 Aug 22, 2025
0cc6628
add more log
wk989898 Aug 25, 2025
3dbe394
debug
wk989898 Aug 25, 2025
390b0ff
update
wk989898 Aug 25, 2025
557b133
remove debug log
wk989898 Aug 25, 2025
b52a7f3
fix data race
wk989898 Aug 25, 2025
ba69942
ignore checkpointTs in redoTsMessage
wk989898 Aug 26, 2025
9c09afc
fmt
wk989898 Aug 26, 2025
5f5bb06
fix
wk989898 Aug 27, 2025
c41e8c8
update
wk989898 Aug 27, 2025
e3c35ad
update
wk989898 Aug 29, 2025
4635dbc
Merge branch 'master' into redo-maintainer
wk989898 Sep 2, 2025
6c5d2d0
update
wk989898 Sep 2, 2025
ad41527
fix ut
wk989898 Sep 2, 2025
484b8bd
update
wk989898 Sep 3, 2025
18ed58e
fix
wk989898 Sep 3, 2025
b7a1a50
debug test
wk989898 Sep 3, 2025
b443884
chore
wk989898 Sep 3, 2025
139488f
update
wk989898 Sep 3, 2025
b8e8eb3
replace isRedo with consistent
wk989898 Sep 3, 2025
9747ef3
update test
wk989898 Sep 3, 2025
fcccb2b
fix
wk989898 Sep 3, 2025
8ce497b
update
wk989898 Sep 3, 2025
1be7333
fix
wk989898 Sep 4, 2025
91170fd
Merge branch 'master' into redo-maintainer
wk989898 Sep 4, 2025
0c86503
rename consistent to dispatcherType
wk989898 Sep 4, 2025
865f569
.
wk989898 Sep 4, 2025
5143b97
Merge branch 'master' into redo-maintainer
wk989898 Sep 4, 2025
8bab5fb
.
wk989898 Sep 4, 2025
cdcb969
chore
wk989898 Sep 4, 2025
aa4331b
fix
wk989898 Sep 4, 2025
a95f31b
fix
wk989898 Sep 4, 2025
0c0356c
fix
wk989898 Sep 4, 2025
ba91a63
Merge branch 'master' into redo-maintainer
wk989898 Sep 4, 2025
d95c680
debug
wk989898 Sep 4, 2025
0f24865
refactor
wk989898 Sep 5, 2025
76355dd
Merge branch 'master' into redo-maintainer
wk989898 Sep 5, 2025
4f20f1e
update
wk989898 Sep 5, 2025
11cdbfa
update
wk989898 Sep 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ P=3
# The following packages are used in unit tests.
# Add new packages here if you want to include them in unit tests.
UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/dispatchermanager/... ./downstreamadapter/eventcollector/... ./pkg/sink/...
UT_PACKAGES_MAINTAINER := ./maintainer/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_MAINTAINER := ./maintainer/... ./pkg/scheduler/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/...

Expand Down
19 changes: 12 additions & 7 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,8 @@ func (h *OpenAPIV2) MoveTable(c *gin.Context) {
}

targetNodeID := c.Query("targetNodeID")
err = maintainer.MoveTable(int64(tableId), node.ID(targetNodeID))
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.MoveTable(int64(tableId), node.ID(targetNodeID), mode)
if err != nil {
log.Error("failed to move table", zap.Error(err), zap.Int64("tableID", tableId), zap.String("targetNodeID", targetNodeID))
_ = c.Error(err)
Expand Down Expand Up @@ -859,7 +860,8 @@ func (h *OpenAPIV2) MoveSplitTable(c *gin.Context) {
}

targetNodeID := c.Query("targetNodeID")
err = maintainer.MoveSplitTable(int64(tableId), node.ID(targetNodeID))
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.MoveSplitTable(int64(tableId), node.ID(targetNodeID), mode)
if err != nil {
log.Error("failed to move split table", zap.Error(err), zap.Int64("tableID", tableId), zap.String("targetNodeID", targetNodeID))
_ = c.Error(err)
Expand Down Expand Up @@ -930,8 +932,8 @@ func (h *OpenAPIV2) SplitTableByRegionCount(c *gin.Context) {
_ = c.Error(apperror.ErrMaintainerNotFounded)
return
}

err = maintainer.SplitTableByRegionCount(int64(tableId))
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.SplitTableByRegionCount(int64(tableId), mode)
if err != nil {
log.Error("failed to split table by region count", zap.Error(err), zap.Int64("tableID", tableId))
_ = c.Error(err)
Expand Down Expand Up @@ -1002,7 +1004,8 @@ func (h *OpenAPIV2) MergeTable(c *gin.Context) {
return
}

err = maintainer.MergeTable(int64(tableId))
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
err = maintainer.MergeTable(int64(tableId), mode)
if err != nil {
log.Error("failed to merge table", zap.Error(err), zap.Int64("tableID", tableId))
_ = c.Error(err)
Expand Down Expand Up @@ -1061,7 +1064,8 @@ func (h *OpenAPIV2) ListTables(c *gin.Context) {
return
}

tables := maintainer.GetTables()
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
tables := maintainer.GetTables(mode)

nodeTableInfoMap := make(map[string]*NodeTableInfo)

Expand Down Expand Up @@ -1131,7 +1135,8 @@ func (h *OpenAPIV2) getDispatcherCount(c *gin.Context) {
return
}

number := maintainer.GetDispatcherCount()
mode, _ := strconv.ParseInt(c.Query("mode"), 10, 64)
number := maintainer.GetDispatcherCount(mode)
c.JSON(http.StatusOK, &DispatcherCount{Count: number})
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/cdc/cli/cli_changefeed_merge_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type mergeTableChangefeedOptions struct {
changefeedID string
namespace string
tableId int64
mode int64
}

// newCreateChangefeedOptions creates new options for the `cli changefeed create` command.
Expand All @@ -42,6 +43,7 @@ func (o *mergeTableChangefeedOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVarP(&o.namespace, "namespace", "n", "default", "Replication task (changefeed) Namespace")
cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")
cmd.PersistentFlags().Int64VarP(&o.tableId, "table-id", "t", 0, "the id of table to move")
cmd.PersistentFlags().Int64Var(&o.mode, "mode", 0, "enable redo when mode is 1")
_ = cmd.MarkPersistentFlagRequired("changefeed-id")
_ = cmd.MarkPersistentFlagRequired("table-id")
}
Expand All @@ -61,7 +63,7 @@ func (o *mergeTableChangefeedOptions) complete(f factory.Factory) error {
func (o *mergeTableChangefeedOptions) run(cmd *cobra.Command) error {
ctx := context.Background()

err := o.apiClientV2.Changefeeds().MergeTable(ctx, o.namespace, o.changefeedID, o.tableId)
err := o.apiClientV2.Changefeeds().MergeTable(ctx, o.namespace, o.changefeedID, o.tableId, o.mode)
var errStr string
if err != nil {
errStr = err.Error()
Expand Down
4 changes: 3 additions & 1 deletion cmd/cdc/cli/cli_changefeed_move_split_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type moveSplitTableChangefeedOptions struct {
namespace string
tableId int64
targetNodeID string
mode int64
}

// newCreateChangefeedOptions creates new options for the `cli changefeed create` command.
Expand All @@ -44,6 +45,7 @@ func (o *moveSplitTableChangefeedOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")
cmd.PersistentFlags().Int64VarP(&o.tableId, "table-id", "t", 0, "the id of table to move")
cmd.PersistentFlags().StringVarP(&o.targetNodeID, "target-node-id", "d", "", "the dest for the table to move")
cmd.PersistentFlags().Int64Var(&o.mode, "mode", 0, "enable redo when mode is 1")
_ = cmd.MarkPersistentFlagRequired("changefeed-id")
_ = cmd.MarkPersistentFlagRequired("table-id")
_ = cmd.MarkPersistentFlagRequired("target-node-id")
Expand All @@ -64,7 +66,7 @@ func (o *moveSplitTableChangefeedOptions) complete(f factory.Factory) error {
func (o *moveSplitTableChangefeedOptions) run(cmd *cobra.Command) error {
ctx := context.Background()

err := o.apiClientV2.Changefeeds().MoveSplitTable(ctx, o.namespace, o.changefeedID, o.tableId, o.targetNodeID)
err := o.apiClientV2.Changefeeds().MoveSplitTable(ctx, o.namespace, o.changefeedID, o.tableId, o.targetNodeID, o.mode)
var errStr string
if err != nil {
errStr = err.Error()
Expand Down
4 changes: 3 additions & 1 deletion cmd/cdc/cli/cli_changefeed_move_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type moveTableChangefeedOptions struct {
namespace string
tableId int64
targetNodeID string
mode int64
}

// newCreateChangefeedOptions creates new options for the `cli changefeed create` command.
Expand All @@ -44,6 +45,7 @@ func (o *moveTableChangefeedOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")
cmd.PersistentFlags().Int64VarP(&o.tableId, "table-id", "t", 0, "the id of table to move")
cmd.PersistentFlags().StringVarP(&o.targetNodeID, "target-node-id", "d", "", "the dest for the table to move")
cmd.PersistentFlags().Int64Var(&o.mode, "mode", 0, "enable redo when mode is 1")
_ = cmd.MarkPersistentFlagRequired("changefeed-id")
_ = cmd.MarkPersistentFlagRequired("table-id")
_ = cmd.MarkPersistentFlagRequired("target-node-id")
Expand All @@ -69,7 +71,7 @@ type response struct {
func (o *moveTableChangefeedOptions) run(cmd *cobra.Command) error {
ctx := context.Background()

err := o.apiClientV2.Changefeeds().MoveTable(ctx, o.namespace, o.changefeedID, o.tableId, o.targetNodeID)
err := o.apiClientV2.Changefeeds().MoveTable(ctx, o.namespace, o.changefeedID, o.tableId, o.targetNodeID, o.mode)
var errStr string
if err != nil {
errStr = err.Error()
Expand Down
4 changes: 3 additions & 1 deletion cmd/cdc/cli/cli_changefeed_split_table_by_region_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type splitTableByRegionCountChangefeedOptions struct {
changefeedID string
namespace string
tableId int64
mode int64
}

// newCreateChangefeedOptions creates new options for the `cli changefeed create` command.
Expand All @@ -42,6 +43,7 @@ func (o *splitTableByRegionCountChangefeedOptions) addFlags(cmd *cobra.Command)
cmd.PersistentFlags().StringVarP(&o.namespace, "namespace", "n", "default", "Replication task (changefeed) Namespace")
cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")
cmd.PersistentFlags().Int64VarP(&o.tableId, "table-id", "t", 0, "the id of table to move")
cmd.PersistentFlags().Int64Var(&o.mode, "mode", 0, "enable redo when mode is 1")
_ = cmd.MarkPersistentFlagRequired("changefeed-id")
_ = cmd.MarkPersistentFlagRequired("table-id")
}
Expand All @@ -61,7 +63,7 @@ func (o *splitTableByRegionCountChangefeedOptions) complete(f factory.Factory) e
func (o *splitTableByRegionCountChangefeedOptions) run(cmd *cobra.Command) error {
ctx := context.Background()

err := o.apiClientV2.Changefeeds().SplitTableByRegionCount(ctx, o.namespace, o.changefeedID, o.tableId)
err := o.apiClientV2.Changefeeds().SplitTableByRegionCount(ctx, o.namespace, o.changefeedID, o.tableId, o.mode)
var errStr string
if err != nil {
errStr = err.Error()
Expand Down
75 changes: 40 additions & 35 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// DispatcherService defines the interface for providing dispatcher information and basic event handling.
type DispatcherService interface {
GetId() common.DispatcherID
GetType() int
GetMode() int64
GetStartTs() uint64
GetBDRMode() bool
GetChangefeedID() common.ChangeFeedID
Expand Down Expand Up @@ -121,6 +121,9 @@ type BasicDispatcher struct {
// componentStatus is the status of the dispatcher, such as working, removing, stopped.
componentStatus *ComponentStateWithMutex

// schemaIDToDispatchers is shared in the DispatcherManager
schemaIDToDispatchers *SchemaIDToDispatchers

// Shared info containing all common configuration and resources
sharedInfo *SharedInfo

Expand Down Expand Up @@ -148,8 +151,8 @@ type BasicDispatcher struct {

isRemoving atomic.Bool

seq uint64
dispatcherType int
seq uint64
mode int64

BootstrapState bootstrapState
}
Expand All @@ -159,29 +162,31 @@ func NewBasicDispatcher(
tableSpan *heartbeatpb.TableSpan,
startTs uint64,
schemaID int64,
schemaIDToDispatchers *SchemaIDToDispatchers,
startTsIsSyncpoint bool,
currentPDTs uint64,
dispatcherType int,
mode int64,
sink sink.Sink,
sharedInfo *SharedInfo,
) *BasicDispatcher {
dispatcher := &BasicDispatcher{
id: id,
tableSpan: tableSpan,
startTs: startTs,
startTsIsSyncpoint: startTsIsSyncpoint,
sharedInfo: sharedInfo,
sink: sink,
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing),
resolvedTs: startTs,
isRemoving: atomic.Bool{},
blockEventStatus: BlockEventStatus{blockPendingEvent: nil},
tableProgress: NewTableProgress(),
schemaID: schemaID,
resendTaskMap: newResendTaskMap(),
creationPDTs: currentPDTs,
dispatcherType: dispatcherType,
BootstrapState: BootstrapFinished,
id: id,
tableSpan: tableSpan,
startTs: startTs,
startTsIsSyncpoint: startTsIsSyncpoint,
sharedInfo: sharedInfo,
sink: sink,
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Initializing),
resolvedTs: startTs,
isRemoving: atomic.Bool{},
blockEventStatus: BlockEventStatus{blockPendingEvent: nil},
tableProgress: NewTableProgress(),
schemaID: schemaID,
schemaIDToDispatchers: schemaIDToDispatchers,
resendTaskMap: newResendTaskMap(),
creationPDTs: currentPDTs,
mode: mode,
BootstrapState: BootstrapFinished,
}

return dispatcher
Expand Down Expand Up @@ -275,7 +280,7 @@ func (d *BasicDispatcher) updateDispatcherStatusToWorking() {
ID: d.id.ToPB(),
ComponentStatus: heartbeatpb.ComponentState_Working,
CheckpointTs: d.GetCheckpointTs(),
IsRedo: IsRedoDispatcher(d),
Mode: d.GetMode(),
},
Seq: d.seq,
}
Expand Down Expand Up @@ -306,17 +311,17 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC
//
// wakeCallback is used to wake the dynamic stream to handle the next batch events.
// It will be called when all the events are flushed to downstream successfully.
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) (block bool) {
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
// Only return false when all events are resolvedTs Event.
block = false
block := false
dmlWakeOnce := &sync.Once{}
dmlEvents := make([]*commonEvent.DMLEvent, 0, len(dispatcherEvents))
latestResolvedTs := uint64(0)
isRedo := IsRedoDispatcher(d)
// Dispatcher is ready, handle the events
for _, dispatcherEvent := range dispatcherEvents {
log.Debug("dispatcher receive all event",
zap.Stringer("dispatcher", d.id), zap.Bool("isRedo", isRedo),
zap.Stringer("dispatcher", d.id), zap.Int64("mode", d.mode),
zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.Event.GetType())),
zap.Any("event", dispatcherEvent.Event))
failpoint.Inject("HandleEventsSlowly", func() {
lag := time.Duration(rand.Intn(5000)) * time.Millisecond
Expand Down Expand Up @@ -376,7 +381,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
err := ddl.GetError()
if err != nil {
d.HandleError(err)
return
return block
}
log.Info("dispatcher receive ddl event",
zap.Stringer("dispatcher", d.id),
Expand All @@ -392,7 +397,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
})
d.dealWithBlockEvent(ddl)
case commonEvent.TypeSyncPointEvent:
if isRedo {
if common.IsRedoMode(d.GetMode()) {
continue
}
if len(dispatcherEvents) != 1 {
Expand Down Expand Up @@ -510,7 +515,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
IsSyncPoint: dispatcherStatus.GetAction().IsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
IsRedo: IsRedoDispatcher(d),
Mode: d.GetMode(),
}
}
}
Expand Down Expand Up @@ -568,7 +573,7 @@ func (d *BasicDispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
IsSyncPoint: false, // sync point event must should block
Stage: heartbeatpb.BlockStage_NONE,
},
IsRedo: IsRedoDispatcher(d),
Mode: d.GetMode(),
}
identifier := BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
Expand Down Expand Up @@ -622,7 +627,7 @@ func (d *BasicDispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
IsSyncPoint: true,
Stage: heartbeatpb.BlockStage_WAITING,
},
IsRedo: IsRedoDispatcher(d),
Mode: d.GetMode(),
}
identifier := BlockEventIdentifier{
CommitTs: commitTs,
Expand All @@ -644,7 +649,7 @@ func (d *BasicDispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
IsSyncPoint: false,
Stage: heartbeatpb.BlockStage_WAITING,
},
IsRedo: IsRedoDispatcher(d),
Mode: d.GetMode(),
}
identifier := BlockEventIdentifier{
CommitTs: event.GetCommitTs(),
Expand Down Expand Up @@ -676,7 +681,7 @@ func (d *BasicDispatcher) dealWithBlockEvent(event commonEvent.BlockEvent) {
return
} else {
d.schemaID = schemaIDChange.NewSchemaID
d.sharedInfo.schemaIDToDispatchers.Update(schemaIDChange.OldSchemaID, schemaIDChange.NewSchemaID)
d.schemaIDToDispatchers.Update(schemaIDChange.OldSchemaID, schemaIDChange.NewSchemaID)
return
}
}
Expand Down Expand Up @@ -742,7 +747,7 @@ func (d *BasicDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
log.Info("dispatcher component has stopped and is ready for cleanup",
zap.Stringer("changefeedID", d.sharedInfo.changefeedID),
zap.Stringer("dispatcher", d.id),
zap.Bool("isRedo", IsRedoDispatcher(d)),
zap.Int64("mode", d.mode),
zap.String("table", common.FormatTableSpan(d.tableSpan)),
zap.Uint64("checkpointTs", d.GetCheckpointTs()),
zap.Uint64("resolvedTs", d.GetResolvedTs()),
Expand All @@ -751,7 +756,7 @@ func (d *BasicDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
}
log.Info("dispatcher is not ready to close",
zap.Stringer("dispatcher", d.id),
zap.Bool("isRedo", IsRedoDispatcher(d)),
zap.Int64("mode", d.mode),
zap.Bool("sinkIsNormal", d.sink.IsNormal()),
zap.Bool("tableProgressEmpty", d.tableProgress.Empty()),
zap.Int("tableProgressLen", d.tableProgress.Len()),
Expand All @@ -763,7 +768,7 @@ func (d *BasicDispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
func (d *BasicDispatcher) removeDispatcher() {
log.Info("remove dispatcher",
zap.Stringer("dispatcher", d.id),
zap.Bool("isRedo", IsRedoDispatcher(d)),
zap.Int64("mode", d.mode),
zap.Stringer("changefeedID", d.sharedInfo.changefeedID),
zap.String("table", common.FormatTableSpan(d.tableSpan)))
dispatcherStatusDS := GetDispatcherStatusDynamicStream()
Expand Down
Loading