Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS) {
return true, nil
}
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
Expand All @@ -378,11 +378,50 @@
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
<<<<<<< HEAD

Check failure on line 381 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected case or default or }
=======
case timodel.ActionCreateTables:

Check failure on line 383 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected case, expected :
querys, err := ddl.SplitQueries(job.Query)
if err != nil {
return false, errors.Trace(err)
}
// we only use multiTableInfos and Querys when we generate job event
// So if some table should be discard, we just need to delete the info from multiTableInfos and Querys
if len(querys) != len(job.BinlogInfo.MultipleTableInfos) {
log.Error("the number of queries in `Job.Query` is not equal to "+
"the number of `TableInfo` in `Job.BinlogInfo.MultipleTableInfos`",
zap.Int("numQueries", len(querys)),
zap.Int("numTableInfos", len(job.BinlogInfo.MultipleTableInfos)),
zap.String("Job.Query", job.Query),
zap.Any("Job.BinlogInfo.MultipleTableInfos", job.BinlogInfo.MultipleTableInfos),
zap.Error(cerror.ErrTiDBUnexpectedJobMeta.GenWithStackByArgs()))
return false, cerror.ErrTiDBUnexpectedJobMeta.GenWithStackByArgs()
}

var newMultiTableInfos []*timodel.TableInfo
var newQuerys []string

multiTableInfos := job.BinlogInfo.MultipleTableInfos

for index, tableInfo := range multiTableInfos {
// judge each table whether need to be skip
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O, job.StartTS) {
continue
}
newMultiTableInfos = append(newMultiTableInfos, multiTableInfos[index])
newQuerys = append(newQuerys, querys[index])
}

skip = len(newMultiTableInfos) == 0

job.BinlogInfo.MultipleTableInfos = newMultiTableInfos
job.Query = strings.Join(newQuerys, "")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Joining queries without a separator might make the reconstructed query string hard to read in logs and error messages. Consider joining with a semicolon to improve readability.

job.Query = strings.Join(newQuerys, ";")

>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))

Check failure on line 419 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 419 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expected case or default or }
Comment on lines +381 to +419

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block contains unresolved merge conflict markers. Please resolve them before merging.

case timodel.ActionRenameTable:
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
if !discard {
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
Expand All @@ -398,8 +437,8 @@
}
// since we can find the old table, it must be able to find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.StartTS)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
if err != nil {
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
Expand All @@ -424,7 +463,7 @@
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS)
}

if skip {
Expand Down Expand Up @@ -544,15 +583,23 @@
if !ok {
shouldDiscardOldTable = true
} else {
<<<<<<< HEAD

Check failure on line 586 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected }
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, oldTable.Name.O)
=======

Check failure on line 588 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expected }
shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O, job.StartTS)
>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))

Check failure on line 590 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 590 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expected }
Comment on lines +586 to +590

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block contains unresolved merge conflict markers. Please resolve them. Additionally, the variable info used in one of the branches is not defined in this scope, which will cause a compilation error. It seems oldSchemaNames[i] should be used instead.

}

newSchemaName, ok := snap.SchemaByID(newSchemaIDs[i])
if !ok {
// the new table name does not hit the filter rule, so we should discard the table.
shouldDiscardNewTable = true
} else {
<<<<<<< HEAD
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, newTableNames[i].O)
=======
shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O, job.StartTS)
>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))

Check failure on line 602 in cdc/puller/ddl_puller.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'
Comment on lines +598 to +602

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block contains unresolved merge conflict markers. Please resolve them. Additionally, the variable info used in one of the branches is not defined in this scope, which will cause a compilation error. It seems newTableNames[i] should be used instead.

}

if shouldDiscardOldTable && shouldDiscardNewTable {
Expand Down
49 changes: 48 additions & 1 deletion cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestHandleJob(t *testing.T) {
job.StartTS = 1
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.False(t, skip)
require.True(t, skip)

job = helper.DDL2Job("create table test1.t2(id int primary key)")
skip, err = ddlJobPullerImpl.handleJob(job)
Expand Down Expand Up @@ -836,3 +836,50 @@ func TestCheckIneligibleTableDDL(t *testing.T) {
require.NoError(t, err)
require.False(t, skip)
}

func TestHandleExchangeTableName(t *testing.T) {
ddlJobPuller, helper := newMockDDLJobPuller(t, true)
defer helper.Close()

startTs := uint64(10)
ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl)
ddlJobPullerImpl.setResolvedTs(startTs)

cfg := config.GetDefaultReplicaConfig()
f, err := filter.NewFilter(cfg, "")
require.NoError(t, err)
ddlJobPullerImpl.filter = f

ddl := helper.DDL2Job("CREATE DATABASE test1;")
skip, err := ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

helper.Tk().MustExec("Use test1;")

ddl = helper.DDL2Job("CREATE TABLE a (id INT PRIMARY KEY);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

ddl = helper.DDL2Job("CREATE TABLE b (id INT PRIMARY KEY);")
skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.False(t, skip)

ddl = helper.DDL2Job("rename table a to c, b to a, c to b")
_, err = ddlJobPullerImpl.handleJob(ddl)
// Should return an error
require.Error(t, err)
require.Contains(t, err.Error(), "ignore-txn-start-ts")

// Add ddl.StartTs to ignore-txn-start-ts, expect no error.
cfg.Filter.IgnoreTxnStartTs = []uint64{ddl.StartTS}
adjustFilter, err := filter.NewFilter(cfg, "")
require.NoError(t, err)
ddlJobPullerImpl.filter = adjustFilter

skip, err = ddlJobPullerImpl.handleJob(ddl)
require.NoError(t, err)
require.True(t, skip)
}
2 changes: 1 addition & 1 deletion cmd/filter-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func runFilter(cmd *cobra.Command, args []string) {
fmt.Printf("Table: %s, Not matched filter rule\n", table)
case "ddl":
ddlType := timodel.ActionCreateTable
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1], 0)
if discard {
fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
return
Expand Down
8 changes: 6 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type Filter interface {
// ShouldDiscardDDL returns true if this DDL should be discarded.
// If a ddl is discarded, it will neither be applied to cdc's schema storage
// nor sent to downstream.
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool
ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool
// ShouldIgnoreTable returns true if the table should be ignored.
ShouldIgnoreTable(schema, table string) bool
// ShouldIgnoreSchema returns true if the schema should be ignored.
Expand Down Expand Up @@ -180,11 +180,15 @@ func (f *filter) ShouldIgnoreDMLEvent(
// 0. By allow list.
// 1. By schema name.
// 2. By table name.
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool {
func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, startTs uint64) bool {
if !isAllowedDDL(ddlType) {
return true
}

if f.shouldIgnoreStartTs(startTs) {
return true
}

if IsSchemaDDL(ddlType) {
return f.ShouldIgnoreSchema(schema)
}
Expand Down
61 changes: 42 additions & 19 deletions pkg/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,12 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}
rules []string
eventFilters []*config.EventFilterRule
ignoredTs []uint64
}{
{
// Discard by not allowed DDL type cases.
Expand All @@ -302,11 +304,12 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
{"test", "", "create database test", timodel.ActionCreateSequence, true},
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
{"test", "", "create database test", timodel.ActionCreateSequence, 0, true},
},
rules: []string{"*.*"},
},
Expand All @@ -317,18 +320,19 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"sns", "", "create database test", timodel.ActionCreateSchema, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, false},
{"sns", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, 0, false},
{
"sns", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
timodel.ActionModifySchemaCharsetAndCollate, false,
timodel.ActionModifySchemaCharsetAndCollate, 0, false,
},
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
{"ecom", "", "create database test", timodel.ActionCreateSchema, false},
{"test", "", "create database test", timodel.ActionCreateSchema, true},
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"ecom", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
{"ecom", "", "create database test", timodel.ActionCreateSchema, 0, false},
{"test", "", "create database test", timodel.ActionCreateSchema, 0, true},
},
rules: []string{"sns.*", "ecom.*", "!sns.log", "!ecom.test"},
},
Expand All @@ -339,32 +343,51 @@ func TestShouldDiscardDDL(t *testing.T) {
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"schema", "C1", "create database test", timodel.ActionCreateSchema, false},
{"test", "", "drop database test1", timodel.ActionDropSchema, true},
{"schema", "C1", "create database test", timodel.ActionCreateSchema, 0, false},
{"test", "", "drop database test1", timodel.ActionDropSchema, 0, true},
{
"dbname", "", "ALTER DATABASE dbname CHARACTER SET utf8 COLLATE utf8_general_ci",
timodel.ActionModifySchemaCharsetAndCollate, true,
timodel.ActionModifySchemaCharsetAndCollate, 0, true,
},
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, false},
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, true},
{"test", "aa", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
{"schema", "C1", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, false},
{"schema", "", "create table test.t1(a int primary key)", timodel.ActionCreateTable, 0, true},
},
rules: []string{"schema.C1"},
},
{
// Discard by startTs cases.
cases: []struct {
schema string
table string
query string
ddlType timodel.ActionType
startTs uint64
ignore bool
}{
{"sns", "", "create database test", timodel.ActionCreateSchema, 1, false},
{"sns", "", "drop database test", timodel.ActionDropSchema, 2, true},
{"test", "", "create database test", timodel.ActionCreateSequence, 3, true},
},
rules: []string{"*.*"},
ignoredTs: []uint64{2, 3},
},
}

for _, ftc := range testCases {
filter, err := NewFilter(&config.ReplicaConfig{
Filter: &config.FilterConfig{
Rules: ftc.rules,
EventFilters: ftc.eventFilters,
Rules: ftc.rules,
EventFilters: ftc.eventFilters,
IgnoreTxnStartTs: ftc.ignoredTs,
},
}, "")
require.Nil(t, err)
for _, tc := range ftc.cases {
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table)
ignore := filter.ShouldDiscardDDL(tc.ddlType, tc.schema, tc.table, tc.startTs)
require.Equal(t, tc.ignore, ignore, "%#v", tc)
}
}
Expand Down
Loading