Skip to content

Commit 1599c95

Browse files
authored
codec(ticdc): fix incorrect encoding default "null" in Avro protocol (#12191)
close #11994
1 parent dae0aed commit 1599c95

File tree

8 files changed

+67
-18
lines changed

8 files changed

+67
-18
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ require (
5959
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
6060
github.com/klauspost/compress v1.18.0
6161
github.com/labstack/gommon v0.4.0
62-
github.com/linkedin/goavro/v2 v2.11.1
62+
github.com/linkedin/goavro/v2 v2.14.0
6363
github.com/mailru/easyjson v0.7.7
6464
github.com/mattn/go-shellwords v1.0.12
6565
github.com/modern-go/reflect2 v1.0.2

go.sum

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -744,8 +744,8 @@ github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNB
744744
github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
745745
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
746746
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
747-
github.com/linkedin/goavro/v2 v2.11.1 h1:4cuAtbDfqkKnBXp9E+tRkIJGa6W6iAjwonwt8O1f4U0=
748-
github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
747+
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
748+
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
749749
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
750750
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a h1:N9zuLhTvBSRt0gWSiJswwQ2HqDmtX/ZCDJURnKUt1Ik=
751751
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
@@ -1053,6 +1053,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
10531053
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
10541054
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
10551055
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
1056+
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
10561057
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
10571058
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
10581059
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=

pkg/sink/codec/avro/avro.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -560,10 +560,7 @@ func (a *BatchEncoder) columns2AvroSchema(tableName model.TableName, input avroE
560560
}
561561
} else {
562562
if colx.GetFlag().IsNullable() {
563-
// the string literal "null" must be coerced to a `nil`
564-
// see https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/record.go#L109-L114
565-
// https://stackoverflow.com/questions/22938124/avro-field-default-values
566-
if defaultValue == nil || defaultValue == "null" {
563+
if defaultValue == nil {
567564
field["type"] = []interface{}{"null", avroType}
568565
} else {
569566
field["type"] = []interface{}{avroType, "null"}
@@ -1044,3 +1041,10 @@ func SetupEncoderAndSchemaRegistry4Testing(
10441041
func TeardownEncoderAndSchemaRegistry4Testing() {
10451042
stopHTTPInterceptForTestingRegistry()
10461043
}
1044+
1045+
// GenCodec generates an Avro codec from the given schema.
1046+
// It does not treat the string literal "null" as a null value, because the two can be distinguished.
1047+
// See https://github.com/pingcap/tiflow/issues/11994
1048+
func GenCodec(schema string) (*goavro.Codec, error) {
1049+
return goavro.NewCodecWithOptions(schema, &goavro.CodecOption{EnableStringNull: false})
1050+
}

pkg/sink/codec/avro/avro_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ func TestAvroEnvelope(t *testing.T) {
247247
t.Parallel()
248248
cManager := &confluentSchemaManager{}
249249
gManager := &glueSchemaManager{}
250-
avroCodec, err := goavro.NewCodec(`
250+
avroCodec, err := GenCodec(`
251251
{
252252
"type": "record",
253253
"name": "testdb.avroenvelope",
@@ -391,3 +391,38 @@ func TestArvoAppendRowChangedEventWithCallback(t *testing.T) {
391391
require.Equal(t, expected, count, "expected one callback be called")
392392
}
393393
}
394+
395+
func TestStringNull(t *testing.T) {
396+
_, err := goavro.NewCodecWithOptions(`{
397+
"type": "record",
398+
"name": "test",
399+
"fields":
400+
[
401+
{
402+
"type": [
403+
"string",
404+
"null"
405+
],
406+
"default": "null",
407+
"name": "field"
408+
}
409+
]
410+
}`, &goavro.CodecOption{EnableStringNull: true})
411+
require.Error(t, err)
412+
_, err = goavro.NewCodecWithOptions(`{
413+
"type": "record",
414+
"name": "test",
415+
"fields":
416+
[
417+
{
418+
"type": [
419+
"string",
420+
"null"
421+
],
422+
"default": "null",
423+
"name": "field"
424+
}
425+
]
426+
}`, &goavro.CodecOption{EnableStringNull: false})
427+
require.NoError(t, err)
428+
}

pkg/sink/codec/avro/confluent_schema_registry.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,12 @@ func (m *confluentSchemaManager) Lookup(
267267
}
268268

269269
cacheEntry := new(schemaCacheEntry)
270-
cacheEntry.codec, err = goavro.NewCodec(jsonResp.Schema)
270+
codec, err := GenCodec(jsonResp.Schema)
271271
if err != nil {
272272
log.Error("Creating Avro codec failed", zap.Error(err))
273273
return nil, cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
274274
}
275+
cacheEntry.codec = codec
275276
cacheEntry.schemaID.confluentSchemaID = schemaID.confluentSchemaID
276277
cacheEntry.header, err = m.getMsgHeader(schemaID.confluentSchemaID)
277278
if err != nil {
@@ -314,7 +315,7 @@ func (m *confluentSchemaManager) GetCachedOrRegister(
314315
return nil, nil, err
315316
}
316317

317-
codec, err := goavro.NewCodec(schema)
318+
codec, err := GenCodec(schema)
318319
if err != nil {
319320
log.Error("GetCachedOrRegister: Could not make goavro codec", zap.Error(err))
320321
return nil, nil, cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)

pkg/sink/codec/avro/confluent_schema_registry_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestSchemaRegistry(t *testing.T) {
4747
_, err = manager.Lookup(ctx, topic, schemaID{confluentSchemaID: 1})
4848
require.Regexp(t, `.*not\sfound.*`, err)
4949

50-
codec, err := goavro.NewCodec(`{
50+
codec, err := GenCodec(`{
5151
"type": "record",
5252
"name": "test",
5353
"fields":
@@ -67,7 +67,7 @@ func TestSchemaRegistry(t *testing.T) {
6767
require.NoError(t, err)
6868
require.Equal(t, codec.CanonicalSchema(), codec2.CanonicalSchema())
6969

70-
codec, err = goavro.NewCodec(`{
70+
codec, err = GenCodec(`{
7171
"type": "record",
7272
"name": "test",
7373
"fields":
@@ -83,7 +83,15 @@ func TestSchemaRegistry(t *testing.T) {
8383
],
8484
"default": null,
8585
"name": "field2"
86-
}
86+
},
87+
{
88+
"type": [
89+
"string",
90+
"null"
91+
],
92+
"default": "null",
93+
"name": "field3"
94+
}
8795
]
8896
}`)
8997
require.NoError(t, err)
@@ -122,7 +130,7 @@ func TestSchemaRegistryIdempotent(t *testing.T) {
122130
require.NoError(t, err)
123131
}
124132

125-
codec, err := goavro.NewCodec(`{
133+
codec, err := goavro.NewCodecWithOptions(`{
126134
"type": "record",
127135
"name": "test",
128136
"fields":
@@ -140,7 +148,7 @@ func TestSchemaRegistryIdempotent(t *testing.T) {
140148
"name": "field2"
141149
}
142150
]
143-
}`)
151+
}`, &goavro.CodecOption{EnableStringNull: false})
144152
require.NoError(t, err)
145153

146154
id := 0

pkg/sink/codec/avro/glue_schema_registry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func (m *glueSchemaManager) Lookup(
153153
GenWithStackByArgs("schema not found in registry, name: %s, id: %s", schemaName, schemaID.glueSchemaID)
154154
}
155155

156-
codec, err := goavro.NewCodec(schema)
156+
codec, err := GenCodec(schema)
157157
if err != nil {
158158
log.Error("could not make goavro codec", zap.Error(err))
159159
return nil, cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
@@ -206,7 +206,7 @@ func (m *glueSchemaManager) GetCachedOrRegister(
206206
return nil, nil, err
207207
}
208208

209-
codec, err := goavro.NewCodec(schema)
209+
codec, err := GenCodec(schema)
210210
if err != nil {
211211
log.Error("GetCachedOrRegister: Could not make goavro codec", zap.Error(err))
212212
return nil, nil, cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)

pkg/sink/codec/simple/marshaller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ type avroMarshaller struct {
105105
}
106106

107107
func newAvroMarshaller(config *common.Config, schema string) (*avroMarshaller, error) {
108-
codec, err := goavro.NewCodec(schema)
108+
codec, err := goavro.NewCodecWithOptions(schema, &goavro.CodecOption{EnableStringNull: false})
109109
return &avroMarshaller{
110110
codec: codec,
111111
config: config,

0 commit comments

Comments
 (0)