Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import com.ververica.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;

import java.time.ZoneId;

/** A {@link DataSource} for mysql cdc connector. */
@Internal
public class MySqlDataSource implements DataSource {
Expand All @@ -46,9 +44,7 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
ZoneId.of(sourceConfig.getServerTimeZone()),
sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());

MySqlSource<Event> source =
new MySqlSource<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -63,10 +62,8 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private transient CustomMySqlAntlrDdlParser customParser;

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone,
boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode, serverTimeZone);
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -83,16 +81,10 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
protected final DebeziumChangelogMode changelogMode;

/** The session time zone in database server. */
protected final ZoneId serverTimeZone;

public DebeziumEventDeserializationSchema(
SchemaDataTypeInference schemaDataTypeInference,
DebeziumChangelogMode changelogMode,
ZoneId serverTimeZone) {
SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) {
this.schemaDataTypeInference = schemaDataTypeInference;
this.changelogMode = changelogMode;
this.serverTimeZone = serverTimeZone;
}

@Override
Expand Down Expand Up @@ -322,8 +314,11 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
throw new IllegalArgumentException(
"Unable to convert to TIMESTAMP from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}

protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
Expand All @@ -334,7 +329,7 @@ protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
return LocalZonedTimestampData.fromInstant(instant);
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
"Unable to convert to TIMESTAMP_LTZ from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
Expand Down