Skip to content

Commit bef13bf

Browse files
committed
fix CI
Signed-off-by: lance6716 <[email protected]>
1 parent d89ecf4 commit bef13bf

File tree

3 files changed

+24
-15
lines changed

3 files changed

+24
-15
lines changed

pkg/ddl/notifier/store.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type Store interface {
3535
se *sess.Session,
3636
ddlJobID int64,
3737
multiSchemaChangeID int64,
38-
processedBy uint64,
38+
oldProcessedBy uint64,
39+
newProcessedBy uint64,
3940
) error
4041
DeleteAndCommit(ctx context.Context, se *sess.Session, ddlJobID int64, multiSchemaChangeID int) error
4142
// List will start a transaction of given session and read all schema changes
@@ -87,17 +88,28 @@ func (t *tableStore) UpdateProcessed(
8788
se *sess.Session,
8889
ddlJobID int64,
8990
multiSchemaChangeID int64,
90-
processedBy uint64,
91+
oldProcessedBy uint64,
92+
newProcessedBy uint64,
9193
) error {
9294
sql := fmt.Sprintf(`
9395
UPDATE %s.%s
9496
SET processed_by_flag = %d
95-
WHERE ddl_job_id = %d AND sub_job_id = %d`,
97+
WHERE ddl_job_id = %d AND sub_job_id = %d AND processed_by_flag = %d`,
9698
t.db, t.table,
97-
processedBy,
98-
ddlJobID, multiSchemaChangeID)
99+
newProcessedBy,
100+
ddlJobID, multiSchemaChangeID, oldProcessedBy,
101+
)
99102
_, err := se.Execute(ctx, sql, "ddl_notifier")
100-
return err
103+
if err != nil {
104+
return errors.Trace(err)
105+
}
106+
if se.GetSessionVars().StmtCtx.AffectedRows() == 0 {
107+
return errors.Errorf(
108+
"failed to update processed_by_flag, maybe the row has been updated by other owner. ddl_job_id: %d, sub_job_id: %d",
109+
ddlJobID, multiSchemaChangeID,
110+
)
111+
}
112+
return nil
101113
}
102114

103115
// DeleteAndCommit implements Store interface.

pkg/ddl/notifier/subscribe.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -298,17 +298,14 @@ func (n *DDLNotifier) processEventForHandler(
298298
zap.Duration("duration", time.Since(now)))
299299
}
300300

301-
if err = n.store.UpdateProcessed(
301+
return errors.Trace(n.store.UpdateProcessed(
302302
ctx,
303303
session,
304304
change.ddlJobID,
305305
change.subJobID,
306+
change.processedByFlag,
306307
newFlag,
307-
); err != nil {
308-
return errors.Trace(err)
309-
}
310-
311-
return nil
308+
))
312309
}
313310

314311
// Stop stops the background loop.

pkg/ddl/notifier/testkit_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,9 @@ func Test2OwnerForAShortTime(t *testing.T) {
318318

319319
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
320320
sessionPool := util.NewSessionPool(
321-
1,
321+
4,
322322
func() (pools.Resource, error) {
323-
return tk.Session(), nil
323+
return testkit.NewTestKit(t, store).Session(), nil
324324
},
325325
nil,
326326
nil,
@@ -360,7 +360,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
360360
if !bytes.Contains(content, []byte("Error processing change")) {
361361
return false
362362
}
363-
return bytes.Contains(content, []byte("Write conflict"))
363+
return bytes.Contains(content, []byte("maybe the row has been updated by other owner"))
364364
}, time.Second, 25*time.Millisecond)
365365
// the handler should not commit
366366
tk2.MustQuery("SELECT * FROM test.result").Check(testkit.Rows())

0 commit comments

Comments
 (0)