Merged
Commits (34)
c65a41c
filter: refactor filter and add event filter functions
wlwilliamx Aug 4, 2025
d7b55c2
add unit tests for filter
wlwilliamx Aug 6, 2025
77bbc27
add unit tests for sqlEventFilter
wlwilliamx Aug 6, 2025
46d83fb
fix the missing error return in NewBinlogEvent
wlwilliamx Aug 6, 2025
64b2ad5
modify comments
wlwilliamx Aug 6, 2025
e07602a
fix wrong ignore when there is no expr config
wlwilliamx Aug 6, 2025
9f15914
add unit tests for dmlExprFilter
wlwilliamx Aug 6, 2025
ce9c25e
modify filter-helper to the new filter in ticdc repo
wlwilliamx Aug 6, 2025
a93d58c
add event_filter ci
wlwilliamx Aug 6, 2025
427ab3f
Merge remote-tracking branch 'upstream/master' into feat/add-event-fi…
wlwilliamx Aug 6, 2025
278a010
merge upstream/master
wlwilliamx Aug 6, 2025
03f1c3c
fix typo when merge upstream/master
wlwilliamx Aug 6, 2025
d759f3b
modify by make check
wlwilliamx Aug 6, 2025
a82afbe
remove a todo comment
wlwilliamx Aug 6, 2025
24db0e6
remove some useless code
wlwilliamx Aug 6, 2025
aee5ec6
fix TestDMLEventBasicEncodeAndDecode panic
wlwilliamx Aug 6, 2025
1fb742a
fix the filter logic in buildDDLEventCommon to pass TestApplyDDLJobs
wlwilliamx Aug 7, 2025
73535f1
Merge remote-tracking branch 'upstream/master' into feat/add-event-fi…
wlwilliamx Aug 19, 2025
18cfa68
resolve conflicts
wlwilliamx Aug 19, 2025
b89a0d6
Update pkg/filter/filter.go
wlwilliamx Aug 19, 2025
9f18a90
rename ti to tableInfo
wlwilliamx Aug 19, 2025
208d3db
Merge remote-tracking branch 'origin/feat/add-event-filter' into feat…
wlwilliamx Aug 19, 2025
409096d
add a comment
wlwilliamx Aug 19, 2025
176fb7b
larger generateBatchSQL performance requirement to ensure UT pass
wlwilliamx Aug 19, 2025
dfe93f0
remove a useless code
wlwilliamx Aug 19, 2025
379a245
add info level comments when ignored by filter
wlwilliamx Aug 19, 2025
af2eea4
fix the new truncated table is not synced
wlwilliamx Aug 20, 2025
206538b
ci: add a sleep to ensure check pass
wlwilliamx Aug 20, 2025
4bc2885
add tiflow#11956 to improve event_filter ci
wlwilliamx Aug 20, 2025
aacec70
fix some corner cases that the DDL should be ignored but DML should not
wlwilliamx Aug 21, 2025
8e23ca2
ci: add some corner cases that the DDL should be ignored but DML shou…
wlwilliamx Aug 21, 2025
e12bd8b
ci: add some check_table_exists in event_filter
wlwilliamx Aug 21, 2025
27c8262
Merge remote-tracking branch 'upstream/master' into feat/add-event-fi…
wlwilliamx Aug 21, 2025
6aca939
rename a variable
wlwilliamx Aug 21, 2025
33 changes: 5 additions & 28 deletions cmd/filter-helper/main.go
@@ -17,11 +17,9 @@ import (
 	"fmt"
 	"strings"

-	"github.com/pingcap/ticdc/cmd/util"
+	"github.com/pingcap/ticdc/pkg/config"
+	"github.com/pingcap/ticdc/pkg/filter"
 	timodel "github.com/pingcap/tidb/pkg/meta/model"
-	"github.com/pingcap/tiflow/cdc/model"
-	"github.com/pingcap/tiflow/pkg/config"
-	"github.com/pingcap/tiflow/pkg/filter"
 	"github.com/spf13/cobra"
 )

@@ -49,13 +47,7 @@ func runFilter(cmd *cobra.Command, args []string) {
 	// fmt.Printf("Filter Rules: %v\n", filterRules)
 	// fmt.Printf("Schema Name: %s\n", schemaName)
 	// fmt.Printf("Table Name: %s\n", tableName)
-	cfg := &config.ReplicaConfig{}
-	err := util.StrictDecodeFile(cfgPath, "cdc filter helper", cfg)
-	if err != nil {
-		fmt.Printf("decode config file error: %v\n", err)
-		return
-	}
-	ft, err := filter.NewFilter(cfg, "")
+	ft, err := filter.NewFilter(config.NewDefaultFilterConfig(), "UTC", false, false)
 	if err != nil {
 		fmt.Printf("filter create error: %v\n", err)
 		return
@@ -73,30 +65,15 @@ func runFilter(cmd *cobra.Command, args []string) {

 	switch target {
 	case "table":
-		matched := !ft.ShouldIgnoreTable(tableAndSchema[0], tableAndSchema[1])
+		matched := !ft.ShouldIgnoreTable(tableAndSchema[0], tableAndSchema[1], nil)
 		if matched {
 			fmt.Printf("Table: %s, Matched filter rule\n", table)
 			return
 		}
 		fmt.Printf("Table: %s, Not matched filter rule\n", table)
 	case "ddl":
 		ddlType := timodel.ActionCreateTable
-		discard := ft.ShouldDiscardDDL(ddlType, tableAndSchema[0], tableAndSchema[1])
-		if discard {
-			fmt.Printf("DDL: %s, should be discard by event filter rule\n", ddl)
-			return
-		}
-		ignored, err := ft.ShouldIgnoreDDLEvent(&model.DDLEvent{
-			StartTs: uint64(0),
-			Query:   ddl,
-			Type:    ddlType,
-			TableInfo: &model.TableInfo{
-				TableName: model.TableName{
-					Schema: tableAndSchema[0],
-					Table:  tableAndSchema[1],
-				},
-			},
-		})
+		ignored, err := ft.ShouldIgnoreDDL(tableAndSchema[0], tableAndSchema[1], ddl, ddlType, nil)
 		if err != nil {
 			fmt.Printf("filter ddl error: %s, error: %v\n", ddl, err)
 			return
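For orientation, here is a minimal sketch of the filter API surface as it appears at the call sites in this file: NewFilter now takes a filter config, a timezone, and two boolean flags; ShouldIgnoreTable gains a table-info argument; ShouldIgnoreDDL replaces ShouldDiscardDDL/ShouldIgnoreDDLEvent. Only those signatures are taken from the diff; the sample inputs and the standalone main are illustrative assumptions, not code from the PR.

// Sketch only: exercises the new filter API exactly as the hunks above call it.
package main

import (
	"fmt"

	"github.com/pingcap/ticdc/pkg/config"
	"github.com/pingcap/ticdc/pkg/filter"
	timodel "github.com/pingcap/tidb/pkg/meta/model"
)

func main() {
	// New signature seen in this diff: filter config, timezone, and two boolean flags
	// (their meaning is not spelled out in the hunk, so the values used there are kept).
	ft, err := filter.NewFilter(config.NewDefaultFilterConfig(), "UTC", false, false)
	if err != nil {
		fmt.Printf("filter create error: %v\n", err)
		return
	}

	// Table check: the third argument is a table info in other call sites; nil is accepted here.
	if !ft.ShouldIgnoreTable("test", "t1", nil) {
		fmt.Println("table test.t1 matches the filter rules")
	}

	// DDL check: schema, table, query text, DDL action type, optional table info.
	ignored, err := ft.ShouldIgnoreDDL("test", "t1", "CREATE TABLE test.t1 (id INT)", timodel.ActionCreateTable, nil)
	if err != nil {
		fmt.Printf("filter ddl error: %v\n", err)
		return
	}
	fmt.Printf("ddl ignored by event filter: %v\n", ignored)
}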
8 changes: 8 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher.go
@@ -201,6 +201,14 @@ func (d *BasicDispatcher) AddDMLEventsToSink(events []*commonEvent.DMLEvent) {
 }

 func (d *BasicDispatcher) AddBlockEventToSink(event commonEvent.BlockEvent) error {
+	if event.GetType() == commonEvent.TypeDDLEvent {
+		ddl := event.(*commonEvent.DDLEvent)
+		if ddl.NotSync {
+			log.Info("ignore DDL by NotSync", zap.Stringer("dispatcher", d.id), zap.Any("ddl", ddl))
+			d.PassBlockEventToSink(event)
+			return nil
+		}
+	}
 	d.tableProgress.Add(event)
 	return d.sink.WriteBlockEvent(event)
 }
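For context on the new NotSync branch: a hypothetical sketch of the producing side, where a DDL that the event filter says should not be replicated is kept in the pipeline but flagged, so the dispatcher above still passes the block event along without writing it to the sink. Only DDLEvent.NotSync and the ShouldIgnoreDDL signature come from this PR; the helper name, parameters, and assumed imports (filter, timodel, commonEvent, errors) are illustrative.

// Hypothetical helper, not code from this PR.
func flagIgnoredDDL(f filter.Filter, schema, table, query string, tp timodel.ActionType, e *commonEvent.DDLEvent) error {
	ignored, err := f.ShouldIgnoreDDL(schema, table, query, tp, nil)
	if err != nil {
		return errors.Trace(err)
	}
	// Keep the event so block-event bookkeeping still happens downstream;
	// AddBlockEventToSink (above) will pass it through instead of writing it.
	e.NotSync = ignored
	return nil
}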
1 change: 1 addition & 0 deletions logservice/schemastore/disk_format.go
@@ -794,6 +794,7 @@ func loadAllPhysicalTablesAtTs(
 			log.Panic("table info not found", zap.Int64("tableID", tableID))
 		}
 		if tableFilter != nil && tableFilter.ShouldIgnoreTable(schemaName, tableInfo.Name, fullTableInfo) {
+			log.Info("ignore table by filter", zap.String("schema", schemaName), zap.String("table", tableInfo.Name), zap.Any("tableInfo", fullTableInfo))
 			continue
 		}
 		splitable := isSplitable(fullTableInfo)
12 changes: 9 additions & 3 deletions logservice/schemastore/persist_storage.go
@@ -411,7 +411,10 @@ func (p *persistentStorage) fetchTableDDLEvents(dispatcherID common.DispatcherID
 	events := make([]commonEvent.DDLEvent, 0, len(allTargetTs))
 	for _, ts := range allTargetTs {
 		rawEvent := readPersistedDDLEvent(storageSnap, ts)
-		ddlEvent, ok := buildDDLEvent(&rawEvent, tableFilter)
+		ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 		if ok {
 			events = append(events, ddlEvent)
 		} else {
@@ -478,7 +481,10 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter
 	p.mu.RUnlock()
 	for _, ts := range allTargetTs {
 		rawEvent := readPersistedDDLEvent(storageSnap, ts)
-		ddlEvent, ok := buildDDLEvent(&rawEvent, tableFilter)
+		ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
 		if ok {
 			events = append(events, ddlEvent)
 		}
@@ -818,7 +824,7 @@ func shouldSkipDDL(job *model.Job, tableMap map[int64]*BasicTableInfo) bool {
 	return false
 }

-func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) (commonEvent.DDLEvent, bool) {
+func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) (commonEvent.DDLEvent, bool, error) {
 	handler, ok := allDDLHandlers[model.ActionType(rawEvent.Type)]
 	if !ok {
 		log.Panic("unknown ddl type", zap.Any("ddlType", rawEvent.Type), zap.String("query", rawEvent.Query))
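The buildDDLEvent change above turns the boolean skip result into a three-value return, and both callers now separate filter failures from filtered-out events. A minimal sketch of that calling convention; the loop shape mirrors the hunks above, but the continue-on-skip body is an assumption rather than copied code.

// Sketch of the caller pattern after this change: err means the filter itself
// failed (propagate it); ok == false means the event was intentionally filtered out.
for _, ts := range allTargetTs {
	rawEvent := readPersistedDDLEvent(storageSnap, ts)
	ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter)
	if err != nil {
		return nil, errors.Trace(err) // hard failure, e.g. a bad event-filter rule
	}
	if !ok {
		continue // filtered out on purpose; not an error
	}
	events = append(events, ddlEvent)
}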