Changes from all commits (merged)
==== MongoDBTableSource.java ====

@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.mongodb.table;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -54,7 +54,7 @@
  */
 public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
 
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final String hosts;
     private final String connectionOptions;
     private final String username;
@@ -83,7 +83,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
 
     public MongoDBTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             String hosts,
             @Nullable String username,
            @Nullable String password,
@@ -136,7 +136,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                 (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
         MetadataConverter[] metadataConverters = getMetadataConverters();
         TypeInformation<RowData> typeInfo =
-                scanContext.createTypeInformation(physicalSchema.toRowDataType());
+                scanContext.createTypeInformation(physicalSchema.toPhysicalRowDataType());
 
         DebeziumDeserializationSchema<RowData> deserializer =
                 new MongoDBConnectorDeserializationSchema(
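Note on the last hunk: the produced TypeInformation is now derived from
ResolvedSchema.toPhysicalRowDataType() instead of the deprecated
TableSchema.toRowDataType(). A minimal sketch of that derivation, with
illustrative class and column names (not from this PR):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.types.logical.RowType;

    public class PhysicalRowTypeSketch {
        public static void main(String[] args) {
            // Physical columns only; computed or metadata columns would be
            // declared via Column.computed(...) / Column.metadata(...) and are
            // excluded by toPhysicalRowDataType().
            ResolvedSchema schema =
                    ResolvedSchema.of(
                            Column.physical("_id", DataTypes.STRING().notNull()),
                            Column.physical("name", DataTypes.STRING()));

            // Same derivation the connector performs in getScanRuntimeProvider().
            RowType physicalRowType =
                    (RowType) schema.toPhysicalRowDataType().getLogicalType();
            System.out.println(physicalRowType);
        }
    }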
==== MongoDB table source factory ====

@@ -21,13 +21,12 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -208,8 +207,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                         ? ZoneId.systemDefault()
                         : ZoneId.of(zoneId);
 
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
         checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");
 
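The factory now reads the schema straight off the resolved catalog table, and
UniqueConstraint moves from org.apache.flink.table.api.constraints to
org.apache.flink.table.catalog. A sketch of the new access pattern in a
hypothetical factory (class and message names are illustrative):

    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.catalog.UniqueConstraint;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.factories.DynamicTableSourceFactory;

    public abstract class SchemaAccessSketch implements DynamicTableSourceFactory {
        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            // The catalog table arrives already resolved; no
            // TableSchemaUtils.getPhysicalSchema(...) round-trip is needed.
            ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();

            // getPrimaryKey() returns Optional<org.apache.flink.table.catalog.UniqueConstraint>.
            UniqueConstraint pk =
                    schema.getPrimaryKey()
                            .orElseThrow(
                                    () -> new IllegalArgumentException("Primary key must be present"));
            return buildSource(schema, pk);
        }

        protected abstract DynamicTableSource buildSource(ResolvedSchema schema, UniqueConstraint pk);
    }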
==== MongoDB table factory test ====

@@ -20,6 +20,7 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -28,7 +29,6 @@
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
@@ -43,7 +43,6 @@
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_ALL;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -93,7 +92,7 @@ public void testCommonProperties() {
         DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         MY_HOSTS,
                         USER,
                         PASSWORD,
@@ -130,7 +129,7 @@ public void testOptionalProperties() {
 
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         MY_HOSTS,
                         USER,
                         PASSWORD,
@@ -164,7 +163,7 @@ public void testMetadataColumns() {
 
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA_WITH_METADATA,
                         MY_HOSTS,
                         USER,
                         PASSWORD,
@@ -222,7 +221,7 @@ private static DynamicTableSource createTableSource(
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(schema).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),
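In the test helper, the unresolved Schema handed to CatalogTable.of(...) is now
rebuilt from the ResolvedSchema via Schema.newBuilder().fromResolvedSchema(...).
A self-contained sketch of that construction (mock values are illustrative):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedCatalogTable;
    import org.apache.flink.table.catalog.ResolvedSchema;

    public class MockCatalogTableSketch {
        static ResolvedCatalogTable mockTable(ResolvedSchema resolved, Map<String, String> options) {
            return new ResolvedCatalogTable(
                    CatalogTable.of(
                            // Converts the resolved schema back to an unresolved
                            // Schema, replacing fromResolvedSchema(schema).toSchema().
                            Schema.newBuilder().fromResolvedSchema(resolved).build(),
                            "mock source",
                            new ArrayList<>(),
                            options),
                    resolved);
        }

        public static void main(String[] args) {
            ResolvedSchema resolved =
                    ResolvedSchema.of(Column.physical("_id", DataTypes.STRING().notNull()));
            System.out.println(mockTable(resolved, Collections.emptyMap()).getResolvedSchema());
        }
    }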
==== MySQL legacy source builder (DebeziumSourceFunction) ====

@@ -235,7 +235,7 @@ public DebeziumSourceFunction<T> build() {
         }
 
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
             // Add default configurations for compatibility when set the legacy mysql connector
             // implementation
             if (LEGACY_IMPLEMENTATION_VALUE.equals(
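The same one-line change appears at three call sites in this PR:
dbzProperties.forEach(props::put) becomes props.putAll(dbzProperties). For a
Properties-to-Properties copy the two are equivalent, and the bulk call is the
more direct idiom. A tiny demonstration:

    import java.util.Properties;

    public class PutAllSketch {
        public static void main(String[] args) {
            Properties dbzProperties = new Properties();
            dbzProperties.setProperty("snapshot.mode", "never");

            Properties props = new Properties();
            // Before: dbzProperties.forEach(props::put);  // per-entry copy
            props.putAll(dbzProperties); // one bulk call, same result
            System.out.println(props.getProperty("snapshot.mode")); // prints "never"
        }
    }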
==== MySqlSourceConfig factory ====

@@ -288,7 +288,7 @@ public MySqlSourceConfig createConfig(int subtaskId) {
 
         // override the user-defined debezium properties
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
         }
 
         return new MySqlSourceConfig(
==== MySqlTableSource.java ====

@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.mysql.table;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -58,7 +58,7 @@
  */
 public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
 
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final int port;
     private final String hostname;
     private final String database;
@@ -91,7 +91,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
 
     public MySqlTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,
@@ -136,7 +136,7 @@ public MySqlTableSource(
     }
 
     public MySqlTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,
==== MySQL table source factory ====

@@ -20,12 +20,11 @@
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
@@ -89,8 +88,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
         ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
 
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         String serverId = validateAndGetServerId(config);
         StartupOptions startupOptions = getStartupOptions(config);
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@@ -214,7 +212,7 @@ private static StartupOptions getStartupOptions(ReadableConfig config) {
         }
     }
 
-    private void validatePrimaryKeyIfEnableParallel(TableSchema physicalSchema) {
+    private void validatePrimaryKeyIfEnableParallel(ResolvedSchema physicalSchema) {
         if (!physicalSchema.getPrimaryKey().isPresent()) {
             throw new ValidationException(
                     String.format(
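The validation body ports with only a type change, because both the old
TableSchema and the new ResolvedSchema expose getPrimaryKey() as an Optional.
A sketch of the check (the exception message here is illustrative, not the
connector's actual text):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;

    public class ParallelPkCheckSketch {
        static void validatePrimaryKeyIfEnableParallel(ResolvedSchema physicalSchema) {
            if (!physicalSchema.getPrimaryKey().isPresent()) {
                throw new ValidationException(
                        "A primary key is required when parallel snapshot reading is enabled");
            }
        }

        public static void main(String[] args) {
            // No primary key declared, so this throws ValidationException.
            validatePrimaryKeyIfEnableParallel(
                    ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT().notNull())));
        }
    }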
==== MySQL table source factory test ====

@@ -21,6 +21,7 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -30,7 +31,6 @@
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
@@ -54,7 +54,6 @@
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -101,7 +100,7 @@ public void testCommonProperties() {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -140,7 +139,7 @@ public void testEnableParallelReadSource() {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -176,7 +175,7 @@ public void testEnableParallelReadSourceWithSingleServerId() {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -210,7 +209,7 @@ public void testEnableParallelReadSourceLatestOffset() {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -247,7 +246,7 @@ public void testOptionalProperties() {
         dbzProperties.put("snapshot.mode", "never");
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3307,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -304,7 +303,7 @@ public void testStartupFromInitial() {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -369,7 +368,7 @@ public void testStartupFromLatestOffset() {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -406,8 +405,7 @@ public void testMetadataColumns() {
 
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(
-                                fromResolvedSchema(SCHEMA_WITH_METADATA)),
+                        SCHEMA_WITH_METADATA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -646,7 +644,7 @@ private static DynamicTableSource createTableSource(
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(schema).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),
==== Debezium source builder ====

@@ -167,7 +167,7 @@ public DebeziumSourceFunction<T> build() {
         }
 
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
         }
 
         return new DebeziumSourceFunction<>(
==== OracleTableSource.java ====

@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.oracle.table;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -52,7 +52,7 @@
  */
 public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
 
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final int port;
     private final String hostname;
     private final String database;
@@ -74,7 +74,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
 
     public OracleTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,
@@ -113,8 +113,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
         RowType physicalDataType =
                 (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
         MetadataConverter[] metadataConverters = getMetadataConverters();
-        TypeInformation<RowData> typeInfo =
-                scanContext.createTypeInformation(physicalSchema.toRowDataType());
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
 
         DebeziumDeserializationSchema<RowData> deserializer =
                 RowDataDebeziumDeserializeSchema.newBuilder()
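Unlike the MongoDB hunk above, the Oracle source builds its TypeInformation
from producedDataType (the physical row type widened with whatever metadata
columns the planner requested via applyReadableMetadata) rather than from
toPhysicalRowDataType(). A hand-built illustration of the two types; the field
names are examples, not necessarily the connector's metadata keys:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.types.DataType;

    public class ProducedTypeSketch {
        public static void main(String[] args) {
            // Physical type: only the columns stored in the table.
            ResolvedSchema schema =
                    ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT().notNull()));
            DataType physical = schema.toPhysicalRowDataType();

            // Produced type: physical fields plus requested metadata columns
            // (built by hand here; the connector derives it during planning).
            DataType produced =
                    DataTypes.ROW(
                            DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
                            DataTypes.FIELD("op_ts", DataTypes.TIMESTAMP_LTZ(3)));

            System.out.println(physical);
            System.out.println(produced);
        }
    }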
==== Oracle table source factory ====

@@ -21,12 +21,11 @@
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 
 import com.ververica.cdc.debezium.table.DebeziumOptions;
 
@@ -110,8 +109,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         String schemaName = config.get(SCHEMA_NAME);
         int port = config.get(PORT);
         StartupOptions startupOptions = getStartupOptions(config);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
 
         return new OracleTableSource(
                 physicalSchema,