Skip to content

Commit db43be2

Browse files
authored
puller: Support discarding unsupported DDL by setting ignore-txn-start-ts in filter. (#12287)
close #12286
1 parent 654dac2 commit db43be2

File tree

5 files changed

+105
-31
lines changed

5 files changed

+105
-31
lines changed

cdc/puller/ddl_puller.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
355355
zap.Uint64("startTs", job.StartTS),
356356
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
357357
zap.Error(err))
358-
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
358+
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS) {
359359
return true, nil
360360
}
361361
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
@@ -403,7 +403,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
403403

404404
for index, tableInfo := range multiTableInfos {
405405
// judge each table whether need to be skip
406-
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O) {
406+
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O, job.StartTS) {
407407
continue
408408
}
409409
newMultiTableInfos = append(newMultiTableInfos, multiTableInfos[index])
@@ -418,7 +418,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
418418
oldTable, ok := snap.PhysicalTableByID(job.TableID)
419419
if !ok {
420420
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
421-
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
421+
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
422422
if !discard {
423423
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
424424
}
@@ -434,8 +434,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
434434
}
435435
// since we can find the old table, it must be able to find the old schema.
436436
// 2. If we can find the preTableInfo, we filter it by the old table name.
437-
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
438-
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
437+
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.StartTS)
438+
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
439439
if err != nil {
440440
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
441441
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
@@ -460,7 +460,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
460460
if job.BinlogInfo.TableInfo != nil {
461461
job.TableName = job.BinlogInfo.TableInfo.Name.O
462462
}
463-
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
463+
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS)
464464
}
465465

466466
if skip {
@@ -569,15 +569,15 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
569569
if !ok {
570570
shouldDiscardOldTable = true
571571
} else {
572-
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O)
572+
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O, job.StartTS)
573573
}
574574

575575
newSchemaName, ok := snap.SchemaByID(info.NewSchemaID)
576576
if !ok {
577577
// the new table name does not hit the filter rule, so we should discard the table.
578578
shouldDiscardNewTable = true
579579
} else {
580-
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O)
580+
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O, job.StartTS)
581581
}
582582

583583
if shouldDiscardOldTable && shouldDiscardNewTable {

cdc/puller/ddl_puller_test.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ func TestHandleJob(t *testing.T) {
412412
job.StartTS = 1
413413
skip, err = ddlJobPullerImpl.handleJob(job)
414414
require.NoError(t, err)
415-
require.False(t, skip)
415+
require.True(t, skip)
416416

417417
job = helper.DDL2Job("create table test1.t2(id int primary key)")
418418
skip, err = ddlJobPullerImpl.handleJob(job)
@@ -820,3 +820,50 @@ func TestCheckIneligibleTableDDL(t *testing.T) {
820820
require.NoError(t, err)
821821
require.False(t, skip)
822822
}
823+
824+
func TestHandleExchangeTableName(t *testing.T) {
825+
ddlJobPuller, helper := newMockDDLJobPuller(t, true)
826+
defer helper.Close()
827+
828+
startTs := uint64(10)
829+
ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl)
830+
ddlJobPullerImpl.setResolvedTs(startTs)
831+
832+
cfg := config.GetDefaultReplicaConfig()
833+
f, err := filter.NewFilter(cfg, "")
834+
require.NoError(t, err)
835+
ddlJobPullerImpl.filter = f
836+
837+
ddl := helper.DDL2Job("CREATE DATABASE test1;")
838+
skip, err := ddlJobPullerImpl.handleJob(ddl)
839+
require.NoError(t, err)
840+
require.False(t, skip)
841+
842+
helper.Tk().MustExec("Use test1;")
843+
844+
ddl = helper.DDL2Job("CREATE TABLE a (id INT PRIMARY KEY);")
845+
skip, err = ddlJobPullerImpl.handleJob(ddl)
846+
require.NoError(t, err)
847+
require.False(t, skip)
848+
849+
ddl = helper.DDL2Job("CREATE TABLE b (id INT PRIMARY KEY);")
850+
skip, err = ddlJobPullerImpl.handleJob(ddl)
851+
require.NoError(t, err)
852+
require.False(t, skip)
853+
854+
ddl = helper.DDL2Job("rename table a to c, b to a, c to b")
855+
_, err = ddlJobPullerImpl.handleJob(ddl)
856+
// Should return an error
857+
require.Error(t, err)
858+
require.Contains(t, err.Error(), "ignore-txn-start-ts")
859+
860+
// Add ddl.StartTs to ignore-txn-start-ts, expect no error.
861+
cfg.Filter.IgnoreTxnStartTs = []uint64{ddl.StartTS}
862+
adjustFilter, err := filter.NewFilter(cfg, "")
863+
require.NoError(t, err)
864+
ddlJobPullerImpl.filter = adjustFilter
865+
866+
skip, err = ddlJobPullerImpl.handleJob(ddl)
867+
require.NoError(t, err)
868+
require.True(t, skip)
869+
}

cmd/filter-helper/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func runFilter(cmd *cobra.Command, args []string) {
8181
fmt.Printf("Table: %s, Not matched filter rule\n", table)
8282
case "ddl":
8383
ddlType := timodel.ActionCreateTable
84-
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
84+
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1], 0)
8585
if discard {
8686
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
8787
return

pkg/filter/filter.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ type Filter interface {
104104
// ShouldDiscardDDL returns true if this DDL should be discarded.
105105
// If a ddl is discarded, it will neither be applied to cdc's schema storage
106106
// nor sent to downstream.
107-
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool
107+
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool
108108
// ShouldIgnoreTable returns true if the table should be ignored.
109109
ShouldIgnoreTable(schema, table string) bool
110110
// ShouldIgnoreSchema returns true if the schema should be ignored.
@@ -185,11 +185,15 @@ func (f *filter) ShouldIgnoreDMLEvent(
185185
// 0. By allow list.
186186
// 1. By schema name.
187187
// 2. By table name.
188-
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool {
188+
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool {
189189
if !isAllowedDDL(ddlType) {
190190
return true
191191
}
192192

193+
if f.shouldIgnoreStartTs(startTs) {
194+
return true
195+
}
196+
193197
if IsSchemaDDL(ddlType) {
194198
return f.ShouldIgnoreSchema(schema)
195199
}

pkg/filter/filter_test.go

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,12 @@ func TestShouldDiscardDDL(t *testing.T) {
292292
table string
293293
query string
294294
ddlType timodel.ActionType
295+
startTs uint64
295296
ignore bool
296297
}
297298
rules []string
298299
eventFilters []*config.EventFilterRule
300+
ignoredTs []uint64
299301
}{
300302
{
301303
// Discard by not allowed DDL type cases.
@@ -304,11 +306,12 @@ func TestShouldDiscardDDL(t *testing.T) {
304306
table string
305307
query string
306308
ddlType timodel.ActionType
309+
startTs uint64
307310
ignore bool
308311
}{
309-
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
310-
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
311-
{"test", "", "create database test", timodel.ActionCreateSequence, true},
312+
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
313+
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
314+
{"test", "", "create database test", timodel.ActionCreateSequence, 0, true},
312315
},
313316
rules: []string{"*.*"},
314317
},
@@ -319,18 +322,19 @@ func TestShouldDiscardDDL(t *testing.T) {
319322
table string
320323
query string
321324
ddlType timodel.ActionType
325+
startTs uint64
322326
ignore bool
323327
}{
324-
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
325-
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
328+
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
329+
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
326330
{
327331
"sns", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
328-
timodel.ActionModifySchemaCharsetAndCollate, false,
332+
timodel.ActionModifySchemaCharsetAndCollate, 0, false,
329333
},
330-
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
331-
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
332-
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
333-
{"test", "", "create database test", timodel.ActionCreateSchema, true},
334+
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
335+
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
336+
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
337+
{"test", "", "create database test", timodel.ActionCreateSchema, 0, true},
334338
},
335339
rules: []string{"sns.*", "ecom.*", "!sns.log", "!ecom.test"},
336340
},
@@ -341,32 +345,51 @@ func TestShouldDiscardDDL(t *testing.T) {
341345
table string
342346
query string
343347
ddlType timodel.ActionType
348+
startTs uint64
344349
ignore bool
345350
}{
346-
{"schema", "C1", "create database test", timodel.ActionCreateSchema, false},
347-
{"test", "", "drop database test1", timodel.ActionDropSchema, true},
351+
{"schema", "C1", "create database test", timodel.ActionCreateSchema, 0, false},
352+
{"test", "", "drop database test1", timodel.ActionDropSchema, 0, true},
348353
{
349354
"dbname", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
350-
timodel.ActionModifySchemaCharsetAndCollate, true,
355+
timodel.ActionModifySchemaCharsetAndCollate, 0, true,
351356
},
352-
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
353-
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
354-
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
357+
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
358+
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
359+
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
355360
},
356361
rules: []string{"schema.C1"},
357362
},
363+
{
364+
// Discard by startTs cases.
365+
cases: []struct {
366+
schema string
367+
table string
368+
query string
369+
ddlType timodel.ActionType
370+
startTs uint64
371+
ignore bool
372+
}{
373+
{"sns", "", "create database test", timodel.ActionCreateSchema, 1, false},
374+
{"sns", "", "drop database test", timodel.ActionDropSchema, 2, true},
375+
{"test", "", "create database test", timodel.ActionCreateSequence, 3, true},
376+
},
377+
rules: []string{"*.*"},
378+
ignoredTs: []uint64{2, 3},
379+
},
358380
}
359381

360382
for _, ftc := range testCases {
361383
filter, err := NewFilter(&config.ReplicaConfig{
362384
Filter: &config.FilterConfig{
363-
Rules: ftc.rules,
364-
EventFilters: ftc.eventFilters,
385+
Rules: ftc.rules,
386+
EventFilters: ftc.eventFilters,
387+
IgnoreTxnStartTs: ftc.ignoredTs,
365388
},
366389
}, "")
367390
require.Nil(t, err)
368391
for _, tc := range ftc.cases {
369-
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table)
392+
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table, tc.startTs)
370393
require.Equal(t, tc.ignore, ignore, "%#v", tc)
371394
}
372395
}

0 commit comments

Comments
 (0)