From 85c3af85dbba066fdb07b7a999e4d66771a12e41 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 20:24:08 +0800 Subject: [PATCH 01/14] alpha version --- cdc/model/mounter.go | 43 +++++++++++++++++++++++++-- cdc/model/schema_storage.go | 17 +++++------ pkg/filter/expr_filter.go | 59 ++++++++++++++++++++++++++++++++----- 3 files changed, 101 insertions(+), 18 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index df4c47a48fc..1b6c7fd9767 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -16,6 +16,7 @@ package model import ( "context" "math" + "sort" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/types" @@ -24,8 +25,46 @@ import ( // RowChangedDatums is used to store the changed datums of a row. type RowChangedDatums struct { - RowDatums []types.Datum - PreRowDatums []types.Datum + RowDatums []types.Datum // datums without virtual columns + PreRowDatums []types.Datum // pre datums without virtual columns +} + +func (r *RowChangedDatums) getDatumsWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum { + if len(virtualColsOffset) == 0 { + return datums + } + sort.Ints(virtualColsOffset) + + maxAllowedIndex := len(datums) + len(virtualColsOffset) + for _, idx := range virtualColsOffset { + if idx < 0 || idx >= maxAllowedIndex { + log.Panic("invalid virtual column index", + zap.Int("index", idx), + zap.Int("maxAllowedIndex", maxAllowedIndex-1)) + } + } + + result := make([]types.Datum, 0, maxAllowedIndex) + originalIdx := 0 + virtualIdx := 0 + for originalIdx < len(datums) || virtualIdx < len(virtualColsOffset) { + if virtualIdx < len(virtualColsOffset) && virtualColsOffset[virtualIdx] == len(result) { + result = append(result, types.Datum{}) + virtualIdx++ + } else if originalIdx < len(datums) { + result = append(result, datums[originalIdx]) + originalIdx++ + } + } + return result +} + +func (r *RowChangedDatums) GetRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { + return r.getDatumsWithVirtualCols(r.RowDatums, virtualColsOffset) +} + +func (r *RowChangedDatums) GetPreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { + return r.getDatumsWithVirtualCols(r.PreRowDatums, virtualColsOffset) } // IsEmpty returns true if the RowChangeDatums is empty. 
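
A minimal standalone sketch of the interleaving that getDatumsWithVirtualCols implements above, with plain strings standing in for types.Datum; the names below are illustrative, not part of this patch. Physical-column values keep their order, and an empty placeholder is emitted whenever the next virtual-column offset equals the current length of the merged slice; the placeholders are later filled in by evaluating the generated-column expressions (see the expr_filter.go changes below). The sketch assumes distinct, sorted, in-range offsets, which the hunk above validates before merging.

package main

import "fmt"

// mergeWithPlaceholders interleaves physical values with empty placeholders
// at the given virtual-column offsets, mirroring getDatumsWithVirtualCols.
// Offsets are assumed to be distinct, sorted, and to index positions in the
// merged slice.
func mergeWithPlaceholders(physical []string, virtualOffsets []int) []string {
	merged := make([]string, 0, len(physical)+len(virtualOffsets))
	pi, vi := 0, 0
	for pi < len(physical) || vi < len(virtualOffsets) {
		if vi < len(virtualOffsets) && virtualOffsets[vi] == len(merged) {
			merged = append(merged, "") // placeholder for a virtual column
			vi++
		} else {
			merged = append(merged, physical[pi])
			pi++
		}
	}
	return merged
}

func main() {
	// Columns a, b are physical; c, d are virtual at offsets 2 and 3.
	fmt.Printf("%q\n", mergeWithPlaceholders([]string{"a", "b"}, []int{2, 3}))
	// Prints ["a" "b" "" ""].
}
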
diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 48a3364a45b..227fc004296 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -90,8 +90,8 @@ type TableInfo struct { // only for new row format decoder handleColID []int64 - // number of virtual columns - virtualColumnCount int + // offset of virtual columns in TableInfo.Columns + VirtualColumnsOffset []int // rowColInfosWithoutVirtualCols is the same as rowColInfos, but without virtual columns rowColInfosWithoutVirtualCols *[]rowcodec.ColInfo } @@ -121,7 +121,6 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode rowColumnsCurrentOffset := 0 - ti.virtualColumnCount = 0 for i, col := range ti.Columns { ti.columnsOffset[col.ID] = i pkIsHandle := false @@ -146,7 +145,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode } } } else { - ti.virtualColumnCount += 1 + ti.VirtualColumnsOffset = append(ti.VirtualColumnsOffset, i) } ti.rowColInfos[i] = rowcodec.ColInfo{ ID: col.ID, @@ -182,21 +181,21 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode } func (ti *TableInfo) initRowColInfosWithoutVirtualCols() { - if ti.virtualColumnCount == 0 { + if len(ti.VirtualColumnsOffset) == 0 { ti.rowColInfosWithoutVirtualCols = &ti.rowColInfos return } - colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-ti.virtualColumnCount) + colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-len(ti.VirtualColumnsOffset)) for i, col := range ti.Columns { if IsColCDCVisible(col) { colInfos = append(colInfos, ti.rowColInfos[i]) } } - if len(colInfos) != len(ti.rowColInfos)-ti.virtualColumnCount { + if len(colInfos) != len(ti.rowColInfos)-len(ti.VirtualColumnsOffset) { log.Panic("invalid rowColInfosWithoutVirtualCols", zap.Int("len(colInfos)", len(colInfos)), zap.Int("len(ti.rowColInfos)", len(ti.rowColInfos)), - zap.Int("ti.virtualColumnCount", ti.virtualColumnCount)) + zap.Int("ti.len(ti.VirtualColumnsOffset)", len(ti.VirtualColumnsOffset))) } ti.rowColInfosWithoutVirtualCols = &colInfos } @@ -388,7 +387,7 @@ func (ti *TableInfo) HasUniqueColumn() bool { // HasVirtualColumns returns whether the table has virtual columns func (ti *TableInfo) HasVirtualColumns() bool { - return ti.virtualColumnCount > 0 + return len(ti.VirtualColumnsOffset) > 0 } // IsEligible returns whether the table is a eligible table diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index 5ff6a58b327..b2c67ce5531 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -21,7 +21,9 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" @@ -274,8 +276,9 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } return r.skipDMLByExpression( - rawRow.RowDatums, + rawRow.GetRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), exprs, + ti, ) case row.IsUpdate(): oldExprs, err := r.getUpdateOldExpr(ti) @@ -287,15 +290,17 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } ignoreOld, err := r.skipDMLByExpression( - rawRow.PreRowDatums, + rawRow.GetPreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), 
oldExprs, + ti, ) if err != nil { return false, err } ignoreNew, err := r.skipDMLByExpression( - rawRow.RowDatums, + rawRow.GetRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), newExprs, + ti, ) if err != nil { return false, err @@ -307,8 +312,9 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } return r.skipDMLByExpression( - rawRow.PreRowDatums, + rawRow.GetPreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), exprs, + ti, ) default: log.Warn("unknown row changed event type") @@ -316,16 +322,55 @@ func (r *dmlExprFilterRule) shouldSkipDML( } } +func (r *dmlExprFilterRule) buildRowWithVirtualColumns( + rowData []types.Datum, + tableInfo *model.TableInfo, +) (chunk.Row, error) { + row := chunk.MutRowFromDatums(rowData).ToRow() + if len(tableInfo.VirtualColumnsOffset) == 0 { + // If there is no virtual column, we can return the row directly. + return row, nil + } + + columns, _, err := expression.ColumnInfos2ColumnsAndNames(r.sessCtx.GetExprCtx(), + ast.CIStr{} /* unused */, tableInfo.Name, tableInfo.Columns, tableInfo.TableInfo) + if err != nil { + return chunk.Row{}, err + } + vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(r.sessCtx.GetExprCtx().GetEvalCtx(), columns) + ch := chunk.NewEmptyChunk(vColFts) + ch.AppendRow(row) + err = table.FillVirtualColumnValue(vColFts, vColOffsets, columns, tableInfo.Columns, r.sessCtx.GetExprCtx(), ch) + if err != nil { + return chunk.Row{}, err + } + return ch.GetRow(0), nil +} + +func collectVirtualColumnOffsetsAndTypes(ctx expression.EvalContext, cols []*expression.Column) ([]int, []*types.FieldType) { + var offsets []int + var fts []*types.FieldType + for i, col := range cols { + if col.VirtualExpr != nil { + offsets = append(offsets, i) + fts = append(fts, col.GetType(ctx)) + } + } + return offsets, fts +} + func (r *dmlExprFilterRule) skipDMLByExpression( rowData []types.Datum, expr expression.Expression, + tableInfo *model.TableInfo, ) (bool, error) { if len(rowData) == 0 || expr == nil { return false, nil } - - row := chunk.MutRowFromDatums(rowData).ToRow() - + row, err := r.buildRowWithVirtualColumns(rowData, tableInfo) + if err != nil { + return false, errors.Trace(err) + } d, err := expr.Eval(r.sessCtx.GetExprCtx().GetEvalCtx(), row) if err != nil { log.Error("failed to eval expression", zap.Error(err)) From 19ffef40bf9e2f48effb5fd95e6f157c41ad63c6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 21:22:50 +0800 Subject: [PATCH 02/14] fix tests --- dm/pkg/utils/common.go | 15 +- pkg/filter/expr_filter.go | 13 +- pkg/filter/expr_filter_test.go | 522 +++++++++++++++++---------------- 3 files changed, 297 insertions(+), 253 deletions(-) diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go index 1326097afef..225de8c08df 100644 --- a/dm/pkg/utils/common.go +++ b/dm/pkg/utils/common.go @@ -22,13 +22,14 @@ import ( "github.com/pingcap/tidb/pkg/expression/exprctx" "github.com/pingcap/tidb/pkg/expression/sessionexpr" infoschema "github.com/pingcap/tidb/pkg/infoschema/context" - "github.com/pingcap/tidb/pkg/meta/model" + timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/dbutil" "github.com/pingcap/tidb/pkg/util/filter" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/dm/pkg/log" 
"go.uber.org/zap" ) @@ -219,15 +220,21 @@ func NewSessionCtx(vars map[string]string) sessionctx.Context { } // AdjustBinaryProtocolForDatum converts the data in binlog to TiDB datum. -func AdjustBinaryProtocolForDatum(ctx sessionctx.Context, data []interface{}, cols []*model.ColumnInfo) ([]types.Datum, error) { +func AdjustBinaryProtocolForDatum(ctx sessionctx.Context, data []interface{}, cols []*timodel.ColumnInfo) ([]types.Datum, error) { ret := make([]types.Datum, 0, len(data)) - for i, d := range data { + colIndex := 0 + for _, d := range data { datum := types.NewDatum(d) - castDatum, err := table.CastValue(ctx, datum, cols[i], false, false) + // fix the next not virtual column + for !model.IsColCDCVisible(cols[colIndex]) { + colIndex++ + } + castDatum, err := table.CastValue(ctx, datum, cols[colIndex], false, false) if err != nil { return nil, err } ret = append(ret, castDatum) + colIndex++ } return ret, nil } diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index b2c67ce5531..688510ada89 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -337,9 +337,18 @@ func (r *dmlExprFilterRule) buildRowWithVirtualColumns( if err != nil { return chunk.Row{}, err } - vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(r.sessCtx.GetExprCtx().GetEvalCtx(), columns) - ch := chunk.NewEmptyChunk(vColFts) + var fts []*types.FieldType + for _, col := range columns { + fts = append(fts, col.GetType(r.sessCtx.GetExprCtx().GetEvalCtx())) + } + ch := chunk.NewEmptyChunk(fts) ch.AppendRow(row) + + vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(r.sessCtx.GetExprCtx().GetEvalCtx(), columns) + log.Info("collect virtual column offsets and types", + zap.Int("columnsLen", len(columns)), + zap.Int("vColOffsetsLen", len(vColOffsets)), + zap.Int("vColFtsLen", len(vColFts))) err = table.FillVirtualColumnValue(vColFts, vColOffsets, columns, tableInfo.Columns, r.sessCtx.GetExprCtx(), ch) if err != nil { return chunk.Row{}, err diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go index dd7ee4c3474..8d1dacb23c3 100644 --- a/pkg/filter/expr_filter_test.go +++ b/pkg/filter/expr_filter_test.go @@ -16,6 +16,7 @@ package filter import ( "testing" + "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" "github.com/pingcap/tiflow/cdc/model" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestShouldSkipDMLBasic(t *testing.T) { @@ -51,37 +53,19 @@ func TestShouldSkipDMLBasic(t *testing.T) { testCases := []testCase{ { - ddl: "create table test.student(id int primary key, name char(50), age int, gender char(10))", + ddl: "create table test.student(id int primary key, name char(50), score int, total_score INT GENERATED ALWAYS AS (score + 10) VIRTUAL, gender char(10))", cfg: &config.FilterConfig{ EventFilters: []*config.EventFilterRule{ { - Matcher: []string{"test.student"}, - IgnoreInsertValueExpr: "age >= 20 or gender = 'female'", - IgnoreDeleteValueExpr: "age >= 32 and age < 48", - IgnoreUpdateOldValueExpr: "gender = 'male'", - IgnoreUpdateNewValueExpr: "age > 28", + Matcher: []string{"test.student"}, + IgnoreInsertValueExpr: "score >= 20 or gender = 'female'", + // IgnoreDeleteValueExpr: "score >= 32 and age < 48", + // IgnoreUpdateOldValueExpr: "gender = 'male'", + // IgnoreUpdateNewValueExpr: "score > 
28", }, }, }, cases: []innerCase{ - { // table name does not configure in matcher, no rule to filter it - schema: "test", - table: "teacher", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{999, "Will", 39, "male"}, - ignore: false, - }, - { // schema name does not configure in matcher, no rule to filter it - schema: "no", - table: "student", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{888, "Li", 45, "male"}, - ignore: false, - }, { // insert schema: "test", table: "student", @@ -91,237 +75,281 @@ func TestShouldSkipDMLBasic(t *testing.T) { row: []interface{}{1, "Dongmen", 20, "male"}, ignore: true, }, - { // insert - schema: "test", - table: "student", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{2, "Rustin", 18, "male"}, - ignore: false, - }, - { // insert - schema: "test", - table: "student", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{3, "Susan", 3, "female"}, - ignore: true, - }, - { // delete - schema: "test", - table: "student", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{4, "Helen", 18, "female"}, - ignore: false, - }, - { // delete - schema: "test", - table: "student", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{5, "Madonna", 32, "female"}, - ignore: true, - }, - { // delete - schema: "test", - table: "student", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{6, "Madison", 48, "male"}, - ignore: false, - }, - { // update, filler by new value - schema: "test", - table: "student", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{7, "Marry", 28, "female"}, - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{7, "Marry", 32, "female"}, - ignore: true, - }, - { // update - schema: "test", - table: "student", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{8, "Marilyn", 18, "female"}, - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{8, "Monroe", 22, "female"}, - ignore: false, - }, - { // update, filter by old value - schema: "test", - table: "student", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{9, "Andreja", 25, "male"}, - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{9, "Andreja", 25, "female"}, - ignore: true, - }, - }, - }, - { - ddl: "create table test.computer(id int primary key, brand char(50), price int)", - cfg: &config.FilterConfig{ - EventFilters: []*config.EventFilterRule{ - { - Matcher: []string{"test.*"}, - IgnoreInsertValueExpr: "price > 10000", - }, - }, - }, - cases: []innerCase{ - { // insert - schema: "test", - table: "computer", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{1, "apple", 12888}, - ignore: true, - }, - { // insert - schema: "test", - table: "computer", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{2, "microsoft", 5888}, - ignore: false, - }, - }, - }, - { // test case for gbk charset - ddl: "create table test.poet(id int primary key, name varchar(50) CHARACTER SET GBK COLLATE gbk_bin, works char(100))", - cfg: &config.FilterConfig{ - EventFilters: []*config.EventFilterRule{ - { - Matcher: []string{"*.*"}, - IgnoreInsertValueExpr: "id <= 1 or name='辛弃疾' or works='离骚'", - }, - }, - }, - cases: []innerCase{ - { // insert - schema: "test", - table: "poet", - columns: []*model.ColumnData{ - {ColumnID: 0}, 
- }, - row: []interface{}{1, "李白", "静夜思"}, - ignore: true, - }, - { // insert - schema: "test", - table: "poet", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{2, "杜甫", "石壕吏"}, - ignore: false, - }, - { // insert - schema: "test", - table: "poet", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{4, "屈原", "离骚"}, - ignore: true, - }, - { // insert - schema: "test", - table: "poet", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{3, "辛弃疾", "众里寻他千百度"}, - ignore: true, - }, - }, - }, - { - ddl: "create table test.season(id int primary key, name char(50), start char(100), end char(100))", - cfg: &config.FilterConfig{ - EventFilters: []*config.EventFilterRule{ - { // do not ignore any event of test.season table - // and ignore events of !test.season table by configure SQL expression. - Matcher: []string{"*.*", "!test.season"}, - IgnoreInsertValueExpr: "id >= 1", - IgnoreUpdateNewValueExpr: "id >= 1", - }, - }, - }, - cases: []innerCase{ - { // do not ignore any event of test.season table - schema: "test", - table: "season", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{1, "Spring", "January", "March"}, - ignore: false, - }, - { // do not ignore any event of test.season table - schema: "test", - table: "season", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{2, "Summer", "April", "June"}, - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{2, "Summer", "April", "July"}, - ignore: false, - }, - { // ignore insert event of test.autumn table - schema: "test", - table: "autumn", - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{3, "Autumn", "July", "September"}, - ignore: true, - }, - { // ignore update event of test.winter table - schema: "test", - table: "winter", - preColumns: []*model.ColumnData{ - {ColumnID: 0}, - }, - preRow: []interface{}{4, "Winter", "October", "January"}, - columns: []*model.ColumnData{ - {ColumnID: 0}, - }, - row: []interface{}{4, "Winter", "October", "December"}, - ignore: true, - }, }, }, + // { + // ddl: "create table test.student(id int primary key, name char(50), age int, gender char(10))", + // cfg: &config.FilterConfig{ + // EventFilters: []*config.EventFilterRule{ + // { + // Matcher: []string{"test.student"}, + // IgnoreInsertValueExpr: "age >= 20 or gender = 'female'", + // IgnoreDeleteValueExpr: "age >= 32 and age < 48", + // IgnoreUpdateOldValueExpr: "gender = 'male'", + // IgnoreUpdateNewValueExpr: "age > 28", + // }, + // }, + // }, + // cases: []innerCase{ + // { // table name does not configure in matcher, no rule to filter it + // schema: "test", + // table: "teacher", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{999, "Will", 39, "male"}, + // ignore: false, + // }, + // { // schema name does not configure in matcher, no rule to filter it + // schema: "no", + // table: "student", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{888, "Li", 45, "male"}, + // ignore: false, + // }, + // { // insert + // schema: "test", + // table: "student", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{1, "Dongmen", 20, "male"}, + // ignore: true, + // }, + // { // insert + // schema: "test", + // table: "student", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{2, "Rustin", 18, "male"}, + // ignore: false, + // }, + // { // insert 
+ // schema: "test", + // table: "student", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{3, "Susan", 3, "female"}, + // ignore: true, + // }, + // { // delete + // schema: "test", + // table: "student", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{4, "Helen", 18, "female"}, + // ignore: false, + // }, + // { // delete + // schema: "test", + // table: "student", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{5, "Madonna", 32, "female"}, + // ignore: true, + // }, + // { // delete + // schema: "test", + // table: "student", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{6, "Madison", 48, "male"}, + // ignore: false, + // }, + // { // update, filler by new value + // schema: "test", + // table: "student", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{7, "Marry", 28, "female"}, + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{7, "Marry", 32, "female"}, + // ignore: true, + // }, + // { // update + // schema: "test", + // table: "student", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{8, "Marilyn", 18, "female"}, + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{8, "Monroe", 22, "female"}, + // ignore: false, + // }, + // { // update, filter by old value + // schema: "test", + // table: "student", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{9, "Andreja", 25, "male"}, + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{9, "Andreja", 25, "female"}, + // ignore: true, + // }, + // }, + // }, + // { + // ddl: "create table test.computer(id int primary key, brand char(50), price int)", + // cfg: &config.FilterConfig{ + // EventFilters: []*config.EventFilterRule{ + // { + // Matcher: []string{"test.*"}, + // IgnoreInsertValueExpr: "price > 10000", + // }, + // }, + // }, + // cases: []innerCase{ + // { // insert + // schema: "test", + // table: "computer", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{1, "apple", 12888}, + // ignore: true, + // }, + // { // insert + // schema: "test", + // table: "computer", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{2, "microsoft", 5888}, + // ignore: false, + // }, + // }, + // }, + // { // test case for gbk charset + // ddl: "create table test.poet(id int primary key, name varchar(50) CHARACTER SET GBK COLLATE gbk_bin, works char(100))", + // cfg: &config.FilterConfig{ + // EventFilters: []*config.EventFilterRule{ + // { + // Matcher: []string{"*.*"}, + // IgnoreInsertValueExpr: "id <= 1 or name='辛弃疾' or works='离骚'", + // }, + // }, + // }, + // cases: []innerCase{ + // { // insert + // schema: "test", + // table: "poet", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{1, "李白", "静夜思"}, + // ignore: true, + // }, + // { // insert + // schema: "test", + // table: "poet", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{2, "杜甫", "石壕吏"}, + // ignore: false, + // }, + // { // insert + // schema: "test", + // table: "poet", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{4, "屈原", "离骚"}, + // ignore: true, + // }, + // { // insert + // schema: 
"test", + // table: "poet", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{3, "辛弃疾", "众里寻他千百度"}, + // ignore: true, + // }, + // }, + // }, + // { + // ddl: "create table test.season(id int primary key, name char(50), start char(100), end char(100))", + // cfg: &config.FilterConfig{ + // EventFilters: []*config.EventFilterRule{ + // { // do not ignore any event of test.season table + // // and ignore events of !test.season table by configure SQL expression. + // Matcher: []string{"*.*", "!test.season"}, + // IgnoreInsertValueExpr: "id >= 1", + // IgnoreUpdateNewValueExpr: "id >= 1", + // }, + // }, + // }, + // cases: []innerCase{ + // { // do not ignore any event of test.season table + // schema: "test", + // table: "season", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{1, "Spring", "January", "March"}, + // ignore: false, + // }, + // { // do not ignore any event of test.season table + // schema: "test", + // table: "season", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{2, "Summer", "April", "June"}, + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{2, "Summer", "April", "July"}, + // ignore: false, + // }, + // { // ignore insert event of test.autumn table + // schema: "test", + // table: "autumn", + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{3, "Autumn", "July", "September"}, + // ignore: true, + // }, + // { // ignore update event of test.winter table + // schema: "test", + // table: "winter", + // preColumns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // preRow: []interface{}{4, "Winter", "October", "January"}, + // columns: []*model.ColumnData{ + // {ColumnID: 0}, + // }, + // row: []interface{}{4, "Winter", "October", "December"}, + // ignore: true, + // }, + // }, + // }, } sessCtx := utils.ZeroSessionCtx for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) + log.Info("test case", zap.Any("tableInfo", tableInfo)) f, err := newExprFilter("", tc.cfg) require.Nil(t, err) for _, c := range tc.cases { From 31f45cbcf27b187086be7de5049da1aa177e2689 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 21:33:13 +0800 Subject: [PATCH 03/14] add unit tests --- pkg/filter/expr_filter_test.go | 629 +++++++++++++++++++-------------- 1 file changed, 357 insertions(+), 272 deletions(-) diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go index 8d1dacb23c3..6b1abc0c7bc 100644 --- a/pkg/filter/expr_filter_test.go +++ b/pkg/filter/expr_filter_test.go @@ -53,19 +53,37 @@ func TestShouldSkipDMLBasic(t *testing.T) { testCases := []testCase{ { - ddl: "create table test.student(id int primary key, name char(50), score int, total_score INT GENERATED ALWAYS AS (score + 10) VIRTUAL, gender char(10))", + ddl: "create table test.student(id int primary key, name char(50), age int, gender char(10))", cfg: &config.FilterConfig{ EventFilters: []*config.EventFilterRule{ { - Matcher: []string{"test.student"}, - IgnoreInsertValueExpr: "score >= 20 or gender = 'female'", - // IgnoreDeleteValueExpr: "score >= 32 and age < 48", - // IgnoreUpdateOldValueExpr: "gender = 'male'", - // IgnoreUpdateNewValueExpr: "score > 28", + Matcher: []string{"test.student"}, + IgnoreInsertValueExpr: "age >= 20 or gender = 'female'", + IgnoreDeleteValueExpr: "age >= 32 and age < 48", + IgnoreUpdateOldValueExpr: "gender = 'male'", + IgnoreUpdateNewValueExpr: "age > 28", }, }, }, 
 			cases: []innerCase{
+				{ // table name is not configured in the matcher, no rule to filter it
+					schema: "test",
+					table:  "teacher",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{999, "Will", 39, "male"},
+					ignore: false,
+				},
+				{ // schema name is not configured in the matcher, no rule to filter it
+					schema: "no",
+					table:  "student",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{888, "Li", 45, "male"},
+					ignore: false,
+				},
 				{ // insert
 					schema: "test",
 					table:  "student",
 					columns: []*model.ColumnData{
 						{ColumnID: 0},
 					},
 					row:    []interface{}{1, "Dongmen", 20, "male"},
 					ignore: true,
 				},
+				{ // insert
+					schema: "test",
+					table:  "student",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{2, "Rustin", 18, "male"},
+					ignore: false,
+				},
+				{ // insert
+					schema: "test",
+					table:  "student",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{3, "Susan", 3, "female"},
+					ignore: true,
+				},
+				{ // delete
+					schema: "test",
+					table:  "student",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{4, "Helen", 18, "female"},
+					ignore: false,
+				},
+				{ // delete
+					schema: "test",
+					table:  "student",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{5, "Madonna", 32, "female"},
+					ignore: true,
+				},
+				{ // delete
+					schema: "test",
+					table:  "student",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{6, "Madison", 48, "male"},
+					ignore: false,
+				},
+				{ // update, filter by new value
+					schema: "test",
+					table:  "student",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{7, "Marry", 28, "female"},
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{7, "Marry", 32, "female"},
+					ignore: true,
+				},
+				{ // update
+					schema: "test",
+					table:  "student",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{8, "Marilyn", 18, "female"},
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{8, "Monroe", 22, "female"},
+					ignore: false,
+				},
+				{ // update, filter by old value
+					schema: "test",
+					table:  "student",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{9, "Andreja", 25, "male"},
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{9, "Andreja", 25, "female"},
+					ignore: true,
+				},
+			},
+		},
+		// test table with virtual columns
+		{
+			ddl: "create table test.student_score(id int primary key, name char(50), score int, total_score INT GENERATED ALWAYS AS (score + 10) VIRTUAL, gender char(10))",
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:                  []string{"test.student_score"},
+						IgnoreInsertValueExpr:    "score >= 20 or gender = 'female'",
+						IgnoreDeleteValueExpr:    "score >= 32 and gender = 'female'",
+						IgnoreUpdateOldValueExpr: "gender = 'male'",
+						IgnoreUpdateNewValueExpr: "total_score > 38",
+					},
+				},
+			},
+			cases: []innerCase{
+				{ // insert
+					schema: "test",
+					table:  "student_score",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{1, "Dongmen", 20, "male"},
+					ignore: true,
+				},
+				{ // insert
+					schema: "test",
+					table:  "student_score",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{2, "Rustin", 18, "male"},
+					ignore: false,
+				},
+				{ // insert
+					schema: "test",
+					table:  "student_score",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{3, "Susan", 3, "female"},
+					ignore: true,
+				},
+				{ // delete
+					schema: "test",
+					table:  "student_score",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{4, "Helen", 18, "female"},
+					ignore: false,
+				},
+				{ // delete
+					schema: "test",
+					table:  "student_score",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{5, "Madonna", 32, "female"},
+					ignore: true,
+				},
+				{ // delete
+					schema: "test",
+					table:  "student_score",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{6, "Madison", 48, "male"},
+					ignore: false,
+				},
+				{ // update, filter by new value
+					schema: "test",
+					table:  "student_score",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{7, "Marry", 28, "female"},
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{7, "Marry", 32, "female"},
+					ignore: true,
+				},
+				{ // update
+					schema: "test",
+					table:  "student_score",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{8, "Marilyn", 18, "female"},
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{8, "Monroe", 22, "female"},
+					ignore: false,
+				},
+				{ // update, filter by old value
+					schema: "test",
+					table:  "student_score",
+					preColumns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					preRow: []interface{}{9, "Andreja", 25, "male"},
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{9, "Andreja", 25, "female"},
+					ignore: true,
+				},
+			},
+		},
+		{
+			ddl: "create table test.computer(id int primary key, brand char(50), price int)",
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:               []string{"test.*"},
+						IgnoreInsertValueExpr: "price > 10000",
+					},
+				},
+			},
+			cases: []innerCase{
+				{ // insert
+					schema: "test",
+					table:  "computer",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{1, "apple", 12888},
+					ignore: true,
+				},
+				{ // insert
+					schema: "test",
+					table:  "computer",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{2, "microsoft", 5888},
+					ignore: false,
+				},
+			},
+		},
+		{ // test case for gbk charset
+			ddl: "create table test.poet(id int primary key, name varchar(50) CHARACTER SET GBK COLLATE gbk_bin, works char(100))",
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{
+						Matcher:               []string{"*.*"},
+						IgnoreInsertValueExpr: "id <= 1 or name='辛弃疾' or works='离骚'",
+					},
+				},
+			},
+			cases: []innerCase{
+				{ // insert
+					schema: "test",
+					table:  "poet",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{1, "李白", "静夜思"},
+					ignore: true,
+				},
+				{ // insert
+					schema: "test",
+					table:  "poet",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{2, "杜甫", "石壕吏"},
+					ignore: false,
+				},
+				{ // insert
+					schema: "test",
+					table:  "poet",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{4, "屈原", "离骚"},
+					ignore: true,
+				},
+				{ // insert
+					schema: "test",
+					table:  "poet",
+					columns: []*model.ColumnData{
+						{ColumnID: 0},
+					},
+					row:    []interface{}{3, "辛弃疾", "众里寻他千百度"},
+					ignore: true,
+				},
+			},
+		},
+		{
+			ddl: "create table test.season(id int primary key, name char(50), start char(100), end char(100))",
+			cfg: &config.FilterConfig{
+				EventFilters: []*config.EventFilterRule{
+					{ // do not ignore any event of test.season table
+						// and ignore events of !test.season tables by the configured SQL expressions.
+ Matcher: []string{"*.*", "!test.season"}, + IgnoreInsertValueExpr: "id >= 1", + IgnoreUpdateNewValueExpr: "id >= 1", + }, + }, + }, + cases: []innerCase{ + { // do not ignore any event of test.season table + schema: "test", + table: "season", + columns: []*model.ColumnData{ + {ColumnID: 0}, + }, + row: []interface{}{1, "Spring", "January", "March"}, + ignore: false, + }, + { // do not ignore any event of test.season table + schema: "test", + table: "season", + preColumns: []*model.ColumnData{ + {ColumnID: 0}, + }, + preRow: []interface{}{2, "Summer", "April", "June"}, + columns: []*model.ColumnData{ + {ColumnID: 0}, + }, + row: []interface{}{2, "Summer", "April", "July"}, + ignore: false, + }, + { // ignore insert event of test.autumn table + schema: "test", + table: "autumn", + columns: []*model.ColumnData{ + {ColumnID: 0}, + }, + row: []interface{}{3, "Autumn", "July", "September"}, + ignore: true, + }, + { // ignore update event of test.winter table + schema: "test", + table: "winter", + preColumns: []*model.ColumnData{ + {ColumnID: 0}, + }, + preRow: []interface{}{4, "Winter", "October", "January"}, + columns: []*model.ColumnData{ + {ColumnID: 0}, + }, + row: []interface{}{4, "Winter", "October", "December"}, + ignore: true, + }, }, }, - // { - // ddl: "create table test.student(id int primary key, name char(50), age int, gender char(10))", - // cfg: &config.FilterConfig{ - // EventFilters: []*config.EventFilterRule{ - // { - // Matcher: []string{"test.student"}, - // IgnoreInsertValueExpr: "age >= 20 or gender = 'female'", - // IgnoreDeleteValueExpr: "age >= 32 and age < 48", - // IgnoreUpdateOldValueExpr: "gender = 'male'", - // IgnoreUpdateNewValueExpr: "age > 28", - // }, - // }, - // }, - // cases: []innerCase{ - // { // table name does not configure in matcher, no rule to filter it - // schema: "test", - // table: "teacher", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{999, "Will", 39, "male"}, - // ignore: false, - // }, - // { // schema name does not configure in matcher, no rule to filter it - // schema: "no", - // table: "student", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{888, "Li", 45, "male"}, - // ignore: false, - // }, - // { // insert - // schema: "test", - // table: "student", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{1, "Dongmen", 20, "male"}, - // ignore: true, - // }, - // { // insert - // schema: "test", - // table: "student", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{2, "Rustin", 18, "male"}, - // ignore: false, - // }, - // { // insert - // schema: "test", - // table: "student", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{3, "Susan", 3, "female"}, - // ignore: true, - // }, - // { // delete - // schema: "test", - // table: "student", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{4, "Helen", 18, "female"}, - // ignore: false, - // }, - // { // delete - // schema: "test", - // table: "student", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{5, "Madonna", 32, "female"}, - // ignore: true, - // }, - // { // delete - // schema: "test", - // table: "student", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{6, "Madison", 48, "male"}, - // ignore: false, - // }, - // { // update, filler by new value - // schema: 
"test", - // table: "student", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{7, "Marry", 28, "female"}, - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{7, "Marry", 32, "female"}, - // ignore: true, - // }, - // { // update - // schema: "test", - // table: "student", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{8, "Marilyn", 18, "female"}, - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{8, "Monroe", 22, "female"}, - // ignore: false, - // }, - // { // update, filter by old value - // schema: "test", - // table: "student", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{9, "Andreja", 25, "male"}, - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{9, "Andreja", 25, "female"}, - // ignore: true, - // }, - // }, - // }, - // { - // ddl: "create table test.computer(id int primary key, brand char(50), price int)", - // cfg: &config.FilterConfig{ - // EventFilters: []*config.EventFilterRule{ - // { - // Matcher: []string{"test.*"}, - // IgnoreInsertValueExpr: "price > 10000", - // }, - // }, - // }, - // cases: []innerCase{ - // { // insert - // schema: "test", - // table: "computer", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{1, "apple", 12888}, - // ignore: true, - // }, - // { // insert - // schema: "test", - // table: "computer", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{2, "microsoft", 5888}, - // ignore: false, - // }, - // }, - // }, - // { // test case for gbk charset - // ddl: "create table test.poet(id int primary key, name varchar(50) CHARACTER SET GBK COLLATE gbk_bin, works char(100))", - // cfg: &config.FilterConfig{ - // EventFilters: []*config.EventFilterRule{ - // { - // Matcher: []string{"*.*"}, - // IgnoreInsertValueExpr: "id <= 1 or name='辛弃疾' or works='离骚'", - // }, - // }, - // }, - // cases: []innerCase{ - // { // insert - // schema: "test", - // table: "poet", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{1, "李白", "静夜思"}, - // ignore: true, - // }, - // { // insert - // schema: "test", - // table: "poet", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{2, "杜甫", "石壕吏"}, - // ignore: false, - // }, - // { // insert - // schema: "test", - // table: "poet", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{4, "屈原", "离骚"}, - // ignore: true, - // }, - // { // insert - // schema: "test", - // table: "poet", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{3, "辛弃疾", "众里寻他千百度"}, - // ignore: true, - // }, - // }, - // }, - // { - // ddl: "create table test.season(id int primary key, name char(50), start char(100), end char(100))", - // cfg: &config.FilterConfig{ - // EventFilters: []*config.EventFilterRule{ - // { // do not ignore any event of test.season table - // // and ignore events of !test.season table by configure SQL expression. 
- // Matcher: []string{"*.*", "!test.season"}, - // IgnoreInsertValueExpr: "id >= 1", - // IgnoreUpdateNewValueExpr: "id >= 1", - // }, - // }, - // }, - // cases: []innerCase{ - // { // do not ignore any event of test.season table - // schema: "test", - // table: "season", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{1, "Spring", "January", "March"}, - // ignore: false, - // }, - // { // do not ignore any event of test.season table - // schema: "test", - // table: "season", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{2, "Summer", "April", "June"}, - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{2, "Summer", "April", "July"}, - // ignore: false, - // }, - // { // ignore insert event of test.autumn table - // schema: "test", - // table: "autumn", - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{3, "Autumn", "July", "September"}, - // ignore: true, - // }, - // { // ignore update event of test.winter table - // schema: "test", - // table: "winter", - // preColumns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // preRow: []interface{}{4, "Winter", "October", "January"}, - // columns: []*model.ColumnData{ - // {ColumnID: 0}, - // }, - // row: []interface{}{4, "Winter", "October", "December"}, - // ignore: true, - // }, - // }, - // }, } sessCtx := utils.ZeroSessionCtx From 129a330bcfb3616876fe4d70c09b07e91d67b6e7 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 22:52:48 +0800 Subject: [PATCH 04/14] add more unit test --- cdc/model/mounter.go | 16 +++++++++------- cdc/model/schema_storage.go | 6 +++++- cdc/model/schema_storage_test.go | 10 +++++++--- pkg/filter/expr_filter.go | 8 ++++---- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 1b6c7fd9767..02a07331121 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -25,16 +25,18 @@ import ( // RowChangedDatums is used to store the changed datums of a row. type RowChangedDatums struct { - RowDatums []types.Datum // datums without virtual columns - PreRowDatums []types.Datum // pre datums without virtual columns + RowDatums []types.Datum // row datums (excluding virtual columns) + PreRowDatums []types.Datum // pre row datums (excluding virtual columns) } -func (r *RowChangedDatums) getDatumsWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum { +// mergeDatumWithVirtualCols returns a slice of row datums with placeholders for virtual columns placed at the specified offset. +func (r *RowChangedDatums) mergeDatumWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum { if len(virtualColsOffset) == 0 { return datums } sort.Ints(virtualColsOffset) + // checks if virtual column offsets are within valid range. 
maxAllowedIndex := len(datums) + len(virtualColsOffset) for _, idx := range virtualColsOffset { if idx < 0 || idx >= maxAllowedIndex { @@ -59,12 +61,12 @@ func (r *RowChangedDatums) getDatumsWithVirtualCols(datums []types.Datum, virtua return result } -func (r *RowChangedDatums) GetRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { - return r.getDatumsWithVirtualCols(r.RowDatums, virtualColsOffset) +func (r *RowChangedDatums) RowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { + return r.mergeDatumWithVirtualCols(r.RowDatums, virtualColsOffset) } -func (r *RowChangedDatums) GetPreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { - return r.getDatumsWithVirtualCols(r.PreRowDatums, virtualColsOffset) +func (r *RowChangedDatums) PreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { + return r.mergeDatumWithVirtualCols(r.PreRowDatums, virtualColsOffset) } // IsEmpty returns true if the RowChangeDatums is empty. diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 227fc004296..dad065a0189 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -91,6 +91,10 @@ type TableInfo struct { handleColID []int64 // offset of virtual columns in TableInfo.Columns + // for example: + // Table has 4 columns: a (physical), b (physical), c (virtual), d (virtual) + // TableInfo.Columns order: a, b, c, d + // VirtualColumnsOffset will be [2, 3] (indices of virtual columns c and d) VirtualColumnsOffset []int // rowColInfosWithoutVirtualCols is the same as rowColInfos, but without virtual columns rowColInfosWithoutVirtualCols *[]rowcodec.ColInfo @@ -195,7 +199,7 @@ func (ti *TableInfo) initRowColInfosWithoutVirtualCols() { log.Panic("invalid rowColInfosWithoutVirtualCols", zap.Int("len(colInfos)", len(colInfos)), zap.Int("len(ti.rowColInfos)", len(ti.rowColInfos)), - zap.Int("ti.len(ti.VirtualColumnsOffset)", len(ti.VirtualColumnsOffset))) + zap.Any("ti.VirtualColumnsOffset", ti.VirtualColumnsOffset)) } ti.rowColInfosWithoutVirtualCols = &colInfos } diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 33a31aaddf9..be6fb5bfcfc 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -511,7 +511,7 @@ func TestBuildTiDBTableInfoWithUniqueKey(t *testing.T) { require.Equal(t, columns[3].Flag, *tableInfo.ForceGetColumnFlagType(tableInfo.Columns[3].ID)) } -func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) { +func TestBuildTiDBTableInfoWithVirtualColumns(t *testing.T) { t.Parallel() ftNull := parser_types.NewFieldType(mysql.TypeUnspecified) ftNull.SetFlag(mysql.NotNullFlag) @@ -519,7 +519,7 @@ func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) { ftNotNull := parser_types.NewFieldType(mysql.TypeUnspecified) ftNotNull.SetFlag(mysql.NotNullFlag | mysql.MultipleKeyFlag) - tableInfo := timodel.TableInfo{ + tidbTableInfo := timodel.TableInfo{ Columns: []*timodel.ColumnInfo{ { Name: pmodel.CIStr{O: "a"}, @@ -590,7 +590,8 @@ func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) { IsCommonHandle: false, PKIsHandle: false, } - infoWithourVirtualCols := BuildTiDBTableInfoWithoutVirtualColumns(&tableInfo) + // test BuildTiDBTableInfoWithoutVirtualColumns + infoWithourVirtualCols := BuildTiDBTableInfoWithoutVirtualColumns(&tidbTableInfo) require.Equal(t, 3, len(infoWithourVirtualCols.Columns)) require.Equal(t, 0, infoWithourVirtualCols.Columns[0].Offset) require.Equal(t, "a", infoWithourVirtualCols.Columns[0].Name.O) @@ -598,4 +599,7 
@@ func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) { require.Equal(t, "b", infoWithourVirtualCols.Columns[1].Name.O) require.Equal(t, 2, infoWithourVirtualCols.Columns[2].Offset) require.Equal(t, "d", infoWithourVirtualCols.Columns[2].Name.O) + + tableInfo := WrapTableInfo(100, "test", 1000, &tidbTableInfo) + require.Equal(t, []int{2}, tableInfo.VirtualColumnsOffset) } diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index 688510ada89..e5479b5a655 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -276,7 +276,7 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } return r.skipDMLByExpression( - rawRow.GetRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), + rawRow.RowDatumsWithVirtualCols(ti.VirtualColumnsOffset), exprs, ti, ) @@ -290,7 +290,7 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } ignoreOld, err := r.skipDMLByExpression( - rawRow.GetPreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), + rawRow.PreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), oldExprs, ti, ) @@ -298,7 +298,7 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } ignoreNew, err := r.skipDMLByExpression( - rawRow.GetRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), + rawRow.RowDatumsWithVirtualCols(ti.VirtualColumnsOffset), newExprs, ti, ) @@ -312,7 +312,7 @@ func (r *dmlExprFilterRule) shouldSkipDML( return false, err } return r.skipDMLByExpression( - rawRow.GetPreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), + rawRow.PreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset), exprs, ti, ) From be05085ade6ff56cd515584c1d3f69003b5fbc7c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 22:57:11 +0800 Subject: [PATCH 05/14] remove unnecessary log --- pkg/filter/expr_filter.go | 4 ---- pkg/filter/expr_filter_test.go | 3 --- 2 files changed, 7 deletions(-) diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index e5479b5a655..ceb6eb06d0c 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -345,10 +345,6 @@ func (r *dmlExprFilterRule) buildRowWithVirtualColumns( ch.AppendRow(row) vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(r.sessCtx.GetExprCtx().GetEvalCtx(), columns) - log.Info("collect virtual column offsets and types", - zap.Int("columnsLen", len(columns)), - zap.Int("vColOffsetsLen", len(vColOffsets)), - zap.Int("vColFtsLen", len(vColFts))) err = table.FillVirtualColumnValue(vColFts, vColOffsets, columns, tableInfo.Columns, r.sessCtx.GetExprCtx(), ch) if err != nil { return chunk.Row{}, err diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go index 6b1abc0c7bc..629d3b59d0c 100644 --- a/pkg/filter/expr_filter_test.go +++ b/pkg/filter/expr_filter_test.go @@ -16,7 +16,6 @@ package filter import ( "testing" - "github.com/ngaut/log" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" "github.com/pingcap/tiflow/cdc/model" @@ -24,7 +23,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func TestShouldSkipDMLBasic(t *testing.T) { @@ -434,7 +432,6 @@ func TestShouldSkipDMLBasic(t *testing.T) { for _, tc := range testCases { tableInfo := helper.execDDL(tc.ddl) - log.Info("test case", zap.Any("tableInfo", tableInfo)) f, err := newExprFilter("", tc.cfg) require.Nil(t, err) for _, c := range tc.cases { From 
03c077f1a510a8f5cc2953c5592e2997b81902dc Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 23:09:55 +0800 Subject: [PATCH 06/14] add integration tests --- cdc/model/mounter.go | 2 ++ .../event_filter/conf/cf.toml | 6 +++++- .../event_filter/data/test.sql | 21 ++++++++++++++++++- tests/integration_tests/event_filter/run.sh | 15 ++++++++++++- 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 02a07331121..7efff5e98c5 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -61,10 +61,12 @@ func (r *RowChangedDatums) mergeDatumWithVirtualCols(datums []types.Datum, virtu return result } +// RowDatumsWithVirtualCols returns the row datums with placeholders for virtual columns placed at the specified offset. func (r *RowChangedDatums) RowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { return r.mergeDatumWithVirtualCols(r.RowDatums, virtualColsOffset) } +// PreRowDatumsWithVirtualCols returns the pre row datums with placeholders for virtual columns placed at the specified offset. func (r *RowChangedDatums) PreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum { return r.mergeDatumWithVirtualCols(r.PreRowDatums, virtualColsOffset) } diff --git a/tests/integration_tests/event_filter/conf/cf.toml b/tests/integration_tests/event_filter/conf/cf.toml index 4597fe3e456..1bbc93c6546 100644 --- a/tests/integration_tests/event_filter/conf/cf.toml +++ b/tests/integration_tests/event_filter/conf/cf.toml @@ -16,4 +16,8 @@ ignore-event = ["alter table"] [[filter.event-filters]] matcher = ["event_filter.t_name*"] -ignore-event = ["rename table"] \ No newline at end of file +ignore-event = ["rename table"] + +[[filter.event-filters]] +matcher = ["event_filter.t_virtual"] +ignore-insert-value-expr = "id = 2 or category = 'furniture'" \ No newline at end of file diff --git a/tests/integration_tests/event_filter/data/test.sql b/tests/integration_tests/event_filter/data/test.sql index 5ffd396aea1..95b92552042 100644 --- a/tests/integration_tests/event_filter/data/test.sql +++ b/tests/integration_tests/event_filter/data/test.sql @@ -110,4 +110,23 @@ CREATE TABLE t_name3 ( id INT, name varchar(128), PRIMARY KEY (id) -); \ No newline at end of file +); + +-- Table with virtual columns for testing ignore-insert-value-expr +CREATE TABLE t_virtual ( + id INT PRIMARY KEY, + price DECIMAL(10,2), + quantity INT, + total_price DECIMAL(10,2) AS (price * quantity) VIRTUAL, + discount DECIMAL(10,2) AS (CASE WHEN quantity > 10 THEN price * 0.9 ELSE price END) VIRTUAL, + category VARCHAR(20), + is_discounted BOOLEAN AS (quantity > 10) VIRTUAL +); + +-- These inserts should be filtered based on the rules +INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics'); +INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture'); + +-- These inserts should not be filtered +INSERT INTO t_virtual (id, price, quantity, category) VALUES (1, 50.00, 2, 'clothing'); +INSERT INTO t_virtual (id, price, quantity, category) VALUES (4, 150.00, 1, 'books'); \ No newline at end of file diff --git a/tests/integration_tests/event_filter/run.sh b/tests/integration_tests/event_filter/run.sh index 4cfa71b1a6c..7f951d38afe 100644 --- a/tests/integration_tests/event_filter/run.sh +++ b/tests/integration_tests/event_filter/run.sh @@ -48,6 +48,7 @@ function run() { check_table_exists "event_filter.t_name1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_table_exists "event_filter.t_name2" 
${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_table_exists "event_filter.t_name3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "event_filter.t_virtual" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} # check those rows that are not filtered are synced to downstream run_sql "select count(1) from event_filter.t1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} @@ -61,6 +62,18 @@ function run() { run_sql "select count(5) from event_filter.t1 where id=4;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_contains "count(5): 1" + # check virtual column table filtering + run_sql "select count(1) from event_filter.t_virtual;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "count(1): 2" + run_sql "select count(1) from event_filter.t_virtual where id=1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "count(1): 1" + run_sql "select count(1) from event_filter.t_virtual where id=2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "count(1): 0" + run_sql "select count(1) from event_filter.t_virtual where id=3;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "count(1): 0" + run_sql "select count(1) from event_filter.t_virtual where id=4;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "count(1): 1" + run_sql "TRUNCATE TABLE event_filter.t_truncate;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_sql_file $CUR/data/test_truncate.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "ALTER TABLE event_filter.t_alter MODIFY t_bigint BIGINT;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} @@ -82,4 +95,4 @@ function run() { trap stop_tidb_cluster EXIT run $* check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" \ No newline at end of file From 5e3f32470f9ab12a4f5c223bdbf8c5142192a6e2 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 3 Jul 2025 23:14:00 +0800 Subject: [PATCH 07/14] add integration tests --- .../event_filter/conf/cf.toml | 3 ++- .../event_filter/data/test.sql | 14 +++++++--- tests/integration_tests/event_filter/run.sh | 26 ++++++++++++++++--- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/tests/integration_tests/event_filter/conf/cf.toml b/tests/integration_tests/event_filter/conf/cf.toml index 1bbc93c6546..38049ca85b6 100644 --- a/tests/integration_tests/event_filter/conf/cf.toml +++ b/tests/integration_tests/event_filter/conf/cf.toml @@ -18,6 +18,7 @@ ignore-event = ["alter table"] matcher = ["event_filter.t_name*"] ignore-event = ["rename table"] +# Filter rules for virtual column table [[filter.event-filters]] matcher = ["event_filter.t_virtual"] -ignore-insert-value-expr = "id = 2 or category = 'furniture'" \ No newline at end of file +ignore-insert-value-expr = "id = 2 or category = 'furniture' or is_discounted = true" \ No newline at end of file diff --git a/tests/integration_tests/event_filter/data/test.sql b/tests/integration_tests/event_filter/data/test.sql index 95b92552042..b50cbf82637 100644 --- a/tests/integration_tests/event_filter/data/test.sql +++ b/tests/integration_tests/event_filter/data/test.sql @@ -123,10 +123,16 @@ CREATE TABLE t_virtual ( is_discounted BOOLEAN AS (quantity > 10) VIRTUAL ); --- These inserts should be filtered based on the rules -INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics'); -INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture'); +-- These inserts should be filtered based on the rules: +-- 1. id = 2 +-- 2. category = 'furniture' +-- 3. 
From 5e3f32470f9ab12a4f5c223bdbf8c5142192a6e2 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Thu, 3 Jul 2025 23:14:00 +0800
Subject: [PATCH 07/14] add integration tests

---
 .../event_filter/conf/cf.toml               |  3 ++-
 .../event_filter/data/test.sql              | 14 +++++++---
 tests/integration_tests/event_filter/run.sh | 26 ++++++++++++++++---
 3 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/tests/integration_tests/event_filter/conf/cf.toml b/tests/integration_tests/event_filter/conf/cf.toml
index 1bbc93c6546..38049ca85b6 100644
--- a/tests/integration_tests/event_filter/conf/cf.toml
+++ b/tests/integration_tests/event_filter/conf/cf.toml
@@ -18,6 +18,7 @@ ignore-event = ["alter table"]
 matcher = ["event_filter.t_name*"]
 ignore-event = ["rename table"]
 
+# Filter rules for virtual column table
 [[filter.event-filters]]
 matcher = ["event_filter.t_virtual"]
-ignore-insert-value-expr = "id = 2 or category = 'furniture'"
\ No newline at end of file
+ignore-insert-value-expr = "id = 2 or category = 'furniture' or is_discounted = true"
\ No newline at end of file
diff --git a/tests/integration_tests/event_filter/data/test.sql b/tests/integration_tests/event_filter/data/test.sql
index 95b92552042..b50cbf82637 100644
--- a/tests/integration_tests/event_filter/data/test.sql
+++ b/tests/integration_tests/event_filter/data/test.sql
@@ -123,10 +123,16 @@ CREATE TABLE t_virtual (
     is_discounted BOOLEAN AS (quantity > 10) VIRTUAL
 );
 
--- These inserts should be filtered based on the rules
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics');
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture');
+-- These inserts should be filtered based on the rules:
+-- 1. id = 2
+-- 2. category = 'furniture'
+-- 3. is_discounted = true (quantity > 10)
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics'); -- filtered by id=2
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture'); -- filtered by category and is_discounted
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (5, 300.00, 20, 'electronics'); -- filtered by is_discounted
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (6, 400.00, 12, 'clothing'); -- filtered by is_discounted
 
 -- These inserts should not be filtered
 INSERT INTO t_virtual (id, price, quantity, category) VALUES (1, 50.00, 2, 'clothing');
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (4, 150.00, 1, 'books');
\ No newline at end of file
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (4, 150.00, 1, 'books');
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (7, 250.00, 8, 'electronics');
\ No newline at end of file
diff --git a/tests/integration_tests/event_filter/run.sh b/tests/integration_tests/event_filter/run.sh
index 7f951d38afe..3ed4bcd8650 100644
--- a/tests/integration_tests/event_filter/run.sh
+++ b/tests/integration_tests/event_filter/run.sh
@@ -62,18 +62,36 @@ function run() {
     run_sql "select count(5) from event_filter.t1 where id=4;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
     check_contains "count(5): 1"
 
-    # check virtual column table filtering
+    # check virtual column table filtering with virtual column condition
+    # rows matching the ignore-insert-value-expr rule must not reach downstream
     run_sql "select count(1) from event_filter.t_virtual;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_contains "count(1): 2"
+    check_contains "count(1): 3"  # Only 3 rows should pass the filter
+
     run_sql "select count(1) from event_filter.t_virtual where id=1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
     check_contains "count(1): 1"
+
     run_sql "select count(1) from event_filter.t_virtual where id=2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_contains "count(1): 0"
+    check_contains "count(1): 0"  # filtered by id=2
+
    run_sql "select count(1) from event_filter.t_virtual where id=3;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
-    check_contains "count(1): 0"
+    check_contains "count(1): 0"  # filtered by category and is_discounted
+
     run_sql "select count(1) from event_filter.t_virtual where id=4;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
     check_contains "count(1): 1"
+
+    run_sql "select count(1) from event_filter.t_virtual where id=5;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_contains "count(1): 0"  # filtered by is_discounted
+
+    run_sql "select count(1) from event_filter.t_virtual where id=6;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_contains "count(1): 0"  # filtered by is_discounted
+
+    run_sql "select count(1) from event_filter.t_virtual where id=7;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_contains "count(1): 1"
+
+    # Verify that rows with is_discounted=true are filtered
+    run_sql "select count(1) from event_filter.t_virtual where quantity > 10;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    check_contains "count(1): 0"  # All discounted items should be filtered
 
     run_sql "TRUNCATE TABLE event_filter.t_truncate;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
     run_sql_file $CUR/data/test_truncate.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
     run_sql "ALTER TABLE event_filter.t_alter MODIFY t_bigint BIGINT;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
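This patch is the interesting one: the rule now matches on `is_discounted`, a virtual column generated as `(quantity > 10)`, so the filter decision for each insert can be derived from the stored columns alone. A sketch of the expected outcomes — `ignored` is a hypothetical helper that expands the virtual column to its definition; the actual evaluation is done by TiCDC's expression filter against the merged datums:

```go
package main

import "fmt"

type item struct {
	id       int
	quantity int
	category string
}

// ignored mirrors "id = 2 or category = 'furniture' or is_discounted = true",
// with is_discounted expanded to its generated definition (quantity > 10).
func ignored(it item) bool {
	isDiscounted := it.quantity > 10
	return it.id == 2 || it.category == "furniture" || isDiscounted
}

func main() {
	for _, it := range []item{
		{2, 5, "electronics"},  // ignored: id = 2
		{3, 15, "furniture"},   // ignored: category and is_discounted
		{5, 20, "electronics"}, // ignored: is_discounted
		{6, 12, "clothing"},    // ignored: is_discounted
		{1, 2, "clothing"},     // synced
		{4, 1, "books"},        // synced
		{7, 8, "electronics"},  // synced
	} {
		fmt.Println(it.id, ignored(it))
	}
	// 3 rows sync downstream, matching check_contains "count(1): 3".
}
```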
From aa63b793bc4ee3ae87bd9f0fe4a9212030b41409 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Thu, 3 Jul 2025 23:19:40 +0800
Subject: [PATCH 08/14] address comments

---
 cdc/model/mounter.go   | 5 ++++-
 dm/pkg/utils/common.go | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go
index 7efff5e98c5..972bfefb181 100644
--- a/cdc/model/mounter.go
+++ b/cdc/model/mounter.go
@@ -34,7 +34,10 @@ func (r *RowChangedDatums) mergeDatumWithVirtualCols(datums []types.Datum, virtu
     if len(virtualColsOffset) == 0 {
         return datums
     }
-    sort.Ints(virtualColsOffset)
+    if !sort.IntsAreSorted(virtualColsOffset) {
+        log.Panic("virtual column offsets must be sorted",
+            zap.Ints("virtualColsOffset", virtualColsOffset))
+    }
 
     // checks if virtual column offsets are within valid range.
     maxAllowedIndex := len(datums) + len(virtualColsOffset)
diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go
index 225de8c08df..4509a4af21f 100644
--- a/dm/pkg/utils/common.go
+++ b/dm/pkg/utils/common.go
@@ -227,6 +227,9 @@ func AdjustBinaryProtocolForDatum(ctx sessionctx.Context, data []interface{}, co
         datum := types.NewDatum(d)
         // fix the next not virtual column
-        for !model.IsColCDCVisible(cols[colIndex]) {
+        for colIndex < len(cols) && !model.IsColCDCVisible(cols[colIndex]) {
             colIndex++
         }
+        if colIndex >= len(cols) {
+            return nil, fmt.Errorf("colIndex out of bounds")
+        }
         castDatum, err := table.CastValue(ctx, datum, cols[colIndex], false, false)
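Two small but real corrections land here. Replacing `sort.Ints` with `sort.IntsAreSorted` removes a side effect — the old code silently reordered the caller's slice — and makes the precondition explicit: callers must pass offsets that are already sorted. And the bounds check has to guard the `cols[colIndex]` access itself; the version that tested `colIndex >= len(cols)` only after the slice had already been indexed could never fire before the panic it was meant to prevent. A standalone sketch of the same two preconditions as a returned error rather than a panic (`validateVirtualOffsets` is a hypothetical helper, not part of the patch):

```go
package main

import (
	"fmt"
	"sort"
)

// validateVirtualOffsets mirrors the preconditions mergeDatumWithVirtualCols
// enforces with log.Panic: offsets must be sorted, and each must fall inside
// the merged slice of length nDatums+len(offsets).
func validateVirtualOffsets(nDatums int, offsets []int) error {
	if !sort.IntsAreSorted(offsets) {
		return fmt.Errorf("virtual column offsets must be sorted: %v", offsets)
	}
	merged := nDatums + len(offsets)
	for _, idx := range offsets {
		if idx < 0 || idx >= merged {
			return fmt.Errorf("offset %d out of range [0, %d)", idx, merged)
		}
	}
	return nil
}

func main() {
	fmt.Println(validateVirtualOffsets(9, []int{0, 3, 11})) // <nil>
	fmt.Println(validateVirtualOffsets(9, []int{3, 0}))     // sorted error
}
```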
quantity > 10;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_contains "count(1): 0" # All discounted items should be filtered + check_contains "count(1): 0" # All discounted items should be filtered run_sql "TRUNCATE TABLE event_filter.t_truncate;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_sql_file $CUR/data/test_truncate.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -113,4 +113,4 @@ function run() { trap stop_tidb_cluster EXIT run $* check_logs $WORK_DIR -echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" \ No newline at end of file +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From e82e29c66a0505ec62242d480f3506e9dbca0474 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Fri, 4 Jul 2025 02:19:21 +0800 Subject: [PATCH 10/14] fix test scripts --- .../event_filter/data/test.sql | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/tests/integration_tests/event_filter/data/test.sql b/tests/integration_tests/event_filter/data/test.sql index b50cbf82637..ed03fa994bc 100644 --- a/tests/integration_tests/event_filter/data/test.sql +++ b/tests/integration_tests/event_filter/data/test.sql @@ -38,6 +38,32 @@ WHERE id = 4; /* ignore by event type*/ DROP TABLE t1; + +-- Table with virtual columns for testing ignore-insert-value-expr +CREATE TABLE t_virtual ( + id INT PRIMARY KEY, + price DECIMAL(10,2), + quantity INT, + total_price DECIMAL(10,2) AS (price * quantity) VIRTUAL, + discount DECIMAL(10,2) AS (CASE WHEN quantity > 10 THEN price * 0.9 ELSE price END) VIRTUAL, + category VARCHAR(20), + is_discounted BOOLEAN AS (quantity > 10) VIRTUAL +); + +-- These inserts should be filtered based on the rules: +-- 1. id = 2 +-- 2. category = 'furniture' +-- 3. is_discounted = true (quantity > 10) +INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics'); -- filtered by id=2 +INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture'); -- filtered by category and is_discounted +INSERT INTO t_virtual (id, price, quantity, category) VALUES (5, 300.00, 20, 'electronics'); -- filtered by is_discounted +INSERT INTO t_virtual (id, price, quantity, category) VALUES (6, 400.00, 12, 'clothing'); -- filtered by is_discounted + +-- These inserts should not be filtered +INSERT INTO t_virtual (id, price, quantity, category) VALUES (1, 50.00, 2, 'clothing'); +INSERT INTO t_virtual (id, price, quantity, category) VALUES (4, 150.00, 1, 'books'); +INSERT INTO t_virtual (id, price, quantity, category) VALUES (7, 250.00, 8, 'electronics'); + /* all event of t_normal will be replicated to downstream */ CREATE TABLE t_normal ( id INT, @@ -111,28 +137,3 @@ CREATE TABLE t_name3 ( name varchar(128), PRIMARY KEY (id) ); - --- Table with virtual columns for testing ignore-insert-value-expr -CREATE TABLE t_virtual ( - id INT PRIMARY KEY, - price DECIMAL(10,2), - quantity INT, - total_price DECIMAL(10,2) AS (price * quantity) VIRTUAL, - discount DECIMAL(10,2) AS (CASE WHEN quantity > 10 THEN price * 0.9 ELSE price END) VIRTUAL, - category VARCHAR(20), - is_discounted BOOLEAN AS (quantity > 10) VIRTUAL -); - --- These inserts should be filtered based on the rules: --- 1. id = 2 --- 2. category = 'furniture' --- 3. 
From e82e29c66a0505ec62242d480f3506e9dbca0474 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Fri, 4 Jul 2025 02:19:21 +0800
Subject: [PATCH 10/14] fix test scripts

---
 .../event_filter/data/test.sql              | 51 ++++++++++---------
 1 file changed, 26 insertions(+), 25 deletions(-)

diff --git a/tests/integration_tests/event_filter/data/test.sql b/tests/integration_tests/event_filter/data/test.sql
index b50cbf82637..ed03fa994bc 100644
--- a/tests/integration_tests/event_filter/data/test.sql
+++ b/tests/integration_tests/event_filter/data/test.sql
@@ -38,6 +38,32 @@ WHERE id = 4;
 
 /* ignore by event type*/
 DROP TABLE t1;
+
+-- Table with virtual columns for testing ignore-insert-value-expr
+CREATE TABLE t_virtual (
+    id INT PRIMARY KEY,
+    price DECIMAL(10,2),
+    quantity INT,
+    total_price DECIMAL(10,2) AS (price * quantity) VIRTUAL,
+    discount DECIMAL(10,2) AS (CASE WHEN quantity > 10 THEN price * 0.9 ELSE price END) VIRTUAL,
+    category VARCHAR(20),
+    is_discounted BOOLEAN AS (quantity > 10) VIRTUAL
+);
+
+-- These inserts should be filtered based on the rules:
+-- 1. id = 2
+-- 2. category = 'furniture'
+-- 3. is_discounted = true (quantity > 10)
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics'); -- filtered by id=2
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture'); -- filtered by category and is_discounted
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (5, 300.00, 20, 'electronics'); -- filtered by is_discounted
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (6, 400.00, 12, 'clothing'); -- filtered by is_discounted
+
+-- These inserts should not be filtered
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (1, 50.00, 2, 'clothing');
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (4, 150.00, 1, 'books');
+INSERT INTO t_virtual (id, price, quantity, category) VALUES (7, 250.00, 8, 'electronics');
+
 /* all event of t_normal will be replicated to downstream */
 CREATE TABLE t_normal (
     id INT,
@@ -111,28 +137,3 @@ CREATE TABLE t_name3 (
     name varchar(128),
     PRIMARY KEY (id)
 );
-
--- Table with virtual columns for testing ignore-insert-value-expr
-CREATE TABLE t_virtual (
-    id INT PRIMARY KEY,
-    price DECIMAL(10,2),
-    quantity INT,
-    total_price DECIMAL(10,2) AS (price * quantity) VIRTUAL,
-    discount DECIMAL(10,2) AS (CASE WHEN quantity > 10 THEN price * 0.9 ELSE price END) VIRTUAL,
-    category VARCHAR(20),
-    is_discounted BOOLEAN AS (quantity > 10) VIRTUAL
-);
-
--- These inserts should be filtered based on the rules:
--- 1. id = 2
--- 2. category = 'furniture'
--- 3. is_discounted = true (quantity > 10)
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (2, 100.00, 5, 'electronics'); -- filtered by id=2
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (3, 200.00, 15, 'furniture'); -- filtered by category and is_discounted
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (5, 300.00, 20, 'electronics'); -- filtered by is_discounted
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (6, 400.00, 12, 'clothing'); -- filtered by is_discounted
-
--- These inserts should not be filtered
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (1, 50.00, 2, 'clothing');
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (4, 150.00, 1, 'books');
-INSERT INTO t_virtual (id, price, quantity, category) VALUES (7, 250.00, 8, 'electronics');
\ No newline at end of file
From 8b9223459d602e72952a5763193778072917e883 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Fri, 4 Jul 2025 02:20:25 +0800
Subject: [PATCH 11/14] small fix

---
 tests/integration_tests/event_filter/conf/cf.toml | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/tests/integration_tests/event_filter/conf/cf.toml b/tests/integration_tests/event_filter/conf/cf.toml
index 38049ca85b6..9864716d414 100644
--- a/tests/integration_tests/event_filter/conf/cf.toml
+++ b/tests/integration_tests/event_filter/conf/cf.toml
@@ -6,6 +6,11 @@ matcher = ["event_filter.t1"]
 ignore-event = ["drop table", "delete"]
 ignore-insert-value-expr = "id = 2 or city = 'tokyo'"
 
+# Filter rules for virtual column table
+[[filter.event-filters]]
+matcher = ["event_filter.t_virtual"]
+ignore-insert-value-expr = "id = 2 or category = 'furniture' or is_discounted = true"
+
 [[filter.event-filters]]
 matcher = ["event_filter.t_truncate"]
 ignore-event = ["truncate table"]
@@ -17,8 +22,3 @@ ignore-event = ["alter table"]
 [[filter.event-filters]]
 matcher = ["event_filter.t_name*"]
 ignore-event = ["rename table"]
-
-# Filter rules for virtual column table
-[[filter.event-filters]]
-matcher = ["event_filter.t_virtual"]
-ignore-insert-value-expr = "id = 2 or category = 'furniture' or is_discounted = true"
\ No newline at end of file
From 2df780f4d74e8f1a5e44c4fe0db463cd0305f991 Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Sun, 6 Jul 2025 22:43:26 +0800
Subject: [PATCH 12/14] fix tests

---
 dm/pkg/utils/common.go                   | 18 +++--------
 pkg/filter/expr_filter_test.go           | 40 ++++++++++++++++++++----
 tests/integration_tests/syncpoint/run.sh |  2 +-
 3 files changed, 39 insertions(+), 21 deletions(-)

diff --git a/dm/pkg/utils/common.go b/dm/pkg/utils/common.go
index 4509a4af21f..1326097afef 100644
--- a/dm/pkg/utils/common.go
+++ b/dm/pkg/utils/common.go
@@ -22,14 +22,13 @@ import (
     "github.com/pingcap/tidb/pkg/expression/exprctx"
     "github.com/pingcap/tidb/pkg/expression/sessionexpr"
     infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
-    timodel "github.com/pingcap/tidb/pkg/meta/model"
+    "github.com/pingcap/tidb/pkg/meta/model"
     "github.com/pingcap/tidb/pkg/sessionctx"
     "github.com/pingcap/tidb/pkg/sessionctx/variable"
     "github.com/pingcap/tidb/pkg/table"
     "github.com/pingcap/tidb/pkg/types"
     "github.com/pingcap/tidb/pkg/util/dbutil"
     "github.com/pingcap/tidb/pkg/util/filter"
-    "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/dm/pkg/log"
     "go.uber.org/zap"
 )
@@ -220,24 +219,15 @@ func NewSessionCtx(vars map[string]string) sessionctx.Context {
 }
 
 // AdjustBinaryProtocolForDatum converts the data in binlog to TiDB datum.
-func AdjustBinaryProtocolForDatum(ctx sessionctx.Context, data []interface{}, cols []*timodel.ColumnInfo) ([]types.Datum, error) {
+func AdjustBinaryProtocolForDatum(ctx sessionctx.Context, data []interface{}, cols []*model.ColumnInfo) ([]types.Datum, error) {
     ret := make([]types.Datum, 0, len(data))
-    colIndex := 0
-    for _, d := range data {
+    for i, d := range data {
         datum := types.NewDatum(d)
-        // fix the next not virtual column
-        for colIndex < len(cols) && !model.IsColCDCVisible(cols[colIndex]) {
-            colIndex++
-        }
-        if colIndex >= len(cols) {
-            return nil, fmt.Errorf("colIndex out of bounds")
-        }
-        castDatum, err := table.CastValue(ctx, datum, cols[colIndex], false, false)
+        castDatum, err := table.CastValue(ctx, datum, cols[i], false, false)
         if err != nil {
             return nil, err
         }
         ret = append(ret, castDatum)
-        colIndex++
     }
     return ret, nil
 }
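The revert restores a one-to-one mapping in DM's helper — after this patch, binlog value i is cast directly against cols[i] — while the virtual-column-skipping variant survives only as a test-local helper in pkg/filter (next hunk). The difference between the two mappings, shown on a toy schema with one invisible column (a standalone sketch; `visible` stands in for model.IsColCDCVisible, and the production helper additionally bounds-checks the index):

```go
package main

import "fmt"

func main() {
	// Schema: a, v (virtual, not CDC-visible), b — binlog carries values for a and b only.
	visible := []bool{true, false, true}
	values := []string{"x", "y"}

	// Skipping variant: each value maps to the next visible column.
	colIndex := 0
	for _, v := range values {
		for !visible[colIndex] {
			colIndex++ // skip virtual columns
		}
		fmt.Printf("value %q -> column %d\n", v, colIndex)
		colIndex++
	}
	// Output: value "x" -> column 0; value "y" -> column 2.
	// The reverted DM helper would instead map value i -> column i,
	// which is only correct when no virtual columns are present.
}
```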
diff --git a/pkg/filter/expr_filter_test.go b/pkg/filter/expr_filter_test.go
index 629d3b59d0c..843a1f92873 100644
--- a/pkg/filter/expr_filter_test.go
+++ b/pkg/filter/expr_filter_test.go
@@ -14,9 +14,14 @@
 package filter
 
 import (
+    "fmt"
     "testing"
 
     "github.com/pingcap/errors"
+    timodel "github.com/pingcap/tidb/pkg/meta/model"
+    "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/table"
+    "github.com/pingcap/tidb/pkg/types"
     "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/dm/pkg/utils"
@@ -25,6 +30,29 @@ import (
     "github.com/stretchr/testify/require"
 )
 
+// adjustBinaryProtocolForDatumWithoutVirtualCol converts the data in binlog to TiDB datums, skipping virtual columns when mapping values to columns.
+func adjustBinaryProtocolForDatumWithoutVirtualCol(ctx sessionctx.Context, data []interface{}, cols []*timodel.ColumnInfo) ([]types.Datum, error) {
+    ret := make([]types.Datum, 0, len(data))
+    colIndex := 0
+    for _, d := range data {
+        datum := types.NewDatum(d)
+        // advance to the next CDC-visible (non-virtual) column
+        for colIndex < len(cols) && !model.IsColCDCVisible(cols[colIndex]) {
+            colIndex++
+        }
+        if colIndex >= len(cols) {
+            return nil, fmt.Errorf("colIndex out of bounds")
+        }
+        castDatum, err := table.CastValue(ctx, datum, cols[colIndex], false, false)
+        if err != nil {
+            return nil, err
+        }
+        ret = append(ret, castDatum)
+        colIndex++
+    }
+    return ret, nil
+}
+
 func TestShouldSkipDMLBasic(t *testing.T) {
     helper := newTestHelper(t)
     defer helper.close()
@@ -435,9 +463,9 @@
     f, err := newExprFilter("", tc.cfg)
     require.Nil(t, err)
     for _, c := range tc.cases {
-        rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
+        rowDatums, err := adjustBinaryProtocolForDatumWithoutVirtualCol(sessCtx, c.row, tableInfo.Columns)
         require.Nil(t, err)
-        preRowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.preRow, tableInfo.Columns)
+        preRowDatums, err := adjustBinaryProtocolForDatumWithoutVirtualCol(sessCtx, c.preRow, tableInfo.Columns)
         require.Nil(t, err)
         row := &model.RowChangedEvent{
             TableInfo: &model.TableInfo{
@@ -554,9 +582,9 @@
     f, err := newExprFilter("", tc.cfg)
     require.Nil(t, err)
     for _, c := range tc.cases {
-        rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
+        rowDatums, err := adjustBinaryProtocolForDatumWithoutVirtualCol(sessCtx, c.row, tableInfo.Columns)
         require.Nil(t, err)
-        preRowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.preRow, tableInfo.Columns)
+        preRowDatums, err := adjustBinaryProtocolForDatumWithoutVirtualCol(sessCtx, c.preRow, tableInfo.Columns)
         require.Nil(t, err)
         row := &model.RowChangedEvent{
             TableInfo: &model.TableInfo{
@@ -752,9 +780,9 @@
     if c.updateDDl != "" {
         tableInfo = helper.execDDL(c.updateDDl)
     }
-    rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
+    rowDatums, err := adjustBinaryProtocolForDatumWithoutVirtualCol(sessCtx, c.row, tableInfo.Columns)
     require.Nil(t, err)
-    preRowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.preRow, tableInfo.Columns)
+    preRowDatums, err := adjustBinaryProtocolForDatumWithoutVirtualCol(sessCtx, c.preRow, tableInfo.Columns)
     require.Nil(t, err)
     row := &model.RowChangedEvent{
         TableInfo: &model.TableInfo{
diff --git a/tests/integration_tests/syncpoint/run.sh b/tests/integration_tests/syncpoint/run.sh
index 9098a381a5f..f109423a4d9 100755
--- a/tests/integration_tests/syncpoint/run.sh
+++ b/tests/integration_tests/syncpoint/run.sh
@@ -138,7 +138,7 @@ function checkDiff() {
         deployConfig ${primaryArr[$i]} ${secondaryArr[$i]}
         check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
     done
-    rm $CUR/conf/diff_config.toml
+    rm -f $CUR/conf/diff_config.toml
 }
 
 function run() {
From 9d8eac55fadbdf28bd241f2d20e27d18203da34e Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 7 Jul 2025 10:17:51 +0800
Subject: [PATCH 13/14] try to fix test

---
 tests/integration_tests/tidb_mysql_test/r/date_formats.result | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration_tests/tidb_mysql_test/r/date_formats.result b/tests/integration_tests/tidb_mysql_test/r/date_formats.result
index 09968cab9ed..741f4b82a08 100644
--- a/tests/integration_tests/tidb_mysql_test/r/date_formats.result
+++ b/tests/integration_tests/tidb_mysql_test/r/date_formats.result
@@ -333,7 +333,7 @@ NULL
 explain select makedate(1997,1), addtime("31.12.97 11.59.59.999999 PM", "1 1.1.1.000002"),subtime("31.12.97 11.59.59.999999 PM", "1 1.1.1.000002"),timediff("01.01.97 11:59:59.000001 PM","31.12.95 11:59:59.000002 PM"),cast(str_to_date("15-01-2001 12:59:59", "%d-%m-%Y %H:%i:%S") as TIME), maketime(23,11,12),microsecond("1997-12-31 23:59:59.000001");
 id	estRows	task	access object	operator info
 Projection_3	1.00	root	1997-01-01->Column#1, ->Column#2, ->Column#3, 00:00:00->Column#4, 12:59:59->Column#5, 23:11:12->Column#6, 1->Column#7
-└─TableDual_4	1.00	root	rows:1
+└─TableDual_5	1.00	root	rows:1
 create table t2 (d date);
 insert into t2 values ('2004-07-14'),('2005-07-14');
 select date_format(d,"%d") from t2 order by 1;
From 42cb6b10277d1fd67c3d72dd924244cc510f7eeb Mon Sep 17 00:00:00 2001
From: lidezhu
Date: Mon, 7 Jul 2025 15:31:46 +0800
Subject: [PATCH 14/14] add unit test

---
 cdc/model/mounter.go      |  6 +++---
 cdc/model/mounter_test.go | 35 +++++++++++++++++++++++++++++++++
 2 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go
index 972bfefb181..e8292b6ef98 100644
--- a/cdc/model/mounter.go
+++ b/cdc/model/mounter.go
@@ -30,7 +30,7 @@ type RowChangedDatums struct {
 }
 
 // mergeDatumWithVirtualCols returns a slice of row datums with placeholders for virtual columns placed at the specified offset.
-func (r *RowChangedDatums) mergeDatumWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum {
+func mergeDatumWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum {
     if len(virtualColsOffset) == 0 {
         return datums
     }
@@ -66,12 +66,12 @@ func mergeDatumWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []
 
 // RowDatumsWithVirtualCols returns the row datums with placeholders for virtual columns placed at the specified offsets.
 func (r *RowChangedDatums) RowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum {
-    return r.mergeDatumWithVirtualCols(r.RowDatums, virtualColsOffset)
+    return mergeDatumWithVirtualCols(r.RowDatums, virtualColsOffset)
 }
 
 // PreRowDatumsWithVirtualCols returns the pre-row datums with placeholders for virtual columns placed at the specified offsets.
 func (r *RowChangedDatums) PreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum {
-    return r.mergeDatumWithVirtualCols(r.PreRowDatums, virtualColsOffset)
+    return mergeDatumWithVirtualCols(r.PreRowDatums, virtualColsOffset)
 }
 
 // IsEmpty returns true if the RowChangeDatums is empty.
diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go
index dd5e29a3f24..0be5ce92575 100644
--- a/cdc/model/mounter_test.go
+++ b/cdc/model/mounter_test.go
@@ -17,6 +17,7 @@ import (
     "math/rand"
     "testing"
 
+    "github.com/pingcap/tidb/pkg/types"
     "github.com/stretchr/testify/require"
 )
 
@@ -128,3 +129,37 @@ func TestComparePolymorphicEvents(t *testing.T) {
         require.True(t, ComparePolymorphicEvents(item.a, item.b))
     }
 }
+
+func TestMergeDatumWithVirtualCols(t *testing.T) {
+    t.Parallel()
+    datums := []types.Datum{
+        types.NewStringDatum("I"),
+        types.NewStringDatum("employee"),
+        types.NewStringDatum("hr"),
+        types.NewStringDatum("433305438660591626"),
+        types.NewStringDatum("101"),
+        types.NewStringDatum("Smith"),
+        types.NewStringDatum("Bob"),
+        types.NewStringDatum("2014-06-04"),
+        types.NewDatum(nil),
+    }
+    virtualColsOffset := []int{0, 3, 11}
+    mergedDatums := mergeDatumWithVirtualCols(datums, virtualColsOffset)
+
+    // Original length (9) + virtual columns (3) = 12
+    require.Len(t, mergedDatums, 12)
+
+    // Verify positions of virtual columns and original data
+    for _, pos := range virtualColsOffset {
+        // Virtual columns should be empty Datums
+        require.Equal(t, types.Datum{}, mergedDatums[pos],
+            "Virtual column at position %d should be empty", pos)
+    }
+
+    // Verify original data is in correct positions (skipping virtual columns)
+    originalIndices := []int{1, 2, 4, 5, 6, 7, 8, 9, 10}
+    for i, origIdx := range originalIndices {
+        require.Equal(t, datums[i], mergedDatums[origIdx],
+            "Original datum at position %d doesn't match", origIdx)
+    }
+}
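The new test pins down the placeholder positions for one representative layout. Two edges are also worth pinning: an empty offset list must return the input slice unchanged, and placeholders can land at both ends of the merged slice. A possible follow-up table of cases (a sketch, not included in the patch):

```go
func TestMergeDatumWithVirtualColsEdgeCases(t *testing.T) {
	t.Parallel()
	d := []types.Datum{types.NewIntDatum(1), types.NewIntDatum(2)}

	// No virtual columns: the input slice is returned as-is.
	require.Equal(t, d, mergeDatumWithVirtualCols(d, nil))

	// Virtual columns at both ends: merged layout is [_, 1, 2, _].
	merged := mergeDatumWithVirtualCols(d, []int{0, 3})
	require.Len(t, merged, 4)
	require.Equal(t, types.Datum{}, merged[0])
	require.Equal(t, d[0], merged[1])
	require.Equal(t, d[1], merged[2])
	require.Equal(t, types.Datum{}, merged[3])
}
```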