Skip to content
50 changes: 48 additions & 2 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import (
"context"
"math"
"sort"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -24,8 +25,53 @@

// RowChangedDatums is used to store the changed datums of a row.
type RowChangedDatums struct {
RowDatums []types.Datum
PreRowDatums []types.Datum
RowDatums []types.Datum // row datums (excluding virtual columns)
PreRowDatums []types.Datum // pre row datums (excluding virtual columns)
}

// mergeDatumWithVirtualCols returns a slice of row datums with placeholders for virtual columns placed at the specified offset.
func mergeDatumWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum {
if len(virtualColsOffset) == 0 {
return datums
}

Check warning on line 36 in cdc/model/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/mounter.go#L35-L36

Added lines #L35 - L36 were not covered by tests
if !sort.IntsAreSorted(virtualColsOffset) {
log.Panic("virtual column offsets must be sorted",
zap.Ints("virtualColsOffset", virtualColsOffset))
}

Check warning on line 40 in cdc/model/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/mounter.go#L38-L40

Added lines #L38 - L40 were not covered by tests

// checks if virtual column offsets are within valid range.
maxAllowedIndex := len(datums) + len(virtualColsOffset)
for _, idx := range virtualColsOffset {
if idx < 0 || idx >= maxAllowedIndex {
log.Panic("invalid virtual column index",
zap.Int("index", idx),
zap.Int("maxAllowedIndex", maxAllowedIndex-1))
}

Check warning on line 49 in cdc/model/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/mounter.go#L46-L49

Added lines #L46 - L49 were not covered by tests
}

result := make([]types.Datum, 0, maxAllowedIndex)
originalIdx := 0
virtualIdx := 0
for originalIdx < len(datums) || virtualIdx < len(virtualColsOffset) {
if virtualIdx < len(virtualColsOffset) && virtualColsOffset[virtualIdx] == len(result) {
result = append(result, types.Datum{})
virtualIdx++
} else if originalIdx < len(datums) {
result = append(result, datums[originalIdx])
originalIdx++
}
}
return result
}

// RowDatumsWithVirtualCols returns the row datums with placeholders for virtual columns placed at the specified offset.
func (r *RowChangedDatums) RowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum {
return mergeDatumWithVirtualCols(r.RowDatums, virtualColsOffset)

Check warning on line 69 in cdc/model/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/mounter.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}

// PreRowDatumsWithVirtualCols returns the pre row datums with placeholders for virtual columns placed at the specified offset.
func (r *RowChangedDatums) PreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum {
return mergeDatumWithVirtualCols(r.PreRowDatums, virtualColsOffset)

Check warning on line 74 in cdc/model/mounter.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/mounter.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}

// IsEmpty returns true if the RowChangeDatums is empty.
Expand Down
35 changes: 35 additions & 0 deletions cdc/model/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"math/rand"
"testing"

"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -128,3 +129,37 @@ func TestComparePolymorphicEvents(t *testing.T) {
require.True(t, ComparePolymorphicEvents(item.a, item.b))
}
}

func TestMergeDatumWithVirtualCols(t *testing.T) {
t.Parallel()
datums := []types.Datum{
types.NewStringDatum("I"),
types.NewStringDatum("employee"),
types.NewStringDatum("hr"),
types.NewStringDatum("433305438660591626"),
types.NewStringDatum("101"),
types.NewStringDatum("Smith"),
types.NewStringDatum("Bob"),
types.NewStringDatum("2014-06-04"),
types.NewDatum(nil),
}
virtualColsOffset := []int{0, 3, 11}
mergedDatums := mergeDatumWithVirtualCols(datums, virtualColsOffset)

// Original length (9) + virtual columns (3) = 12
require.Len(t, mergedDatums, 12)

// Verify positions of virtual columns and original data
for _, pos := range virtualColsOffset {
// Virtual columns should be empty Datums
require.Equal(t, types.Datum{}, mergedDatums[pos],
"Virtual column at position %d should be empty", pos)
}

// Verify original data is in correct positions (skipping virtual columns)
originalIndices := []int{1, 2, 4, 5, 6, 7, 8, 9, 10}
for i, origIdx := range originalIndices {
require.Equal(t, datums[i], mergedDatums[origIdx],
"Original datum at position %d doesn't match", origIdx)
}
}
21 changes: 12 additions & 9 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@
// only for new row format decoder
handleColID []int64

// number of virtual columns
virtualColumnCount int
// offset of virtual columns in TableInfo.Columns
// for example:
// Table has 4 columns: a (physical), b (physical), c (virtual), d (virtual)
// TableInfo.Columns order: a, b, c, d
// VirtualColumnsOffset will be [2, 3] (indices of virtual columns c and d)
VirtualColumnsOffset []int
// rowColInfosWithoutVirtualCols is the same as rowColInfos, but without virtual columns
rowColInfosWithoutVirtualCols *[]rowcodec.ColInfo
}
Expand Down Expand Up @@ -121,7 +125,6 @@

rowColumnsCurrentOffset := 0

ti.virtualColumnCount = 0
for i, col := range ti.Columns {
ti.columnsOffset[col.ID] = i
pkIsHandle := false
Expand All @@ -146,7 +149,7 @@
}
}
} else {
ti.virtualColumnCount += 1
ti.VirtualColumnsOffset = append(ti.VirtualColumnsOffset, i)
}
ti.rowColInfos[i] = rowcodec.ColInfo{
ID: col.ID,
Expand Down Expand Up @@ -182,21 +185,21 @@
}

func (ti *TableInfo) initRowColInfosWithoutVirtualCols() {
if ti.virtualColumnCount == 0 {
if len(ti.VirtualColumnsOffset) == 0 {
ti.rowColInfosWithoutVirtualCols = &ti.rowColInfos
return
}
colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-ti.virtualColumnCount)
colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-len(ti.VirtualColumnsOffset))
for i, col := range ti.Columns {
if IsColCDCVisible(col) {
colInfos = append(colInfos, ti.rowColInfos[i])
}
}
if len(colInfos) != len(ti.rowColInfos)-ti.virtualColumnCount {
if len(colInfos) != len(ti.rowColInfos)-len(ti.VirtualColumnsOffset) {
log.Panic("invalid rowColInfosWithoutVirtualCols",
zap.Int("len(colInfos)", len(colInfos)),
zap.Int("len(ti.rowColInfos)", len(ti.rowColInfos)),
zap.Int("ti.virtualColumnCount", ti.virtualColumnCount))
zap.Any("ti.VirtualColumnsOffset", ti.VirtualColumnsOffset))

Check warning on line 202 in cdc/model/schema_storage.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/schema_storage.go#L202

Added line #L202 was not covered by tests
}
ti.rowColInfosWithoutVirtualCols = &colInfos
}
Expand Down Expand Up @@ -388,7 +391,7 @@

// HasVirtualColumns returns whether the table has virtual columns
func (ti *TableInfo) HasVirtualColumns() bool {
return ti.virtualColumnCount > 0
return len(ti.VirtualColumnsOffset) > 0

Check warning on line 394 in cdc/model/schema_storage.go

View check run for this annotation

Codecov / codecov/patch

cdc/model/schema_storage.go#L394

Added line #L394 was not covered by tests
}

// IsEligible returns whether the table is a eligible table
Expand Down
10 changes: 7 additions & 3 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,15 @@ func TestBuildTiDBTableInfoWithUniqueKey(t *testing.T) {
require.Equal(t, columns[3].Flag, *tableInfo.ForceGetColumnFlagType(tableInfo.Columns[3].ID))
}

func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) {
func TestBuildTiDBTableInfoWithVirtualColumns(t *testing.T) {
t.Parallel()
ftNull := parser_types.NewFieldType(mysql.TypeUnspecified)
ftNull.SetFlag(mysql.NotNullFlag)

ftNotNull := parser_types.NewFieldType(mysql.TypeUnspecified)
ftNotNull.SetFlag(mysql.NotNullFlag | mysql.MultipleKeyFlag)

tableInfo := timodel.TableInfo{
tidbTableInfo := timodel.TableInfo{
Columns: []*timodel.ColumnInfo{
{
Name: pmodel.CIStr{O: "a"},
Expand Down Expand Up @@ -590,12 +590,16 @@ func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) {
IsCommonHandle: false,
PKIsHandle: false,
}
infoWithourVirtualCols := BuildTiDBTableInfoWithoutVirtualColumns(&tableInfo)
// test BuildTiDBTableInfoWithoutVirtualColumns
infoWithourVirtualCols := BuildTiDBTableInfoWithoutVirtualColumns(&tidbTableInfo)
require.Equal(t, 3, len(infoWithourVirtualCols.Columns))
require.Equal(t, 0, infoWithourVirtualCols.Columns[0].Offset)
require.Equal(t, "a", infoWithourVirtualCols.Columns[0].Name.O)
require.Equal(t, 1, infoWithourVirtualCols.Columns[1].Offset)
require.Equal(t, "b", infoWithourVirtualCols.Columns[1].Name.O)
require.Equal(t, 2, infoWithourVirtualCols.Columns[2].Offset)
require.Equal(t, "d", infoWithourVirtualCols.Columns[2].Name.O)

tableInfo := WrapTableInfo(100, "test", 1000, &tidbTableInfo)
require.Equal(t, []int{2}, tableInfo.VirtualColumnsOffset)
}
64 changes: 57 additions & 7 deletions pkg/filter/expr_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
Expand Down Expand Up @@ -274,8 +276,9 @@
return false, err
}
return r.skipDMLByExpression(
rawRow.RowDatums,
rawRow.RowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
exprs,
ti,
)
case row.IsUpdate():
oldExprs, err := r.getUpdateOldExpr(ti)
Expand All @@ -287,15 +290,17 @@
return false, err
}
ignoreOld, err := r.skipDMLByExpression(
rawRow.PreRowDatums,
rawRow.PreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
oldExprs,
ti,
)
if err != nil {
return false, err
}
ignoreNew, err := r.skipDMLByExpression(
rawRow.RowDatums,
rawRow.RowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
newExprs,
ti,
)
if err != nil {
return false, err
Expand All @@ -307,25 +312,70 @@
return false, err
}
return r.skipDMLByExpression(
rawRow.PreRowDatums,
rawRow.PreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
exprs,
ti,
)
default:
log.Warn("unknown row changed event type")
return false, nil
}
}

func (r *dmlExprFilterRule) buildRowWithVirtualColumns(
rowData []types.Datum,
tableInfo *model.TableInfo,
) (chunk.Row, error) {
row := chunk.MutRowFromDatums(rowData).ToRow()
if len(tableInfo.VirtualColumnsOffset) == 0 {
// If there is no virtual column, we can return the row directly.
return row, nil
}

columns, _, err := expression.ColumnInfos2ColumnsAndNames(r.sessCtx.GetExprCtx(),
ast.CIStr{} /* unused */, tableInfo.Name, tableInfo.Columns, tableInfo.TableInfo)
if err != nil {
return chunk.Row{}, err
}

Check warning on line 339 in pkg/filter/expr_filter.go

View check run for this annotation

Codecov / codecov/patch

pkg/filter/expr_filter.go#L338-L339

Added lines #L338 - L339 were not covered by tests
var fts []*types.FieldType
for _, col := range columns {
fts = append(fts, col.GetType(r.sessCtx.GetExprCtx().GetEvalCtx()))
}
ch := chunk.NewEmptyChunk(fts)
ch.AppendRow(row)

vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(r.sessCtx.GetExprCtx().GetEvalCtx(), columns)
err = table.FillVirtualColumnValue(vColFts, vColOffsets, columns, tableInfo.Columns, r.sessCtx.GetExprCtx(), ch)
if err != nil {
return chunk.Row{}, err
}

Check warning on line 351 in pkg/filter/expr_filter.go

View check run for this annotation

Codecov / codecov/patch

pkg/filter/expr_filter.go#L350-L351

Added lines #L350 - L351 were not covered by tests
return ch.GetRow(0), nil
}

func collectVirtualColumnOffsetsAndTypes(ctx expression.EvalContext, cols []*expression.Column) ([]int, []*types.FieldType) {
var offsets []int
var fts []*types.FieldType
for i, col := range cols {
if col.VirtualExpr != nil {
offsets = append(offsets, i)
fts = append(fts, col.GetType(ctx))
}
}
return offsets, fts
}

func (r *dmlExprFilterRule) skipDMLByExpression(
rowData []types.Datum,
expr expression.Expression,
tableInfo *model.TableInfo,
) (bool, error) {
if len(rowData) == 0 || expr == nil {
return false, nil
}

row := chunk.MutRowFromDatums(rowData).ToRow()

row, err := r.buildRowWithVirtualColumns(rowData, tableInfo)
if err != nil {
return false, errors.Trace(err)
}

Check warning on line 378 in pkg/filter/expr_filter.go

View check run for this annotation

Codecov / codecov/patch

pkg/filter/expr_filter.go#L377-L378

Added lines #L377 - L378 were not covered by tests
d, err := expr.Eval(r.sessCtx.GetExprCtx().GetEvalCtx(), row)
if err != nil {
log.Error("failed to eval expression", zap.Error(err))
Expand Down
Loading