Skip to content

Commit b144e40

Browse files
authored
event filter: fix panic when evaluate expressions for table with virtual columns (#12211)
close #12206
1 parent ac2fcf4 commit b144e40

File tree

10 files changed

+368
-30
lines changed

10 files changed

+368
-30
lines changed

cdc/model/mounter.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package model
1616
import (
1717
"context"
1818
"math"
19+
"sort"
1920

2021
"github.com/pingcap/log"
2122
"github.com/pingcap/tidb/pkg/types"
@@ -24,8 +25,53 @@ import (
2425

2526
// RowChangedDatums is used to store the changed datums of a row.
2627
type RowChangedDatums struct {
27-
RowDatums []types.Datum
28-
PreRowDatums []types.Datum
28+
RowDatums []types.Datum // row datums (excluding virtual columns)
29+
PreRowDatums []types.Datum // pre row datums (excluding virtual columns)
30+
}
31+
32+
// mergeDatumWithVirtualCols returns a slice of row datums with placeholders for virtual columns placed at the specified offset.
33+
func mergeDatumWithVirtualCols(datums []types.Datum, virtualColsOffset []int) []types.Datum {
34+
if len(virtualColsOffset) == 0 {
35+
return datums
36+
}
37+
if !sort.IntsAreSorted(virtualColsOffset) {
38+
log.Panic("virtual column offsets must be sorted",
39+
zap.Ints("virtualColsOffset", virtualColsOffset))
40+
}
41+
42+
// checks if virtual column offsets are within valid range.
43+
maxAllowedIndex := len(datums) + len(virtualColsOffset)
44+
for _, idx := range virtualColsOffset {
45+
if idx < 0 || idx >= maxAllowedIndex {
46+
log.Panic("invalid virtual column index",
47+
zap.Int("index", idx),
48+
zap.Int("maxAllowedIndex", maxAllowedIndex-1))
49+
}
50+
}
51+
52+
result := make([]types.Datum, 0, maxAllowedIndex)
53+
originalIdx := 0
54+
virtualIdx := 0
55+
for originalIdx < len(datums) || virtualIdx < len(virtualColsOffset) {
56+
if virtualIdx < len(virtualColsOffset) && virtualColsOffset[virtualIdx] == len(result) {
57+
result = append(result, types.Datum{})
58+
virtualIdx++
59+
} else if originalIdx < len(datums) {
60+
result = append(result, datums[originalIdx])
61+
originalIdx++
62+
}
63+
}
64+
return result
65+
}
66+
67+
// RowDatumsWithVirtualCols returns the row datums with placeholders for virtual columns placed at the specified offset.
68+
func (r *RowChangedDatums) RowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum {
69+
return mergeDatumWithVirtualCols(r.RowDatums, virtualColsOffset)
70+
}
71+
72+
// PreRowDatumsWithVirtualCols returns the pre row datums with placeholders for virtual columns placed at the specified offset.
73+
func (r *RowChangedDatums) PreRowDatumsWithVirtualCols(virtualColsOffset []int) []types.Datum {
74+
return mergeDatumWithVirtualCols(r.PreRowDatums, virtualColsOffset)
2975
}
3076

3177
// IsEmpty returns true if the RowChangeDatums is empty.

cdc/model/mounter_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"math/rand"
1818
"testing"
1919

20+
"github.com/pingcap/tidb/pkg/types"
2021
"github.com/stretchr/testify/require"
2122
)
2223

@@ -128,3 +129,37 @@ func TestComparePolymorphicEvents(t *testing.T) {
128129
require.True(t, ComparePolymorphicEvents(item.a, item.b))
129130
}
130131
}
132+
133+
func TestMergeDatumWithVirtualCols(t *testing.T) {
134+
t.Parallel()
135+
datums := []types.Datum{
136+
types.NewStringDatum("I"),
137+
types.NewStringDatum("employee"),
138+
types.NewStringDatum("hr"),
139+
types.NewStringDatum("433305438660591626"),
140+
types.NewStringDatum("101"),
141+
types.NewStringDatum("Smith"),
142+
types.NewStringDatum("Bob"),
143+
types.NewStringDatum("2014-06-04"),
144+
types.NewDatum(nil),
145+
}
146+
virtualColsOffset := []int{0, 3, 11}
147+
mergedDatums := mergeDatumWithVirtualCols(datums, virtualColsOffset)
148+
149+
// Original length (9) + virtual columns (3) = 12
150+
require.Len(t, mergedDatums, 12)
151+
152+
// Verify positions of virtual columns and original data
153+
for _, pos := range virtualColsOffset {
154+
// Virtual columns should be empty Datums
155+
require.Equal(t, types.Datum{}, mergedDatums[pos],
156+
"Virtual column at position %d should be empty", pos)
157+
}
158+
159+
// Verify original data is in correct positions (skipping virtual columns)
160+
originalIndices := []int{1, 2, 4, 5, 6, 7, 8, 9, 10}
161+
for i, origIdx := range originalIndices {
162+
require.Equal(t, datums[i], mergedDatums[origIdx],
163+
"Original datum at position %d doesn't match", origIdx)
164+
}
165+
}

cdc/model/schema_storage.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,12 @@ type TableInfo struct {
9090
// only for new row format decoder
9191
handleColID []int64
9292

93-
// number of virtual columns
94-
virtualColumnCount int
93+
// offset of virtual columns in TableInfo.Columns
94+
// for example:
95+
// Table has 4 columns: a (physical), b (physical), c (virtual), d (virtual)
96+
// TableInfo.Columns order: a, b, c, d
97+
// VirtualColumnsOffset will be [2, 3] (indices of virtual columns c and d)
98+
VirtualColumnsOffset []int
9599
// rowColInfosWithoutVirtualCols is the same as rowColInfos, but without virtual columns
96100
rowColInfosWithoutVirtualCols *[]rowcodec.ColInfo
97101
}
@@ -121,7 +125,6 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
121125

122126
rowColumnsCurrentOffset := 0
123127

124-
ti.virtualColumnCount = 0
125128
for i, col := range ti.Columns {
126129
ti.columnsOffset[col.ID] = i
127130
pkIsHandle := false
@@ -146,7 +149,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
146149
}
147150
}
148151
} else {
149-
ti.virtualColumnCount += 1
152+
ti.VirtualColumnsOffset = append(ti.VirtualColumnsOffset, i)
150153
}
151154
ti.rowColInfos[i] = rowcodec.ColInfo{
152155
ID: col.ID,
@@ -182,21 +185,21 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
182185
}
183186

184187
func (ti *TableInfo) initRowColInfosWithoutVirtualCols() {
185-
if ti.virtualColumnCount == 0 {
188+
if len(ti.VirtualColumnsOffset) == 0 {
186189
ti.rowColInfosWithoutVirtualCols = &ti.rowColInfos
187190
return
188191
}
189-
colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-ti.virtualColumnCount)
192+
colInfos := make([]rowcodec.ColInfo, 0, len(ti.rowColInfos)-len(ti.VirtualColumnsOffset))
190193
for i, col := range ti.Columns {
191194
if IsColCDCVisible(col) {
192195
colInfos = append(colInfos, ti.rowColInfos[i])
193196
}
194197
}
195-
if len(colInfos) != len(ti.rowColInfos)-ti.virtualColumnCount {
198+
if len(colInfos) != len(ti.rowColInfos)-len(ti.VirtualColumnsOffset) {
196199
log.Panic("invalid rowColInfosWithoutVirtualCols",
197200
zap.Int("len(colInfos)", len(colInfos)),
198201
zap.Int("len(ti.rowColInfos)", len(ti.rowColInfos)),
199-
zap.Int("ti.virtualColumnCount", ti.virtualColumnCount))
202+
zap.Any("ti.VirtualColumnsOffset", ti.VirtualColumnsOffset))
200203
}
201204
ti.rowColInfosWithoutVirtualCols = &colInfos
202205
}
@@ -388,7 +391,7 @@ func (ti *TableInfo) HasUniqueColumn() bool {
388391

389392
// HasVirtualColumns returns whether the table has virtual columns
390393
func (ti *TableInfo) HasVirtualColumns() bool {
391-
return ti.virtualColumnCount > 0
394+
return len(ti.VirtualColumnsOffset) > 0
392395
}
393396

394397
// IsEligible returns whether the table is a eligible table

cdc/model/schema_storage_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -511,15 +511,15 @@ func TestBuildTiDBTableInfoWithUniqueKey(t *testing.T) {
511511
require.Equal(t, columns[3].Flag, *tableInfo.ForceGetColumnFlagType(tableInfo.Columns[3].ID))
512512
}
513513

514-
func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) {
514+
func TestBuildTiDBTableInfoWithVirtualColumns(t *testing.T) {
515515
t.Parallel()
516516
ftNull := parser_types.NewFieldType(mysql.TypeUnspecified)
517517
ftNull.SetFlag(mysql.NotNullFlag)
518518

519519
ftNotNull := parser_types.NewFieldType(mysql.TypeUnspecified)
520520
ftNotNull.SetFlag(mysql.NotNullFlag | mysql.MultipleKeyFlag)
521521

522-
tableInfo := timodel.TableInfo{
522+
tidbTableInfo := timodel.TableInfo{
523523
Columns: []*timodel.ColumnInfo{
524524
{
525525
Name: pmodel.CIStr{O: "a"},
@@ -590,12 +590,16 @@ func TestBuildTiDBTableInfoWithoutVirtualColumns(t *testing.T) {
590590
IsCommonHandle: false,
591591
PKIsHandle: false,
592592
}
593-
infoWithourVirtualCols := BuildTiDBTableInfoWithoutVirtualColumns(&tableInfo)
593+
// test BuildTiDBTableInfoWithoutVirtualColumns
594+
infoWithourVirtualCols := BuildTiDBTableInfoWithoutVirtualColumns(&tidbTableInfo)
594595
require.Equal(t, 3, len(infoWithourVirtualCols.Columns))
595596
require.Equal(t, 0, infoWithourVirtualCols.Columns[0].Offset)
596597
require.Equal(t, "a", infoWithourVirtualCols.Columns[0].Name.O)
597598
require.Equal(t, 1, infoWithourVirtualCols.Columns[1].Offset)
598599
require.Equal(t, "b", infoWithourVirtualCols.Columns[1].Name.O)
599600
require.Equal(t, 2, infoWithourVirtualCols.Columns[2].Offset)
600601
require.Equal(t, "d", infoWithourVirtualCols.Columns[2].Name.O)
602+
603+
tableInfo := WrapTableInfo(100, "test", 1000, &tidbTableInfo)
604+
require.Equal(t, []int{2}, tableInfo.VirtualColumnsOffset)
601605
}

pkg/filter/expr_filter.go

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"github.com/pingcap/log"
2222
"github.com/pingcap/tidb/pkg/expression"
2323
"github.com/pingcap/tidb/pkg/parser"
24+
"github.com/pingcap/tidb/pkg/parser/ast"
2425
"github.com/pingcap/tidb/pkg/sessionctx"
26+
"github.com/pingcap/tidb/pkg/table"
2527
"github.com/pingcap/tidb/pkg/types"
2628
"github.com/pingcap/tidb/pkg/util/chunk"
2729
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
@@ -274,8 +276,9 @@ func (r *dmlExprFilterRule) shouldSkipDML(
274276
return false, err
275277
}
276278
return r.skipDMLByExpression(
277-
rawRow.RowDatums,
279+
rawRow.RowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
278280
exprs,
281+
ti,
279282
)
280283
case row.IsUpdate():
281284
oldExprs, err := r.getUpdateOldExpr(ti)
@@ -287,15 +290,17 @@ func (r *dmlExprFilterRule) shouldSkipDML(
287290
return false, err
288291
}
289292
ignoreOld, err := r.skipDMLByExpression(
290-
rawRow.PreRowDatums,
293+
rawRow.PreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
291294
oldExprs,
295+
ti,
292296
)
293297
if err != nil {
294298
return false, err
295299
}
296300
ignoreNew, err := r.skipDMLByExpression(
297-
rawRow.RowDatums,
301+
rawRow.RowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
298302
newExprs,
303+
ti,
299304
)
300305
if err != nil {
301306
return false, err
@@ -307,25 +312,70 @@ func (r *dmlExprFilterRule) shouldSkipDML(
307312
return false, err
308313
}
309314
return r.skipDMLByExpression(
310-
rawRow.PreRowDatums,
315+
rawRow.PreRowDatumsWithVirtualCols(ti.VirtualColumnsOffset),
311316
exprs,
317+
ti,
312318
)
313319
default:
314320
log.Warn("unknown row changed event type")
315321
return false, nil
316322
}
317323
}
318324

325+
func (r *dmlExprFilterRule) buildRowWithVirtualColumns(
326+
rowData []types.Datum,
327+
tableInfo *model.TableInfo,
328+
) (chunk.Row, error) {
329+
row := chunk.MutRowFromDatums(rowData).ToRow()
330+
if len(tableInfo.VirtualColumnsOffset) == 0 {
331+
// If there is no virtual column, we can return the row directly.
332+
return row, nil
333+
}
334+
335+
columns, _, err := expression.ColumnInfos2ColumnsAndNames(r.sessCtx.GetExprCtx(),
336+
ast.CIStr{} /* unused */, tableInfo.Name, tableInfo.Columns, tableInfo.TableInfo)
337+
if err != nil {
338+
return chunk.Row{}, err
339+
}
340+
var fts []*types.FieldType
341+
for _, col := range columns {
342+
fts = append(fts, col.GetType(r.sessCtx.GetExprCtx().GetEvalCtx()))
343+
}
344+
ch := chunk.NewEmptyChunk(fts)
345+
ch.AppendRow(row)
346+
347+
vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(r.sessCtx.GetExprCtx().GetEvalCtx(), columns)
348+
err = table.FillVirtualColumnValue(vColFts, vColOffsets, columns, tableInfo.Columns, r.sessCtx.GetExprCtx(), ch)
349+
if err != nil {
350+
return chunk.Row{}, err
351+
}
352+
return ch.GetRow(0), nil
353+
}
354+
355+
func collectVirtualColumnOffsetsAndTypes(ctx expression.EvalContext, cols []*expression.Column) ([]int, []*types.FieldType) {
356+
var offsets []int
357+
var fts []*types.FieldType
358+
for i, col := range cols {
359+
if col.VirtualExpr != nil {
360+
offsets = append(offsets, i)
361+
fts = append(fts, col.GetType(ctx))
362+
}
363+
}
364+
return offsets, fts
365+
}
366+
319367
func (r *dmlExprFilterRule) skipDMLByExpression(
320368
rowData []types.Datum,
321369
expr expression.Expression,
370+
tableInfo *model.TableInfo,
322371
) (bool, error) {
323372
if len(rowData) == 0 || expr == nil {
324373
return false, nil
325374
}
326-
327-
row := chunk.MutRowFromDatums(rowData).ToRow()
328-
375+
row, err := r.buildRowWithVirtualColumns(rowData, tableInfo)
376+
if err != nil {
377+
return false, errors.Trace(err)
378+
}
329379
d, err := expr.Eval(r.sessCtx.GetExprCtx().GetEvalCtx(), row)
330380
if err != nil {
331381
log.Error("failed to eval expression", zap.Error(err))

0 commit comments

Comments
 (0)