Skip to content

Commit f86ef92

Browse files
whhezhangchaoming.zcm
authored andcommitted
[pipeline-connector][mysql] fix timestamp with timezone format (apache#2952)
* fix ts with tz parser * test timestamp with default value * fix related test * use timestamp string in test cases
1 parent f25b463 commit f86ef92

File tree

5 files changed

+48
-30
lines changed

5 files changed

+48
-30
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.math.BigDecimal;
4646
import java.sql.Connection;
4747
import java.sql.Statement;
48+
import java.sql.Timestamp;
4849
import java.time.Instant;
4950
import java.time.ZoneId;
5051
import java.util.Arrays;
@@ -118,6 +119,7 @@ public void testMysql57TimeDataTypes() throws Throwable {
118119
DataTypes.TIMESTAMP(0),
119120
DataTypes.TIMESTAMP(3),
120121
DataTypes.TIMESTAMP(6),
122+
DataTypes.TIMESTAMP_LTZ(0),
121123
DataTypes.TIMESTAMP_LTZ(0));
122124

123125
Object[] expectedSnapshot =
@@ -131,10 +133,11 @@ public void testMysql57TimeDataTypes() throws Throwable {
131133
// Because Flink's BinaryWriter force write int value for TIME(6).
132134
// See BinaryWriter#write for detail.
133135
64822123,
134-
TimestampData.fromMillis(1595008822000L),
135-
TimestampData.fromMillis(1595008822123L),
136-
TimestampData.fromMillis(1595008822123L, 456000),
137-
LocalZonedTimestampData.fromEpochMillis(1595008822000L, 0)
136+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
137+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
138+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
139+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
140+
null
138141
};
139142

140143
Object[] expectedStreamRecord =
@@ -145,10 +148,11 @@ public void testMysql57TimeDataTypes() throws Throwable {
145148
64822000,
146149
64822123,
147150
null,
148-
TimestampData.fromMillis(1595008822000L),
149-
TimestampData.fromMillis(1595008822123L),
150-
TimestampData.fromMillis(1595008822123L, 456000),
151-
LocalZonedTimestampData.fromEpochMillis(1595008822000L, 0)
151+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
152+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
153+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
154+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
155+
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
152156
};
153157

154158
testTimeDataTypes(
@@ -170,7 +174,8 @@ public void testMysql8TimeDataTypes() throws Throwable {
170174
DataTypes.TIMESTAMP(6),
171175
DataTypes.TIMESTAMP_LTZ(0),
172176
DataTypes.TIMESTAMP_LTZ(3),
173-
DataTypes.TIMESTAMP_LTZ(6));
177+
DataTypes.TIMESTAMP_LTZ(6),
178+
DataTypes.TIMESTAMP_LTZ(0));
174179

175180
Object[] expectedSnapshot =
176181
new Object[] {
@@ -183,13 +188,13 @@ public void testMysql8TimeDataTypes() throws Throwable {
183188
// Because Flink's BinaryWriter force write int value for TIME(6).
184189
// See BinaryWriter#write for detail.
185190
64822123,
186-
TimestampData.fromMillis(1595008822000L),
187-
TimestampData.fromMillis(1595008822123L),
188-
TimestampData.fromMillis(1595008822123L, 456000),
189-
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22Z")),
190-
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22.123Z")),
191-
LocalZonedTimestampData.fromInstant(
192-
Instant.parse("2020-07-17T18:00:22.123456Z"))
191+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
192+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
193+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
194+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
195+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
196+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
197+
null
193198
};
194199

195200
Object[] expectedStreamRecord =
@@ -200,13 +205,13 @@ public void testMysql8TimeDataTypes() throws Throwable {
200205
64822000,
201206
64822123,
202207
null,
203-
TimestampData.fromMillis(1595008822000L),
204-
TimestampData.fromMillis(1595008822123L),
205-
TimestampData.fromMillis(1595008822123L, 456000),
206-
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22Z")),
207-
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22.123Z")),
208-
LocalZonedTimestampData.fromInstant(
209-
Instant.parse("2020-07-17T18:00:22.123456Z"))
208+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
209+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
210+
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
211+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
212+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
213+
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
214+
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
210215
};
211216

212217
testTimeDataTypes(
@@ -317,6 +322,10 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
317322
assertThat(recordFields(streamRecord, COMMON_TYPES)).isEqualTo(expectedStreamRecord);
318323
}
319324

325+
private Instant toInstant(String ts) {
326+
return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
327+
}
328+
320329
private void testTimeDataTypes(
321330
UniqueDatabase database,
322331
RowType recordType,
@@ -340,7 +349,8 @@ private void testTimeDataTypes(
340349

341350
try (Connection connection = database.getJdbcConnection();
342351
Statement statement = connection.createStatement()) {
343-
statement.execute("UPDATE time_types SET time_6_c = null WHERE id = 1;");
352+
statement.execute(
353+
"UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;");
344354
}
345355

346356
List<Event> streamResults = fetchResults(iterator, 1);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public void testMysql57AccessTimeTypesSchema() {
130130
DataTypes.TIMESTAMP(0),
131131
DataTypes.TIMESTAMP(3),
132132
DataTypes.TIMESTAMP(6),
133+
DataTypes.TIMESTAMP_LTZ(0),
133134
DataTypes.TIMESTAMP_LTZ(0)
134135
},
135136
new String[] {
@@ -142,7 +143,8 @@ public void testMysql57AccessTimeTypesSchema() {
142143
"datetime_c",
143144
"datetime3_c",
144145
"datetime6_c",
145-
"timestamp_c"
146+
"timestamp_c",
147+
"timestamp_def_c"
146148
}))
147149
.build();
148150
assertThat(actualSchema).isEqualTo(expectedSchema);
@@ -176,7 +178,8 @@ public void testMysql8AccessTimeTypesSchema() {
176178
DataTypes.TIMESTAMP(6),
177179
DataTypes.TIMESTAMP_LTZ(0),
178180
DataTypes.TIMESTAMP_LTZ(3),
179-
DataTypes.TIMESTAMP_LTZ(6)
181+
DataTypes.TIMESTAMP_LTZ(6),
182+
DataTypes.TIMESTAMP_LTZ(0)
180183
},
181184
new String[] {
182185
"id",
@@ -190,7 +193,8 @@ public void testMysql8AccessTimeTypesSchema() {
190193
"datetime6_c",
191194
"timestamp_c",
192195
"timestamp3_c",
193-
"timestamp6_c"
196+
"timestamp6_c",
197+
"timestamp_def_c"
194198
}))
195199
.build();
196200
assertThat(actualSchema).isEqualTo(expectedSchema);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ CREATE TABLE time_types
106106
datetime3_c DATETIME(3),
107107
datetime6_c DATETIME(6),
108108
timestamp_c TIMESTAMP NULL,
109+
timestamp_def_c TIMESTAMP NULL DEFAULT '2000-01-01 00:00:00',
109110
PRIMARY KEY (id)
110111
) DEFAULT CHARSET=utf8;
111112

@@ -119,4 +120,5 @@ VALUES (DEFAULT,
119120
'2020-07-17 18:00:22',
120121
'2020-07-17 18:00:22.123',
121122
'2020-07-17 18:00:22.123456',
122-
'2020-07-17 18:00:22');
123+
'2020-07-17 18:00:22',
124+
NULL);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ CREATE TABLE time_types
108108
timestamp_c TIMESTAMP(0),
109109
timestamp3_c TIMESTAMP(3),
110110
timestamp6_c TIMESTAMP(6),
111+
timestamp_def_c TIMESTAMP NULL DEFAULT '2000-01-01 00:00:00',
111112
PRIMARY KEY (id)
112113
) DEFAULT CHARSET=utf8;
113114

@@ -123,4 +124,5 @@ VALUES (DEFAULT,
123124
'2020-07-17 18:00:22.123456',
124125
'2020-07-17 18:00:22',
125126
'2020-07-17 18:00:22.123',
126-
'2020-07-17 18:00:22.123456');
127+
'2020-07-17 18:00:22.123456',
128+
NULL);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected DataType inferString(Object value, Schema schema) {
142142
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
143143
int nano =
144144
Optional.ofNullable((String) value)
145-
.map(Instant::parse)
145+
.map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
146146
.map(Instant::getNano)
147147
.orElse(0);
148148

0 commit comments

Comments
 (0)