Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS) {
return true, nil
}
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
Expand Down Expand Up @@ -403,7 +403,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {

for index, tableInfo := range multiTableInfos {
// judge each table whether need to be skip
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O) {
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O, job.StartTS) {
continue
}
newMultiTableInfos = append(newMultiTableInfos, multiTableInfos[index])
Expand All @@ -418,7 +418,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
if !discard {
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
Expand All @@ -434,8 +434,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
}
// since we can find the old table, it must be able to find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.StartTS)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
if err != nil {
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
Expand All @@ -460,7 +460,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS)
}

if skip {
Expand Down Expand Up @@ -569,15 +569,15 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
if !ok {
shouldDiscardOldTable = true
} else {
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O)
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O, job.StartTS)
}

newSchemaName, ok := snap.SchemaByID(info.NewSchemaID)
if !ok {
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O)
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O, job.StartTS)
}

if shouldDiscardOldTable && shouldDiscardNewTable {
Expand Down
47 changes: 47 additions & 0 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,3 +820,50 @@ func TestCheckIneligibleTableDDL(t *testing.T) {
require.NoError(t, err)
require.False(t, skip)
}

func TestHandleExchangeTableName(t *testing.T) {
ddlJobPuller, helper := newMockDDLJobPuller(t, true)
defer helper.Close()

startTs := uint64(10)
ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl)
ddlJobPullerImpl.setResolvedTs(startTs)

cfg := config.GetDefaultReplicaConfig()
f, err := filter.NewFilter(cfg, "")
require.NoError(t, err)
ddlJobPullerImpl.filter = f

ddl := helper.DDL2Job("CREATE DATABASE test1;")
skip, err := ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

helper.Tk().MustExec("Use test1;")

ddl = helper.DDL2Job("CREATE TABLE a (id INT PRIMARY KEY);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

ddl = helper.DDL2Job("CREATE TABLE b (id INT PRIMARY KEY);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

ddl = helper.DDL2Job("rename table a to c, b to a, c to b")
_, err = ddlJobPullerImpl.handleJob(ddl)
// Should return an error
require.Error(t, err)
require.Contains(t, err.Error(), "ignore-txn-start-ts")

// Add ddl.StartTs to ignore-txn-start-ts, expect no error.
cfg.Filter.IgnoreTxnStartTs = []uint64{ddl.StartTS}
adjustFilter, err := filter.NewFilter(cfg, "")
require.NoError(t, err)
ddlJobPullerImpl.filter = adjustFilter

skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.True(t, skip)
}
2 changes: 1 addition & 1 deletion cmd/filter-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func runFilter(cmd *cobra.Command, args []string) {
fmt.Printf("Table: %s, Not matched filter rule\n", table)
case "ddl":
ddlType := timodel.ActionCreateTable
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1], 0)
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
return
Expand Down
8 changes: 6 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type Filter interface {
// ShouldDiscardDDL returns true if this DDL should be discarded.
// If a ddl is discarded, it will neither be applied to cdc's schema storage
// nor sent to downstream.
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool
// ShouldIgnoreTable returns true if the table should be ignored.
ShouldIgnoreTable(schema, table string) bool
// ShouldIgnoreSchema returns true if the schema should be ignored.
Expand Down Expand Up @@ -185,11 +185,15 @@ func (f *filter) ShouldIgnoreDMLEvent(
// 0. By allow list.
// 1. By schema name.
// 2. By table name.
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool {
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool {
if !isAllowedDDL(ddlType) {
return true
}

if f.shouldIgnoreStartTs(startTs) {
return true
}

if IsSchemaDDL(ddlType) {
return f.ShouldIgnoreSchema(schema)
}
Expand Down
61 changes: 42 additions & 19 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,12 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}
rules []string
eventFilters []*config.EventFilterRule
ignoredTs []uint64
}{
{
// Discard by not allowed DDL type cases.
Expand All @@ -304,11 +306,12 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
{"test", "", "create database test", timodel.ActionCreateSequence, true},
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
{"test", "", "create database test", timodel.ActionCreateSequence, 0, true},
},
rules: []string{"*.*"},
},
Expand All @@ -319,18 +322,19 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
{
"sns", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
timodel.ActionModifySchemaCharsetAndCollate, false,
timodel.ActionModifySchemaCharsetAndCollate, 0, false,
},
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
{"test", "", "create database test", timodel.ActionCreateSchema, true},
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"test", "", "create database test", timodel.ActionCreateSchema, 0, true},
},
rules: []string{"sns.*", "ecom.*", "!sns.log", "!ecom.test"},
},
Expand All @@ -341,32 +345,51 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"schema", "C1", "create database test", timodel.ActionCreateSchema, false},
{"test", "", "drop database test1", timodel.ActionDropSchema, true},
{"schema", "C1", "create database test", timodel.ActionCreateSchema, 0, false},
{"test", "", "drop database test1", timodel.ActionDropSchema, 0, true},
{
"dbname", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
timodel.ActionModifySchemaCharsetAndCollate, true,
timodel.ActionModifySchemaCharsetAndCollate, 0, true,
},
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
},
rules: []string{"schema.C1"},
},
{
// Discard by startTs cases.
cases: []struct {
schema string
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"sns", "", "create database test", timodel.ActionCreateSchema, 1, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, 2, true},
{"test", "", "create database test", timodel.ActionCreateSequence, 3, true},
},
rules: []string{"*.*"},
ignoredTs: []uint64{2, 3},
},
}

for _, ftc := range testCases {
filter, err := NewFilter(&config.ReplicaConfig{
Filter: &config.FilterConfig{
Rules: ftc.rules,
EventFilters: ftc.eventFilters,
Rules: ftc.rules,
EventFilters: ftc.eventFilters,
IgnoreTxnStartTs: ftc.ignoredTs,
},
}, "")
require.Nil(t, err)
for _, tc := range ftc.cases {
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table)
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table, tc.startTs)
require.Equal(t, tc.ignore, ignore, "%#v", tc)
}
}
Expand Down
Loading