Commit a6221a5

codec(ticdc): support header line for CSV protocol (#12183)
close #12179
1 parent 6171842 commit a6221a5

File tree: 13 files changed (+196, -11 lines)

cdc/api/v2/model.go
Lines changed: 3 additions & 0 deletions

@@ -295,6 +295,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
 			BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod,
 			OutputOldValue:       c.Sink.CSVConfig.OutputOldValue,
 			OutputHandleKey:      c.Sink.CSVConfig.OutputHandleKey,
+			OutputFieldHeader:    c.Sink.CSVConfig.OutputFieldHeader,
 		}
 	}
 	var pulsarConfig *config.PulsarConfig
@@ -599,6 +600,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 			BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod,
 			OutputOldValue:       cloned.Sink.CSVConfig.OutputOldValue,
 			OutputHandleKey:      cloned.Sink.CSVConfig.OutputHandleKey,
+			OutputFieldHeader:    cloned.Sink.CSVConfig.OutputFieldHeader,
 		}
 	}
 	var kafkaConfig *KafkaConfig
@@ -983,6 +985,7 @@ type CSVConfig struct {
 	BinaryEncodingMethod string `json:"binary_encoding_method"`
 	OutputOldValue       bool   `json:"output_old_value"`
 	OutputHandleKey      bool   `json:"output_handle_key"`
+	OutputFieldHeader    bool   `json:"output_field_header"`
 }

 // LargeMessageHandleConfig denotes the large message handling config
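
Based on the json tags above, the new option surfaces as output_field_header in the CSV block of the sink config in the v2 open API. A minimal sketch, assuming the usual sink/csv nesting of the API payload (other fields elided):

{
  "sink": {
    "csv": {
      "binary_encoding_method": "base64",
      "output_old_value": false,
      "output_handle_key": false,
      "output_field_header": true
    }
  }
}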

cdc/sink/dmlsink/cloudstorage/dml_worker.go
Lines changed: 5 additions & 0 deletions

@@ -255,7 +255,12 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
 	buf := bytes.NewBuffer(make([]byte, 0, task.size))
 	rowsCnt := 0
 	bytesCnt := int64(0)
+	// There is always only one message here in task.msgs
 	for _, msg := range task.msgs {
+		if msg.Key != nil && rowsCnt == 0 {
+			buf.Write(msg.Key)
+			bytesCnt += int64(len(msg.Key))
+		}
 		bytesCnt += int64(len(msg.Value))
 		rowsCnt += msg.GetRowsCount()
 		buf.Write(msg.Value)
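
With the option enabled, the header travels in msg.Key, so writeDataFile emits it once at the top of each data file before the row records carried in msg.Value. For a table test.table1(col1 int primary key), a written object would start roughly like the sketch below (the commit-ts values are illustrative only):

ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,ticdc-meta$commit-ts,col1
"I","table1","test",450253245302439944,1
"I","table1","test",450253245302439944,2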

pkg/config/config_test_data.go
Lines changed: 4 additions & 2 deletions

@@ -247,7 +247,8 @@ const (
 			"include-commit-ts": true,
 			"binary-encoding-method":"base64",
 			"output-old-value": false,
-			"output-handle-key": false
+			"output-handle-key": false,
+			"output-field-header": false
 		},
 		"date-separator": "month",
 		"enable-partition-separator": true,
@@ -423,7 +424,8 @@ const (
 			"include-commit-ts": true,
 			"binary-encoding-method":"base64",
 			"output-old-value": false,
-			"output-handle-key": false
+			"output-handle-key": false,
+			"output-field-header": false
 		},
 		"terminator": "\r\n",
 		"transaction-atomicity": "",

pkg/config/sink.go
Lines changed: 2 additions & 0 deletions

@@ -260,6 +260,8 @@ type CSVConfig struct {
 	OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
 	// output handle key
 	OutputHandleKey bool `toml:"output-handle-key" json:"output-handle-key"`
+	// output field header
+	OutputFieldHeader bool `toml:"output-field-header" json:"output-field-header"`
 }

 func (c *CSVConfig) validateAndAdjust() error {
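
Going by the toml tags above, the switch is meant to be set in the [sink.csv] block of a changefeed configuration file. A minimal sketch, assuming the usual CSV keys and leaving everything else at its defaults:

[sink.csv]
delimiter = ","
quote = "\""
null = "\\N"
include-commit-ts = true
# new in this commit: emit a header line with the field names
output-field-header = true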

pkg/sink/codec/common/config.go
Lines changed: 4 additions & 0 deletions

@@ -89,6 +89,8 @@ type Config struct {
 	DebeziumDisableSchema bool
 	// Debezium only. Whether before value should be included in the output.
 	DebeziumOutputOldValue bool
+	// CSV only. Whether header should be included in the output.
+	CSVOutputFieldHeader bool
 }

 // EncodingFormatType is the type of encoding format
@@ -129,6 +131,7 @@ func NewConfig(protocol config.Protocol) *Config {
 		DebeziumOutputOldValue: true,
 		OpenOutputOldValue:     true,
 		DebeziumDisableSchema:  false,
+		CSVOutputFieldHeader:   false,
 	}
 }

@@ -233,6 +236,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
 		c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod
 		c.OutputOldValue = replicaConfig.Sink.CSVConfig.OutputOldValue
 		c.OutputHandleKey = replicaConfig.Sink.CSVConfig.OutputHandleKey
+		c.CSVOutputFieldHeader = replicaConfig.Sink.CSVConfig.OutputFieldHeader
 	}
 	if replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.LargeMessageHandle != nil {
 		c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle

pkg/sink/codec/csv/csv_decoder.go
Lines changed: 24 additions & 5 deletions

@@ -18,13 +18,15 @@ import (
 	"io"

 	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
 	lconfig "github.com/pingcap/tidb/pkg/lightning/config"
 	"github.com/pingcap/tidb/pkg/lightning/mydump"
 	"github.com/pingcap/tidb/pkg/lightning/worker"
 	"github.com/pingcap/tiflow/cdc/model"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/sink/codec"
 	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+	"go.uber.org/zap"
 )

 const defaultIOConcurrency = 1
@@ -52,11 +54,13 @@ func NewBatchDecoder(ctx context.Context,
 		backslashEscape = true
 	}
 	cfg := &lconfig.CSVConfig{
-		Separator:       codecConfig.Delimiter,
-		Delimiter:       codecConfig.Quote,
-		Terminator:      codecConfig.Terminator,
-		Null:            []string{codecConfig.NullString},
-		BackslashEscape: backslashEscape,
+		Separator:         codecConfig.Delimiter,
+		Delimiter:         codecConfig.Quote,
+		Terminator:        codecConfig.Terminator,
+		Null:              []string{codecConfig.NullString},
+		BackslashEscape:   backslashEscape,
+		HeaderSchemaMatch: true,
+		Header:            codecConfig.CSVOutputFieldHeader,
 	}
 	csvParser, err := mydump.NewCSVParser(ctx, cfg,
 		mydump.NewStringReader(string(value)),
@@ -65,6 +69,21 @@ func NewBatchDecoder(ctx context.Context,
 	if err != nil {
 		return nil, err
 	}
+	if codecConfig.CSVOutputFieldHeader {
+		err := csvParser.ReadColumns()
+		if err != nil {
+			return nil, err
+		}
+		header := csvParser.Columns()
+		log.Info("parser CSV header", zap.Any("header", header), zap.Any("cap", cap(header)))
+		// check column name
+		idx := len(header) - len(tableInfo.Columns)
+		for i, col := range tableInfo.Columns {
+			if col.Name.O != header[idx+i] {
+				log.Panic("check column name order failed", zap.Any("col", col.Name.O), zap.Any("header", header[idx+i]))
+			}
+		}
+	}
 	return &batchDecoder{
 		codecConfig: codecConfig,
 		tableInfo:   tableInfo,
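
The check above relies on the trailing header entries matching the table's data columns in order, after a variable-length ticdc-meta$* prefix. A standalone sketch of that alignment logic, not part of the commit and using hypothetical sample values:

package main

import "fmt"

func main() {
	// Header as emitted by encodeHeader with include-commit-ts enabled.
	header := []string{
		"ticdc-meta$operation", "ticdc-meta$table", "ticdc-meta$schema",
		"ticdc-meta$commit-ts", "col1", "col2",
	}
	// Stands in for the column names taken from tableInfo.Columns.
	tableCols := []string{"col1", "col2"}

	// Skip the meta prefix: its length is whatever is left of the header
	// once the table's own columns are accounted for.
	idx := len(header) - len(tableCols) // 6 - 2 = 4
	for i, name := range tableCols {
		if header[idx+i] != name {
			panic(fmt.Sprintf("column %q does not match header field %q", name, header[idx+i]))
		}
	}
	fmt.Println("header matches table schema")
}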

pkg/sink/codec/csv/csv_encoder.go
Lines changed: 24 additions & 1 deletion

@@ -24,6 +24,7 @@ import (

 // BatchEncoder encodes the events into the byte of a batch into.
 type BatchEncoder struct {
+	header    []byte
 	valueBuf  *bytes.Buffer
 	callback  func()
 	batchSize int
@@ -36,6 +37,9 @@ func (b *BatchEncoder) AppendTxnEvent(
 	callback func(),
 ) error {
 	for _, rowEvent := range e.Rows {
+		if b.config.CSVOutputFieldHeader && b.batchSize == 0 {
+			b.setHeader(rowEvent)
+		}
 		row, err := rowChangedEvent2CSVMsg(b.config, rowEvent)
 		if err != nil {
 			return err
@@ -53,7 +57,7 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
 		return nil
 	}

-	ret := common.NewMsg(config.ProtocolCsv, nil,
+	ret := common.NewMsg(config.ProtocolCsv, b.header,
 		b.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil)
 	ret.SetRowsCount(b.batchSize)
 	ret.Callback = b.callback
@@ -64,10 +68,29 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
 	}
 	b.callback = nil
 	b.batchSize = 0
+	b.header = nil

 	return []*common.Message{ret}
 }

+func (b *BatchEncoder) setHeader(rowEvent *model.RowChangedEvent) {
+	buf := &bytes.Buffer{}
+	columns := rowEvent.Columns
+	if rowEvent.IsDelete() {
+		columns = rowEvent.PreColumns
+	}
+	colNames := make([]string, 0, len(columns))
+	for _, col := range columns {
+		if col == nil {
+			continue
+		}
+		info := rowEvent.TableInfo.ForceGetColumnInfo(col.ColumnID)
+		colNames = append(colNames, info.Name.O)
+	}
+	buf.Write(encodeHeader(b.config, colNames))
+	b.header = buf.Bytes()
+}
+
 // newBatchEncoder creates a new csv BatchEncoder.
 func newBatchEncoder(config *common.Config) codec.TxnEventEncoder {
 	return &BatchEncoder{

pkg/sink/codec/csv/csv_encoder_test.go
Lines changed: 53 additions & 0 deletions

@@ -14,6 +14,7 @@
 package csv

 import (
+	"strings"
 	"testing"

 	"github.com/pingcap/tiflow/cdc/entry"
@@ -102,3 +103,55 @@ func TestCSVAppendRowChangedEventWithCallback(t *testing.T) {
 	msgs[0].Callback()
 	require.Equal(t, 1, count, "expected all callbacks to be called")
 }
+
+func TestCSVBatchCodecWithHeader(t *testing.T) {
+	helper := entry.NewSchemaTestHelper(t)
+	defer helper.Close()
+
+	ddl := helper.DDL2Event("create table test.table1(col1 int primary key)")
+	event1 := helper.DML2Event("insert into test.table1 values (1)", "test", "table1")
+	event2 := helper.DML2Event("insert into test.table1 values (2)", "test", "table1")
+
+	cs := &model.SingleTableTxn{
+		Rows: []*model.RowChangedEvent{
+			event1,
+			event2,
+		},
+	}
+	cfg := &common.Config{
+		Delimiter:            ",",
+		Quote:                "\"",
+		Terminator:           "\n",
+		NullString:           "\\N",
+		IncludeCommitTs:      true,
+		CSVOutputFieldHeader: true,
+	}
+	encoder := newBatchEncoder(cfg)
+	err := encoder.AppendTxnEvent(cs, nil)
+	require.Nil(t, err)
+	messages := encoder.Build()
+	require.Len(t, messages, 1)
+	header := strings.Split(string(messages[0].Key), cfg.Terminator)[0]
+	require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,ticdc-meta$commit-ts,col1", header)
+	require.Equal(t, len(cs.Rows), messages[0].GetRowsCount())
+
+	cfg.CSVOutputFieldHeader = false
+	encoder = newBatchEncoder(cfg)
+	err = encoder.AppendTxnEvent(cs, nil)
+	require.Nil(t, err)
+	messages1 := encoder.Build()
+	require.Len(t, messages1, 1)
+	require.Equal(t, messages1[0].Value, messages[0].Value)
+	require.Equal(t, len(cs.Rows), messages1[0].GetRowsCount())
+
+	cfg.CSVOutputFieldHeader = true
+	cs = &model.SingleTableTxn{
+		TableInfo: ddl.TableInfo,
+		Rows:      nil,
+	}
+	encoder = newBatchEncoder(cfg)
+	err = encoder.AppendTxnEvent(cs, nil)
+	require.Nil(t, err)
+	messages = encoder.Build()
+	require.Len(t, messages, 0)
+}

pkg/sink/codec/csv/csv_message.go
Lines changed: 32 additions & 2 deletions

@@ -100,7 +100,8 @@ func newCSVMessage(config *common.Config) *csvMessage {
 // Col2: Table name, the name of the source table.
 // Col3: Schema name, the name of the source schema.
 // Col4: Commit TS, the commit-ts of the source txn (optional).
-// Col5-n: one or more columns that represent the data to be changed.
+// Column 5: The is-update column only exists when the value of output-old-value is true.(optional)
+// Col6-n: one or more columns that represent the data to be changed.
 func (c *csvMessage) encode() []byte {
 	strBuilder := new(strings.Builder)
 	if c.opType == operationUpdate && c.config.OutputOldValue && len(c.preColumns) != 0 {
@@ -116,7 +117,7 @@ func (c *csvMessage) encode() []byte {
 		c.encodeMeta(c.opType.String(), strBuilder)
 		c.encodeColumns(c.columns, strBuilder)
 	}
-	return []byte(strBuilder.String())
+	return common.UnsafeStringToBytes(strBuilder.String())
 }

 func (c *csvMessage) encodeMeta(opType string, b *strings.Builder) {
@@ -486,3 +487,32 @@ func csvColumns2RowChangeColumns(csvConfig *common.Config, csvCols []any, ticols

 	return cols, nil
 }
+
+// The header should contain the name corresponding to the file record field,
+// and should have the same number as the record field.
+// | ticdc-meta$operation | ticdc-meta$table | ticdc-meta$schema | ticdc-meta$commit-ts | ticdc-meta$is-update | col1 | col2 | ... |
+func encodeHeader(config *common.Config, colNames []string) []byte {
+	if !config.CSVOutputFieldHeader {
+		return nil
+	}
+	strBuilder := new(strings.Builder)
+	strBuilder.WriteString("ticdc-meta$operation")
+	strBuilder.WriteString(config.Delimiter)
+	strBuilder.WriteString("ticdc-meta$table")
+	strBuilder.WriteString(config.Delimiter)
+	strBuilder.WriteString("ticdc-meta$schema")
+	if config.IncludeCommitTs {
+		strBuilder.WriteString(config.Delimiter)
+		strBuilder.WriteString("ticdc-meta$commit-ts")
+	}
+	if config.OutputOldValue {
+		strBuilder.WriteString(config.Delimiter)
+		strBuilder.WriteString("ticdc-meta$is-update")
+	}
+	for _, name := range colNames {
+		strBuilder.WriteString(config.Delimiter)
+		strBuilder.WriteString(name)
+	}
+	strBuilder.WriteString(config.Terminator)
+	return common.UnsafeStringToBytes(strBuilder.String())
+}

pkg/sink/codec/csv/csv_message_test.go
Lines changed: 32 additions & 0 deletions

@@ -1101,3 +1101,35 @@ func TestCSVMessageDecode(t *testing.T) {
 		}
 	}
 }
+
+func TestEncodeHeader(t *testing.T) {
+	cfg := &common.Config{
+		OutputOldValue:       true,
+		IncludeCommitTs:      true,
+		Delimiter:            " ",
+		CSVOutputFieldHeader: true,
+	}
+	colNames := []string{"col1", "col2"}
+	header := encodeHeader(cfg, colNames)
+	require.Equal(t, "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema ticdc-meta$commit-ts ticdc-meta$is-update col1 col2", string(header))
+
+	cfg.OutputOldValue = false
+	header = encodeHeader(cfg, colNames)
+	require.Equal(t, "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema ticdc-meta$commit-ts col1 col2", string(header))
+
+	cfg.IncludeCommitTs = false
+	header = encodeHeader(cfg, colNames)
+	require.Equal(t, "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema col1 col2", string(header))
+
+	cfg.Delimiter = ","
+	header = encodeHeader(cfg, colNames)
+	require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,col1,col2", string(header))
+
+	cfg.Terminator = "\n"
+	header = encodeHeader(cfg, colNames)
+	require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,col1,col2\n", string(header))
+
+	cfg.CSVOutputFieldHeader = false
+	header = encodeHeader(cfg, colNames)
+	require.Nil(t, header)
+}
