Skip to content

Commit 14bc47c

Browse files
authored
coordinator: fix test assert and refactor StopByChangefeedID (#1898)
close #1897
1 parent 1d63daf commit 14bc47c

File tree

4 files changed

+29
-28
lines changed

4 files changed

+29
-28
lines changed

coordinator/changefeed/backoff.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (m *Backoff) shouldFailWhenRetry() bool {
106106
// resetErrRetry reset the error retry related fields
107107
func (m *Backoff) resetErrRetry() {
108108
m.errBackoff.Reset()
109-
m.nextRetryTime = atomic.NewTime(time.Time{})
109+
m.nextRetryTime.Store(time.Time{})
110110
m.failed.Store(false)
111111
m.retrying.Store(false)
112112
}
@@ -203,7 +203,7 @@ func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbea
203203
}
204204
// set the next retry time
205205
m.backoffInterval = m.errBackoff.NextBackOff()
206-
m.nextRetryTime = atomic.NewTime(time.Now().Add(m.backoffInterval))
206+
m.nextRetryTime.Store(time.Now().Add(m.backoffInterval))
207207

208208
// check if we exceed the maxElapsedTime
209209
if m.shouldFailWhenRetry() {

coordinator/changefeed/backoff_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func TestRetry(t *testing.T) {
7878
})
7979
require.False(t, changefeed)
8080
require.Equal(t, config.StateFailed, state)
81+
require.Nil(t, err)
8182
require.Equal(t, uint64(2), backoff.checkpointTs)
8283

8384
backoff.resetErrRetry()

coordinator/changefeed/changefeed_db.go

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -107,30 +107,27 @@ func (db *ChangefeedDB) StopByChangefeedID(cfID common.ChangeFeedID, remove bool
107107
defer db.lock.Unlock()
108108

109109
cf, ok := db.changefeeds[cfID]
110-
if ok {
111-
// remove the changefeed
112-
delete(db.changefeeds, cfID)
113-
delete(db.stopped, cf.ID)
114-
db.RemoveReplicaWithoutLock(cf)
115-
116-
if remove {
117-
log.Info("remove changefeed", zap.String("changefeed", cf.ID.String()))
118-
} else {
119-
log.Info("stop changefeed", zap.String("changefeed", cfID.String()))
120-
// push back to stopped
121-
db.changefeeds[cfID] = cf
122-
db.stopped[cfID] = cf
123-
}
124-
125-
nodeID := cf.GetNodeID()
126-
if cf.GetNodeID() == "" {
127-
log.Info("changefeed is not scheduled, delete directly")
128-
return ""
129-
}
110+
if !ok {
111+
return ""
112+
}
113+
nodeID := cf.GetNodeID()
114+
if nodeID != "" {
130115
cf.SetNodeID("")
131-
return nodeID
132116
}
133-
return ""
117+
118+
// Remove from replication tracking
119+
db.RemoveReplicaWithoutLock(cf)
120+
121+
if remove {
122+
log.Info("remove changefeed", zap.String("changefeed", cf.ID.String()))
123+
delete(db.changefeeds, cfID)
124+
delete(db.stopped, cfID)
125+
} else {
126+
log.Info("stop changefeed", zap.String("changefeed", cfID.String()))
127+
db.stopped[cfID] = cf
128+
}
129+
130+
return nodeID
134131
}
135132

136133
// GetSize returns the size of the all chagnefeeds
@@ -186,8 +183,8 @@ func (db *ChangefeedDB) MarkMaintainerReplicating(task *Changefeed) {
186183
}
187184

188185
// GetWaitingSchedulingChangefeeds returns the absent maintainers and the working state of each node
189-
func (db *ChangefeedDB) GetWaitingSchedulingChangefeeds(absent []*Changefeed, maxSize int) ([]*Changefeed, map[node.ID]int) {
190-
absent = db.GetAbsent()
186+
func (db *ChangefeedDB) GetWaitingSchedulingChangefeeds(maxSize int) ([]*Changefeed, map[node.ID]int) {
187+
absent := db.GetAbsent()
191188
if len(absent) > maxSize {
192189
absent = absent[:maxSize]
193190
}

coordinator/changefeed/changefeed_db_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func TestStopByChangefeedID(t *testing.T) {
6969
require.Contains(t, db.stopped, cf.ID)
7070
require.Contains(t, db.changefeeds, cf.ID)
7171
require.Equal(t, node.ID("node-1"), nodeID)
72+
require.Lenf(t, db.GetReplicating(), 0, "")
7273

7374
require.Equal(t, "", db.StopByChangefeedID(common.NewChangeFeedIDWithName("a"), false).String())
7475
}
@@ -96,12 +97,14 @@ func TestRemoveChangefeed(t *testing.T) {
9697
db.StopByChangefeedID(cf.ID, false)
9798
require.NotContains(t, db.GetAbsent(), cf)
9899
require.Contains(t, db.changefeeds, cf.ID)
100+
require.Contains(t, db.stopped, cf.ID)
99101

100102
cf2 := &Changefeed{ID: common.NewChangeFeedIDWithName("test2")}
101103
db.AddReplicatingMaintainer(cf2, "node1")
102104
require.Equal(t, node.ID("node1"), db.StopByChangefeedID(cf2.ID, true))
103105
require.NotContains(t, db.GetAbsent(), cf2)
104106
require.NotContains(t, db.changefeeds, cf2.ID)
107+
require.NotContains(t, db.stopped, cf2.ID)
105108
require.Equal(t, "", cf2.nodeID.String())
106109
}
107110

@@ -140,13 +143,13 @@ func TestGetWaitingSchedulingChangefeeds(t *testing.T) {
140143
cf3.backoff = NewBackoff(cf3.ID, 0, 0)
141144
db.AddAbsentChangefeed(cf3)
142145

143-
result, nMap := db.GetWaitingSchedulingChangefeeds(nil, 3)
146+
result, nMap := db.GetWaitingSchedulingChangefeeds(3)
144147
require.Equal(t, 1, nMap["node1"])
145148
require.NotContains(t, result, cf1)
146149
require.NotContains(t, result, cf2)
147150
require.Contains(t, result, cf3)
148151

149-
result, nMap = db.GetWaitingSchedulingChangefeeds(nil, 1)
152+
result, nMap = db.GetWaitingSchedulingChangefeeds(1)
150153
require.Equal(t, 1, nMap["node1"])
151154
require.Len(t, result, 1)
152155
}

0 commit comments

Comments
 (0)