@@ -73,9 +73,8 @@ type schemaStorage struct {
73
73
snaps []* schema.Snapshot
74
74
snapsMu sync.RWMutex
75
75
76
- gcTs uint64
77
- resolvedTs uint64
78
- schemaVersion int64
76
+ gcTs uint64
77
+ resolvedTs uint64
79
78
80
79
filter filter.Filter
81
80
@@ -92,9 +91,8 @@ func NewSchemaStorage(
92
91
role util.Role , filter filter.Filter ,
93
92
) (SchemaStorage , error ) {
94
93
var (
95
- snap * schema.Snapshot
96
- version int64
97
- err error
94
+ snap * schema.Snapshot
95
+ err error
98
96
)
99
97
// storage may be nil in some unit test cases.
100
98
if storage == nil {
@@ -105,7 +103,6 @@ func NewSchemaStorage(
105
103
if err != nil {
106
104
return nil , errors .Trace (err )
107
105
}
108
- version , err = schema .GetSchemaVersion (meta )
109
106
if err != nil {
110
107
return nil , errors .Trace (err )
111
108
}
@@ -116,7 +113,6 @@ func NewSchemaStorage(
116
113
forceReplicate : forceReplicate ,
117
114
filter : filter ,
118
115
id : id ,
119
- schemaVersion : version ,
120
116
role : role ,
121
117
}, nil
122
118
}
@@ -194,7 +190,6 @@ func (s *schemaStorage) GetLastSnapshot() *schema.Snapshot {
194
190
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
195
191
func (s * schemaStorage ) HandleDDLJob (job * timodel.Job ) error {
196
192
if s .skipJob (job ) {
197
- s .schemaVersion = job .BinlogInfo .SchemaVersion
198
193
s .AdvanceResolvedTs (job .BinlogInfo .FinishedTS )
199
194
return nil
200
195
}
@@ -203,16 +198,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
203
198
var snap * schema.Snapshot
204
199
if len (s .snaps ) > 0 {
205
200
lastSnap := s .snaps [len (s .snaps )- 1 ]
206
- // We use schemaVersion to check if an already-executed DDL job is processed for a second time.
207
- // Unexecuted DDL jobs should have largest schemaVersions.
208
- if job .BinlogInfo .FinishedTS <= lastSnap .CurrentTs () || job .BinlogInfo .SchemaVersion <= s .schemaVersion {
201
+ if job .BinlogInfo .FinishedTS <= lastSnap .CurrentTs () {
209
202
log .Info ("schemaStorage: ignore foregone DDL" ,
210
203
zap .String ("namespace" , s .id .Namespace ),
211
204
zap .String ("changefeed" , s .id .ID ),
212
205
zap .String ("DDL" , job .Query ),
213
206
zap .Int64 ("jobID" , job .ID ),
214
207
zap .Uint64 ("finishTs" , job .BinlogInfo .FinishedTS ),
215
- zap .Int64 ("schemaVersion" , s .schemaVersion ),
216
208
zap .Int64 ("jobSchemaVersion" , job .BinlogInfo .SchemaVersion ),
217
209
zap .String ("role" , s .role .String ()))
218
210
return nil
@@ -234,7 +226,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
234
226
return errors .Trace (err )
235
227
}
236
228
s .snaps = append (s .snaps , snap )
237
- s .schemaVersion = job .BinlogInfo .SchemaVersion
238
229
s .AdvanceResolvedTs (job .BinlogInfo .FinishedTS )
239
230
log .Info ("schemaStorage: update snapshot by the DDL job" ,
240
231
zap .String ("namespace" , s .id .Namespace ),
@@ -243,7 +234,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
243
234
zap .String ("table" , job .TableName ),
244
235
zap .String ("query" , job .Query ),
245
236
zap .Uint64 ("finishedTs" , job .BinlogInfo .FinishedTS ),
246
- zap .Uint64 ("schemaVersion" , uint64 (s .schemaVersion )),
247
237
zap .String ("role" , s .role .String ()))
248
238
return nil
249
239
}
0 commit comments