Skip to content

Commit 041c9fb

Browse files
authored
puller: Support discarding unsupported DDL by setting ignore-txn-start-ts in filter. (#12287) (#12290)
close #12286
1 parent c9eff3f commit 041c9fb

File tree

5 files changed

+113
-37
lines changed

5 files changed

+113
-37
lines changed

cdc/puller/ddl_puller.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -298,15 +298,15 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
298298
if !ok {
299299
shouldDiscardOldTable = true
300300
} else {
301-
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O)
301+
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O, job.StartTS)
302302
}
303303

304304
newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i])
305305
if !ok {
306306
// the new table name does not hit the filter rule, so we should discard the table.
307307
shouldDiscardNewTable = true
308308
} else {
309-
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O)
309+
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O, job.StartTS)
310310
}
311311

312312
if shouldDiscardOldTable && shouldDiscardNewTable {
@@ -395,6 +395,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
395395
zap.String("schema", job.SchemaName),
396396
zap.String("table", job.TableName),
397397
zap.String("query", job.Query),
398+
zap.Uint64("startTs", job.StartTS),
399+
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
398400
zap.Stringer("job", job),
399401
zap.Error(err))
400402
}
@@ -418,17 +420,19 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
418420
snap := p.schemaStorage.GetLastSnapshot()
419421
if err := snap.FillSchemaName(job); err != nil {
420422
log.Info("failed to fill schema name for ddl job", zap.Error(err))
421-
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
423+
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS) {
422424
return true, nil
423425
}
424-
return true, errors.Trace(err)
426+
return true, cerror.WrapError(cerror.ErrHandleDDLFailed,
427+
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
425428
}
426429

427430
switch job.Type {
428431
case timodel.ActionRenameTables:
429432
skip, err = p.handleRenameTables(job)
430433
if err != nil {
431-
return true, errors.Trace(err)
434+
return true, cerror.WrapError(cerror.ErrHandleDDLFailed,
435+
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
432436
}
433437
case timodel.ActionRenameTable:
434438
log.Info("rename table ddl job",
@@ -441,7 +445,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
441445
oldTable, ok := snap.PhysicalTableByID(job.TableID)
442446
if !ok {
443447
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
444-
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
448+
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
445449
if !discard {
446450
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
447451
}
@@ -452,8 +456,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
452456
zap.String("oldSchemaName", oldTable.TableName.Schema))
453457
// since we can find the old table, we must can find the old schema.
454458
// 2. If we can find the preTableInfo, we filter it by the old table name.
455-
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
456-
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
459+
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.StartTS)
460+
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
457461
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
458462
if skipByOldTableName && !skipByNewTableName {
459463
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
@@ -468,7 +472,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
468472
if job.BinlogInfo.TableInfo != nil {
469473
job.TableName = job.BinlogInfo.TableInfo.Name.O
470474
}
471-
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
475+
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS)
472476
}
473477

474478
if skip {
@@ -485,7 +489,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
485489
zap.String("table", job.BinlogInfo.TableInfo.Name.O),
486490
zap.String("job", job.String()),
487491
zap.Error(err))
488-
return true, errors.Trace(err)
492+
return true, cerror.WrapError(cerror.ErrHandleDDLFailed,
493+
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
489494
}
490495

491496
p.setResolvedTs(job.BinlogInfo.FinishedTS)

cdc/puller/ddl_puller_test.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ func TestHandleJob(t *testing.T) {
445445
job.StartTS = 1
446446
skip, err = ddlJobPullerImpl.handleJob(job)
447447
require.NoError(t, err)
448-
require.False(t, skip)
448+
require.True(t, skip)
449449

450450
job = helper.DDL2Job("create table test1.t2(id int primary key)")
451451
skip, err = ddlJobPullerImpl.handleJob(job)
@@ -891,3 +891,51 @@ func TestCcheckIneligibleTableDDL(t *testing.T) {
891891
require.NoError(t, err)
892892
require.False(t, skip)
893893
}
894+
895+
func TestHandleExchangeTableName(t *testing.T) {
896+
startTs := uint64(10)
897+
mockPuller := newMockPuller(t, startTs)
898+
ddlJobPuller, helper := newMockDDLJobPuller(t, mockPuller, true)
899+
defer helper.Close()
900+
901+
ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl)
902+
ddlJobPullerImpl.setResolvedTs(startTs)
903+
904+
cfg := config.GetDefaultReplicaConfig()
905+
f, err := filter.NewFilter(cfg, "")
906+
require.NoError(t, err)
907+
ddlJobPullerImpl.filter = f
908+
909+
ddl := helper.DDL2Job("CREATE DATABASE test1;")
910+
skip, err := ddlJobPullerImpl.handleJob(ddl)
911+
require.NoError(t, err)
912+
require.False(t, skip)
913+
914+
helper.Tk().MustExec("Use test1;")
915+
916+
ddl = helper.DDL2Job("CREATE TABLE a (id INT PRIMARY KEY);")
917+
skip, err = ddlJobPullerImpl.handleJob(ddl)
918+
require.NoError(t, err)
919+
require.False(t, skip)
920+
921+
ddl = helper.DDL2Job("CREATE TABLE b (id INT PRIMARY KEY);")
922+
skip, err = ddlJobPullerImpl.handleJob(ddl)
923+
require.NoError(t, err)
924+
require.False(t, skip)
925+
926+
ddl = helper.DDL2Job("rename table a to c, b to a, c to b")
927+
_, err = ddlJobPullerImpl.handleJob(ddl)
928+
// Should return an error
929+
require.Error(t, err)
930+
require.Contains(t, err.Error(), "ignore-txn-start-ts")
931+
932+
// Add ddl.StartTs to ignore-txn-start-ts, expect no error.
933+
cfg.Filter.IgnoreTxnStartTs = []uint64{ddl.StartTS}
934+
adjustFilter, err := filter.NewFilter(cfg, "")
935+
require.NoError(t, err)
936+
ddlJobPullerImpl.filter = adjustFilter
937+
938+
skip, err = ddlJobPullerImpl.handleJob(ddl)
939+
require.NoError(t, err)
940+
require.True(t, skip)
941+
}

cmd/filter-helper/main.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,7 @@ func runFilter(cmd *cobra.Command, args []string) {
8181
fmt.Printf("Table: %s, Not matched filter rule\n", table)
8282
case "ddl":
8383
ddlType := timodel.ActionCreateTable
84-
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
85-
if err != nil {
86-
fmt.Printf("filter ddl error: %s, error: %v\n", ddl, err)
87-
return
88-
}
84+
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1], 0)
8985
if discard {
9086
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
9187
return

pkg/filter/filter.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type Filter interface {
8080
// ShouldDiscardDDL returns true if this DDL should be discarded.
8181
// If a ddl is discarded, it will neither be applied to cdc's schema storage
8282
// nor sent to downstream.
83-
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool
83+
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool
8484
// ShouldIgnoreTable returns true if the table should be ignored.
8585
ShouldIgnoreTable(schema, table string) bool
8686
// ShouldIgnoreSchema returns true if the schema should be ignored.
@@ -161,11 +161,15 @@ func (f *filter) ShouldIgnoreDMLEvent(
161161
// 0. By allow list.
162162
// 1. By schema name.
163163
// 2. By table name.
164-
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool {
164+
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool {
165165
if !isAllowedDDL(ddlType) {
166166
return true
167167
}
168168

169+
if f.shouldIgnoreStartTs(startTs) {
170+
return true
171+
}
172+
169173
if IsSchemaDDL(ddlType) {
170174
return f.ShouldIgnoreSchema(schema)
171175
}

pkg/filter/filter_test.go

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,12 @@ func TestShouldDiscardDDL(t *testing.T) {
285285
table string
286286
query string
287287
ddlType timodel.ActionType
288+
startTs uint64
288289
ignore bool
289290
}
290291
rules []string
291292
eventFilters []*config.EventFilterRule
293+
ignoredTs []uint64
292294
}{
293295
{
294296
// Discard by not allowed DDL type cases.
@@ -297,11 +299,12 @@ func TestShouldDiscardDDL(t *testing.T) {
297299
table string
298300
query string
299301
ddlType timodel.ActionType
302+
startTs uint64
300303
ignore bool
301304
}{
302-
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
303-
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
304-
{"test", "", "create database test", timodel.ActionCreateSequence, true},
305+
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
306+
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
307+
{"test", "", "create database test", timodel.ActionCreateSequence, 0, true},
305308
},
306309
rules: []string{"*.*"},
307310
},
@@ -312,18 +315,19 @@ func TestShouldDiscardDDL(t *testing.T) {
312315
table string
313316
query string
314317
ddlType timodel.ActionType
318+
startTs uint64
315319
ignore bool
316320
}{
317-
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
318-
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
321+
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
322+
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
319323
{
320324
"sns", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
321-
timodel.ActionModifySchemaCharsetAndCollate, false,
325+
timodel.ActionModifySchemaCharsetAndCollate, 0, false,
322326
},
323-
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
324-
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
325-
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
326-
{"test", "", "create database test", timodel.ActionCreateSchema, true},
327+
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
328+
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
329+
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
330+
{"test", "", "create database test", timodel.ActionCreateSchema, 0, true},
327331
},
328332
rules: []string{"sns.*", "ecom.*", "!sns.log", "!ecom.test"},
329333
},
@@ -334,32 +338,51 @@ func TestShouldDiscardDDL(t *testing.T) {
334338
table string
335339
query string
336340
ddlType timodel.ActionType
341+
startTs uint64
337342
ignore bool
338343
}{
339-
{"schema", "C1", "create database test", timodel.ActionCreateSchema, false},
340-
{"test", "", "drop database test1", timodel.ActionDropSchema, true},
344+
{"schema", "C1", "create database test", timodel.ActionCreateSchema, 0, false},
345+
{"test", "", "drop database test1", timodel.ActionDropSchema, 0, true},
341346
{
342347
"dbname", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
343-
timodel.ActionModifySchemaCharsetAndCollate, true,
348+
timodel.ActionModifySchemaCharsetAndCollate, 0, true,
344349
},
345-
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
346-
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
347-
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
350+
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
351+
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
352+
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
348353
},
349354
rules: []string{"schema.C1"},
350355
},
356+
{
357+
// Discard by startTs cases.
358+
cases: []struct {
359+
schema string
360+
table string
361+
query string
362+
ddlType timodel.ActionType
363+
startTs uint64
364+
ignore bool
365+
}{
366+
{"sns", "", "create database test", timodel.ActionCreateSchema, 1, false},
367+
{"sns", "", "drop database test", timodel.ActionDropSchema, 2, true},
368+
{"test", "", "create database test", timodel.ActionCreateSequence, 3, true},
369+
},
370+
rules: []string{"*.*"},
371+
ignoredTs: []uint64{2, 3},
372+
},
351373
}
352374

353375
for _, ftc := range testCases {
354376
filter, err := NewFilter(&config.ReplicaConfig{
355377
Filter: &config.FilterConfig{
356-
Rules: ftc.rules,
357-
EventFilters: ftc.eventFilters,
378+
Rules: ftc.rules,
379+
EventFilters: ftc.eventFilters,
380+
IgnoreTxnStartTs: ftc.ignoredTs,
358381
},
359382
}, "")
360383
require.Nil(t, err)
361384
for _, tc := range ftc.cases {
362-
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table)
385+
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table, tc.startTs)
363386
require.Equal(t, tc.ignore, ignore, "%#v", tc)
364387
}
365388
}

0 commit comments

Comments
 (0)