Skip to content

Commit 35ff0a2

Browse files
authored
sink(ticdc): check virtual columns in column dispatcher (#12254)
close #12241
1 parent fb2490a commit 35ff0a2

File tree

5 files changed

+54
-27
lines changed

5 files changed

+54
-27
lines changed

cdc/model/schema_storage.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/pingcap/tidb/pkg/parser/types"
2424
"github.com/pingcap/tidb/pkg/table/tables"
2525
"github.com/pingcap/tidb/pkg/util/rowcodec"
26+
"github.com/pingcap/tiflow/pkg/errors"
2627
"go.uber.org/zap"
2728
)
2829

@@ -469,25 +470,36 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
469470
// Column is not case-sensitive on any platform, nor are column aliases.
470471
// So we always match in lowercase.
471472
// See also: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html
472-
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) {
473+
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, error) {
473474
// todo: optimize it
474-
columnOffsets := make(map[string]int, len(ti.Columns))
475+
columnOffsets := make(map[string]int)
476+
virtualGeneratedColumn := make(map[string]struct{})
475477
for _, col := range ti.Columns {
476478
if col != nil {
477-
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID]
479+
if IsColCDCVisible(col) {
480+
columnOffsets[col.Name.L] = ti.columnsOffset[col.ID]
481+
} else {
482+
virtualGeneratedColumn[col.Name.L] = struct{}{}
483+
}
478484
}
479485
}
480486

481487
result := make([]int, 0, len(names))
482488
for _, col := range names {
483-
offset, ok := columnOffsets[strings.ToLower(col)]
489+
name := strings.ToLower(col)
490+
if _, ok := virtualGeneratedColumn[name]; ok {
491+
return nil, errors.ErrDispatcherFailed.GenWithStack(
492+
"found virtual generated columns when dispatch event, table: %v, columns: %v column: %v", ti.GetTableName(), names, name)
493+
}
494+
offset, ok := columnOffsets[name]
484495
if !ok {
485-
return nil, false
496+
return nil, errors.ErrDispatcherFailed.GenWithStack(
497+
"columns not found when dispatch event, table: %v, columns: %v, column: %v", ti.GetTableName(), names, name)
486498
}
487499
result = append(result, offset)
488500
}
489501

490-
return result, true
502+
return result, nil
491503
}
492504

493505
// GetPrimaryKeyColumnNames returns the primary key column names

cdc/model/schema_storage_test.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/pingcap/tidb/pkg/parser/charset"
2222
"github.com/pingcap/tidb/pkg/parser/mysql"
2323
parser_types "github.com/pingcap/tidb/pkg/parser/types"
24+
"github.com/pingcap/tiflow/pkg/errors"
2425
"github.com/stretchr/testify/require"
2526
)
2627

@@ -342,28 +343,40 @@ func TestColumnsByNames(t *testing.T) {
342343
Name: pmodel.NewCIStr("col3"),
343344
ID: 2,
344345
},
346+
{
347+
Name: pmodel.NewCIStr("col4"),
348+
ID: 3,
349+
GeneratedExprString: "generated",
350+
},
345351
},
346352
})
347353

348354
names := []string{"col1", "col2", "col3"}
349-
offsets, ok := tableInfo.OffsetsByNames(names)
350-
require.True(t, ok)
355+
offsets, err := tableInfo.OffsetsByNames(names)
356+
require.NoError(t, err)
351357
require.Equal(t, []int{1, 0, 2}, offsets)
352358

353359
names = []string{"col2"}
354-
offsets, ok = tableInfo.OffsetsByNames(names)
355-
require.True(t, ok)
360+
offsets, err = tableInfo.OffsetsByNames(names)
361+
require.NoError(t, err)
356362
require.Equal(t, []int{0}, offsets)
357363

358364
names = []string{"col1", "col-not-found"}
359-
offsets, ok = tableInfo.OffsetsByNames(names)
360-
require.False(t, ok)
365+
offsets, err = tableInfo.OffsetsByNames(names)
366+
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
367+
require.ErrorContains(t, err, "columns not found")
361368
require.Nil(t, offsets)
362369

363370
names = []string{"Col1", "COL2", "CoL3"}
364-
offsets, ok = tableInfo.OffsetsByNames(names)
365-
require.True(t, ok)
371+
offsets, err = tableInfo.OffsetsByNames(names)
372+
require.NoError(t, err)
366373
require.Equal(t, []int{1, 0, 2}, offsets)
374+
375+
names = []string{"Col4"}
376+
offsets, err = tableInfo.OffsetsByNames(names)
377+
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
378+
require.ErrorContains(t, err, "found virtual generated columns")
379+
require.Nil(t, offsets)
367380
}
368381

369382
func TestBuildTiDBTableInfoWithIntPrimaryKey(t *testing.T) {

cdc/sink/dmlsink/mq/dispatcher/event_router.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,9 @@ func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error {
150150
}
151151
}
152152
case *partition.ColumnsDispatcher:
153-
_, ok := table.OffsetsByNames(v.Columns)
154-
if !ok {
155-
return cerror.ErrDispatcherFailed.GenWithStack(
156-
"columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns)
153+
_, err := table.OffsetsByNames(v.Columns)
154+
if err != nil {
155+
return err
157156
}
158157
default:
159158
}

cdc/sink/dmlsink/mq/dispatcher/partition/columns.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919

2020
"github.com/pingcap/log"
2121
"github.com/pingcap/tiflow/cdc/model"
22-
"github.com/pingcap/tiflow/pkg/errors"
2322
"github.com/pingcap/tiflow/pkg/hash"
2423
"go.uber.org/zap"
2524
)
@@ -55,13 +54,10 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent,
5554
dispatchCols = row.PreColumns
5655
}
5756

58-
offsets, ok := row.TableInfo.OffsetsByNames(r.Columns)
59-
if !ok {
60-
log.Error("columns not found when dispatch event",
61-
zap.Any("tableName", row.TableInfo.GetTableName()),
62-
zap.Strings("columns", r.Columns))
63-
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
64-
"columns not found when dispatch event, table: %v, columns: %v", row.TableInfo.GetTableName(), r.Columns)
57+
offsets, err := row.TableInfo.OffsetsByNames(r.Columns)
58+
if err != nil {
59+
log.Error("dispatch event failed", zap.Error(err))
60+
return 0, "", err
6561
}
6662

6763
for idx := 0; idx < len(r.Columns); idx++ {

cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func TestColumnsDispatcher(t *testing.T) {
3535
{ID: 1, Name: pmodel.NewCIStr("col2"), Offset: 1, FieldType: *types.NewFieldType(mysql.TypeLong)},
3636
{ID: 2, Name: pmodel.NewCIStr("col1"), Offset: 0, FieldType: *types.NewFieldType(mysql.TypeLong)},
3737
{ID: 3, Name: pmodel.NewCIStr("col3"), Offset: 2, FieldType: *types.NewFieldType(mysql.TypeLong)},
38+
{ID: 4, Name: pmodel.NewCIStr("col4"), Offset: 3, FieldType: *types.NewFieldType(mysql.TypeLong), GeneratedExprString: "generated"},
3839
},
3940
}
4041
tableInfo := model.WrapTableInfo(100, "test", 33, tidbTableInfo)
@@ -47,9 +48,15 @@ func TestColumnsDispatcher(t *testing.T) {
4748
},
4849
}
4950

50-
p := NewColumnsDispatcher([]string{"col-2", "col-not-found"})
51+
p := NewColumnsDispatcher([]string{"col2", "col-not-found"})
5152
_, _, err := p.DispatchRowChangedEvent(event, 16)
5253
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
54+
require.ErrorContains(t, err, "columns not found")
55+
56+
p = NewColumnsDispatcher([]string{"col4"})
57+
_, _, err = p.DispatchRowChangedEvent(event, 16)
58+
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
59+
require.ErrorContains(t, err, "found virtual generated columns")
5360

5461
p = NewColumnsDispatcher([]string{"col2", "col1"})
5562
index, _, err := p.DispatchRowChangedEvent(event, 16)

0 commit comments

Comments
 (0)