feat(filter): Implement an Efficient DML/DDL Event Filtering Mechanism #1612
Summary of Changes
Hello @wlwilliamx, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly refactors the event filtering mechanism, moving the filtering logic earlier in the data processing pipeline to improve efficiency. It introduces a new, streamlined `Filter` interface that operates on lower-level data primitives, enabling a "filter-while-building" model for DML events. This enhancement provides more precise control over data replication by discarding unwanted events at their source, reducing computational and memory overhead.
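The "filter-while-building" model mentioned above can be illustrated with a minimal Go sketch. The types here (`RowType`, `Row`, `Filter`, `Batch`, `ignoreDeletes`) are simplified stand-ins invented for illustration, not the PR's actual `common.RowType`, `chunk.Row`, `filter.Filter`, or `DMLEvent`; the only idea taken from the summary is that the filter is consulted before a row is added to the event being built.

```go
package main

import "fmt"

// RowType is a stand-in (assumption) for the PR's common.RowType.
type RowType int

const (
	RowTypeInsert RowType = iota
	RowTypeUpdate
	RowTypeDelete
)

// Row is a stand-in (assumption) for a raw row such as chunk.Row.
type Row map[string]int64

// Filter decides on raw rows, before any event object is materialized.
type Filter interface {
	ShouldIgnoreDML(t RowType, preRow, row Row) bool
}

// ignoreDeletes is a hypothetical rule that drops every DELETE.
type ignoreDeletes struct{}

func (ignoreDeletes) ShouldIgnoreDML(t RowType, _, _ Row) bool {
	return t == RowTypeDelete
}

// Batch plays the role of a DML event under construction.
type Batch struct {
	Rows []Row
}

// AppendRow consults the filter first and reports whether the row was kept.
func (b *Batch) AppendRow(f Filter, t RowType, preRow, row Row) bool {
	if f != nil && f.ShouldIgnoreDML(t, preRow, row) {
		return false // discarded: never stored or counted
	}
	b.Rows = append(b.Rows, row)
	return true
}

func main() {
	b := &Batch{}
	b.AppendRow(ignoreDeletes{}, RowTypeInsert, nil, Row{"id": 1})
	b.AppendRow(ignoreDeletes{}, RowTypeDelete, Row{"id": 2}, nil)
	fmt.Println(len(b.Rows)) // prints 1
}
```

Because the check happens inside the append path, a filtered row never occupies space in the batch, which is the saving the summary describes.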
Highlights
- Core Filter Interface Refactoring: The central `Filter` interface has been fundamentally redesigned. `ShouldIgnoreDMLEvent` is replaced by `ShouldIgnoreDML`, which operates on `chunk.Row` and `common.RowType`, and `ShouldIgnoreDDLEvent`/`ShouldDiscardDDL` are merged into a single `ShouldIgnoreDDL` method. This change enables earlier and more efficient filtering.
- Early DML Filtering Integration: The `DMLEvent.AppendRow` method now accepts a `filter.Filter` instance, allowing DML events to be filtered immediately upon row appending. If a row is filtered out, it's discarded before full decoding and batching, significantly optimizing resource usage by implementing a 'filter-while-building' model.
- Enhanced Expression-Based Filtering: The `dmlExprFilter` now uses table IDs and versions for caching table schemas, making it more robust against table renames and ensuring accurate filtering rules even when table metadata changes.
- Streamlined SQL Event Filtering: The `sqlEventFilter` has been adapted to the new `Filter` interface, accepting more primitive arguments for DDL and DML events. This reduces coupling between components and simplifies the filtering API.
- Deep Integration into Event Processing Pipeline: The filtering logic is now deeply integrated into the `eventScanner` and `dmlProcessor` components. This ensures that filtering occurs at the earliest possible stage of data retrieval and event assembly, maximizing efficiency.
- Code Structure Improvement: The `RowType` enum, representing DML event types, has been moved from `dml_event.go` to `pkg/common/types.go` in the more generic `common` package. This centralizes common types and resolves potential circular dependencies.
- Comprehensive Testing: New dedicated unit tests have been added for the updated filtering logic (`pkg/filter/expr_filter_test.go`, `pkg/filter/sql_event_filter_test.go`), and a large number of existing tests across the codebase have been updated to align with the new function signatures and filtering behavior.
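The table-ID-and-version caching described in the expression-filter highlight can be sketched roughly as follows. This is only an illustration under stated assumptions: `tableCache` and `ensure` are hypothetical names, and the rebuild-on-version-change behavior is an assumed use of the cached version, not something the PR text spells out.

```go
package main

import "fmt"

// tableCache is a hypothetical stand-in for a cache keyed by table ID.
// The value is the last seen table version (UpdateTS in the PR text).
type tableCache struct {
	versions map[int64]uint64
	rebuilds int // how many times per-table state was (re)built
}

func newTableCache() *tableCache {
	return &tableCache{versions: make(map[int64]uint64)}
}

// ensure reports whether per-table state had to be rebuilt.
// A lookup never involves the table name, so a rename cannot miss the cache.
func (c *tableCache) ensure(tableID int64, updateTS uint64) bool {
	if v, ok := c.versions[tableID]; ok && v == updateTS {
		return false // same ID and same version: reuse cached state
	}
	c.versions[tableID] = updateTS
	c.rebuilds++
	return true
}

func main() {
	c := newTableCache()
	fmt.Println(c.ensure(42, 100)) // true: first time this table is seen
	fmt.Println(c.ensure(42, 100)) // false: unchanged version
	fmt.Println(c.ensure(42, 101)) // true: version advanced
}
```

Storing only the version, rather than the whole table description, keeps the cache small while still detecting that cached state is stale.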
Code Review
This is a significant and well-executed refactoring of the event filtering mechanism. Moving the filtering logic to an earlier stage in the event processing pipeline is a solid approach for improving performance. The new `Filter` interface, operating on `chunk.Row`, is a key enabler for this and is cleanly implemented. The switch to using table IDs for caching in `dmlExprFilter` is a great improvement for robustness. The code is well-structured, and the addition of comprehensive unit tests is commendable.
I've found a couple of potential issues, one in the filter-helper tool and a logic concern in DDL filtering, along with some suggestions for code cleanup.
/check-issue-triage-complete
/CC @lidezhu @asddongmen
Please fix all failed tests.
Co-authored-by: dongmen <[email protected]>
…/add-event-filter
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: flowbehappy
/test pull-error-log-review
What problem does this PR solve?
Issue Number: close #1220
What is changed and how it works?
This pull request introduces a robust and deeply integrated event filtering system. This feature allows users to define flexible rules to include or exclude events from the replication stream based on event type (e.g., `INSERT`, `CREATE TABLE`), SQL patterns for DDL queries, and expressions over DML column values.
The primary goal of this feature is to give users precise control over the data flow, enabling them to easily ignore changes from temporary tables, log tables, or specific business operations.
Core Architectural Design: Early Filtering
The core design principle of this refactoring is to move the filtering logic as early as possible into the event processing pipeline. Previous filtering methods often operated on fully constructed and materialized event objects, which incurred unnecessary computational and memory costs.
This new design refactors the core `Filter` interface to operate directly on lower-level primitives like `chunk.Row` instead of high-level `model.RowChangedEvent` objects. The filtering logic is injected directly into the row appending process of `DMLEvent` (`AppendRow` method), achieving a highly efficient "filter-while-building" model. This means that if a row is filtered out by a rule, it is discarded immediately and is never decoded or added to the final event batch, eliminating wasted resources at the source.
Detailed Breakdown of Key Changes
1. Core `Filter` Interface Refactoring (`pkg/filter/filter.go`)
To enable the "Early Filtering" design, the central `Filter` interface was fundamentally refactored:
DML Filtering Interface Change:
- Old: `ShouldIgnoreDMLEvent(dml *model.RowChangedEvent, rawRow model.RowChangedDatums, tableInfo *model.TableInfo)`
- New: `ShouldIgnoreDML(dmlType common.RowType, preRow, row chunk.Row, ti *common.TableInfo)`
- The new method no longer depends on the high-level `model.RowChangedEvent`. Instead, it operates directly on the underlying row data primitive, `chunk.Row`. This is the key change that allows the filtering logic to be applied at a much earlier stage of event construction, so that rows which will be discarded are never built into events.
DDL Filtering Interface Consolidation:
- Old: two separate methods, `ShouldDiscardDDL` and `ShouldIgnoreDDLEvent`
- New: a single method, `ShouldIgnoreDDL(schema, table, query string, ddlType timodel.ActionType, tableInfo *timodel.TableInfo)`
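As a rough illustration of the refactored interface shape, the following self-contained Go sketch declares an interface with the two method names and parameter lists listed above. Everything else is an assumption made so the example compiles on its own: `RowType`, `ActionType`, `Row`, and `TableInfo` are local stand-ins for `common.RowType`, `timodel.ActionType`, `chunk.Row`, and the table-info types; the `(bool, error)` return type is a guess, since the description does not state it; and `prefixFilter` is a hypothetical rule, not part of the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins (assumptions) for the real types named in the PR.
type RowType int
type ActionType int
type Row []interface{}
type TableInfo struct {
	ID   int64
	Name string
}

const (
	RowTypeInsert RowType = iota
	RowTypeUpdate
	RowTypeDelete
)

const (
	ActionCreateTable ActionType = iota
	ActionDropTable
)

// Filter mirrors the two method names from the PR description.
// The return type is assumed.
type Filter interface {
	ShouldIgnoreDML(dmlType RowType, preRow, row Row, ti *TableInfo) (bool, error)
	ShouldIgnoreDDL(schema, table, query string, ddlType ActionType, ti *TableInfo) (bool, error)
}

// prefixFilter is a hypothetical rule: ignore tables with a given name
// prefix, and ignore every DROP TABLE statement.
type prefixFilter struct{ prefix string }

func (f prefixFilter) ShouldIgnoreDML(_ RowType, _, _ Row, ti *TableInfo) (bool, error) {
	if ti == nil {
		return false, nil
	}
	return strings.HasPrefix(ti.Name, f.prefix), nil
}

func (f prefixFilter) ShouldIgnoreDDL(_, table, _ string, ddlType ActionType, _ *TableInfo) (bool, error) {
	if ddlType == ActionDropTable {
		return true, nil
	}
	return strings.HasPrefix(table, f.prefix), nil
}

func main() {
	var f Filter = prefixFilter{prefix: "tmp_"}
	skipDML, _ := f.ShouldIgnoreDML(RowTypeInsert, nil, Row{1}, &TableInfo{ID: 1, Name: "tmp_orders"})
	skipDDL, _ := f.ShouldIgnoreDDL("test", "orders", "CREATE TABLE orders (id INT)", ActionCreateTable, nil)
	fmt.Println(skipDML, skipDDL) // prints: true false
}
```

Note that neither method receives a fully built event object; both take only the primitive pieces a rule needs, which is what lets callers invoke them before an event exists.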
2. Deep Integration of DML Filtering into Event Construction (`pkg/common/event/dml_event.go`)
- The `DMLEvent.AppendRow` method was modified to accept a `filter.Filter` instance.
- The `filter` is now used to evaluate each row immediately. If `filter.ShouldIgnoreDML` returns `true`, the row is discarded at once, and the subsequent logic for appending and counting is skipped.
3. Expression-Based Filter (`dmlExprFilter`) Enhancements (`pkg/filter/expr_filter.go`)
The component responsible for filtering DML by column values has received significant upgrades:
- The `tables` map, used for caching table schemas, has been changed. Its key is now the `int64` table ID instead of the `string` table name, and its value is the `uint64` table version (`UpdateTS`) instead of the full `TableInfo` object.
- Table names can change through `RENAME TABLE` operations, while table IDs are immutable. This ensures that filtering rules continue to apply correctly even after a table is renamed.
- The `shouldSkipDML` method signature was updated to align with the new `Filter` interface, accepting `chunk.Row` as input to filter on the raw data directly.
4. SQL Event Filter (`sqlEventFilter`) Adaptation (`pkg/filter/sql_event_filter.go`)
This filter handles rules based on event type (e.g., `INSERT`, `CREATE TABLE`) and SQL regex. It has been updated to support the new architecture:
- `shouldSkipDDL` and `shouldSkipDML` no longer accept the full `model.DDLEvent` or `model.RowChangedEvent` structs.
- This matches the new `Filter` interface. By accepting more primitive arguments (like schema, table name, DDL type), it reduces coupling between components.
5. `eventservice` Integration (`pkg/eventservice/event_scanner.go`)
To ensure the filter is active at the earliest point of data retrieval, the changes were propagated throughout the `eventservice`:
- The `eventScanner`, when processing raw KV events, now passes the `filter` object down to the `dmlProcessor`.
- The `dmlProcessor`, in turn, uses this `filter` when processing new transactions (`processNewTransaction`) and appending rows (`appendRow`).
- As a result, filtering takes effect as soon as the `eventScanner` reads raw KV data and starts assembling DML events, maximizing efficiency.
6. Code Structure and Type Centralization (`pkg/common/types.go`)
- The `RowType` enum (`RowTypeInsert`, `RowTypeUpdate`, `RowTypeDelete`) was moved from `dml_event.go` to `pkg/common/types.go` in the more generic `common` package.
- `RowType` is a fundamental type used across multiple modules (filter, sink, event). Elevating it to a common package prevents circular dependencies and improves the overall code structure.
7. Comprehensive Unit Testing
- New dedicated unit tests were added in `pkg/filter/expr_filter_test.go` and `pkg/filter/sql_event_filter_test.go`.
- Existing tests (`filter_test.go`, `event_scanner_test.go`, sink tests) were updated to align with the new function signatures and filtering logic.
Check List
Tests
- Add `event_filter` to the light group of mysql, kafka, pulsar and storage.
- Deployed upstream and downstream clusters locally and tested various functions of the filter.
Questions
Will it cause performance regression or break compatibility?
No.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note