Skip to content

Commit b6f77f2

Browse files
authored
sink: check virtual columns in column dispatcher (#1867)
ref pingcap/tiflow#12241
1 parent 922c771 commit b6f77f2

File tree

3 files changed

+43
-22
lines changed

3 files changed

+43
-22
lines changed

downstreamadapter/sink/eventrouter/partition/columns.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/pingcap/log"
2121
"github.com/pingcap/ticdc/pkg/common"
2222
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
23-
"github.com/pingcap/ticdc/pkg/errors"
2423
"github.com/pingcap/ticdc/pkg/hash"
2524
"go.uber.org/zap"
2625
)
@@ -54,13 +53,10 @@ func (r *ColumnsPartitionGenerator) GeneratePartitionIndexAndKey(row *commonEven
5453
rowData = row.PreRow
5554
}
5655

57-
offsets, ok := tableInfo.OffsetsByNames(r.Columns)
58-
if !ok {
59-
log.Error("columns not found when dispatch event",
60-
zap.Any("tableName", tableInfo.GetTableName()),
61-
zap.Strings("columns", r.Columns))
62-
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
63-
"columns not found when dispatch event, table: %v, columns: %v", tableInfo.GetTableName(), r.Columns)
56+
offsets, err := tableInfo.OffsetsByNames(r.Columns)
57+
if err != nil {
58+
log.Error("dispatch event failed", zap.Error(err))
59+
return 0, "", err
6460
}
6561

6662
for _, idx := range offsets {

pkg/common/table_info.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"sync/atomic"
2424

2525
"github.com/pingcap/log"
26+
"github.com/pingcap/ticdc/pkg/errors"
2627
"github.com/pingcap/tidb/pkg/meta/model"
2728
"github.com/pingcap/tidb/pkg/parser/ast"
2829
"github.com/pingcap/tidb/pkg/types"
@@ -375,25 +376,36 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
375376
// Column is not case-sensitive on any platform, nor are column aliases.
376377
// So we always match in lowercase.
377378
// See also: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html
378-
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) {
379+
func (ti *TableInfo) OffsetsByNames(names []string) ([]int, error) {
379380
// todo: optimize it
380-
columnOffsets := make(map[string]int, len(ti.columnSchema.Columns))
381+
columnOffsets := make(map[string]int)
382+
virtualGeneratedColumn := make(map[string]struct{})
381383
for idx, col := range ti.columnSchema.Columns {
382384
if col != nil {
383-
columnOffsets[col.Name.L] = idx
385+
if IsColCDCVisible(col) {
386+
columnOffsets[col.Name.L] = idx
387+
} else {
388+
virtualGeneratedColumn[col.Name.L] = struct{}{}
389+
}
384390
}
385391
}
386392

387393
result := make([]int, 0, len(names))
388394
for _, col := range names {
389-
offset, ok := columnOffsets[strings.ToLower(col)]
395+
name := strings.ToLower(col)
396+
if _, ok := virtualGeneratedColumn[name]; ok {
397+
return nil, errors.ErrDispatcherFailed.GenWithStack(
398+
"found virtual generated columns when dispatch event, table: %v, columns: %v column: %v", ti.GetTableName(), names, name)
399+
}
400+
offset, ok := columnOffsets[name]
390401
if !ok {
391-
return nil, false
402+
return nil, errors.ErrDispatcherFailed.GenWithStack(
403+
"columns not found when dispatch event, table: %v, columns: %v, column: %v", ti.GetTableName(), names, name)
392404
}
393405
result = append(result, offset)
394406
}
395407

396-
return result, true
408+
return result, nil
397409
}
398410

399411
func (ti *TableInfo) HasPrimaryKey() bool {

pkg/common/table_info_helper_test.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package common
1616
import (
1717
"testing"
1818

19+
"github.com/pingcap/ticdc/pkg/errors"
1920
"github.com/pingcap/tidb/pkg/meta/model"
2021
"github.com/pingcap/tidb/pkg/parser/ast"
2122
"github.com/pingcap/tidb/pkg/parser/mysql"
@@ -207,28 +208,40 @@ func TestColumnsByNames(t *testing.T) {
207208
Name: ast.NewCIStr("col3"),
208209
ID: 2,
209210
},
211+
{
212+
Name: ast.NewCIStr("col4"),
213+
ID: 3,
214+
GeneratedExprString: "generated",
215+
},
210216
},
211217
})
212218

213219
names := []string{"col1", "col2", "col3"}
214-
offsets, ok := tableInfo.OffsetsByNames(names)
215-
require.True(t, ok)
220+
offsets, err := tableInfo.OffsetsByNames(names)
221+
require.NoError(t, err)
216222
require.Equal(t, []int{1, 0, 2}, offsets)
217223

218224
names = []string{"col2"}
219-
offsets, ok = tableInfo.OffsetsByNames(names)
220-
require.True(t, ok)
225+
offsets, err = tableInfo.OffsetsByNames(names)
226+
require.NoError(t, err)
221227
require.Equal(t, []int{0}, offsets)
222228

223229
names = []string{"col1", "col-not-found"}
224-
offsets, ok = tableInfo.OffsetsByNames(names)
225-
require.False(t, ok)
230+
offsets, err = tableInfo.OffsetsByNames(names)
231+
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
232+
require.ErrorContains(t, err, "columns not found")
226233
require.Nil(t, offsets)
227234

228235
names = []string{"Col1", "COL2", "CoL3"}
229-
offsets, ok = tableInfo.OffsetsByNames(names)
230-
require.True(t, ok)
236+
offsets, err = tableInfo.OffsetsByNames(names)
237+
require.NoError(t, err)
231238
require.Equal(t, []int{1, 0, 2}, offsets)
239+
240+
names = []string{"Col4"}
241+
offsets, err = tableInfo.OffsetsByNames(names)
242+
require.ErrorIs(t, err, errors.ErrDispatcherFailed)
243+
require.ErrorContains(t, err, "found virtual generated columns")
244+
require.Nil(t, offsets)
232245
}
233246

234247
func TestHandleKey(t *testing.T) {

0 commit comments

Comments
 (0)