Skip to content

Commit 8287469

Browse files
fix: Address review comments
1 parent 3479cd1 commit 8287469

File tree

8 files changed

+50
-58
lines changed

8 files changed

+50
-58
lines changed

flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessSource.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static <T> Builder<T> builder() {
4343
public static class Builder<T> {
4444

4545
private String pluginName = "decoderbufs";
46-
private String slotName = "flink";
46+
private String name = "flink";
4747
private int port = 15991; // default 15991 port
4848
private String hostname;
4949
private String keyspace;
@@ -117,12 +117,11 @@ public Builder<T> password(String password) {
117117
}
118118

119119
/**
120-
* The name of the Vitess logical decoding slot that was created for streaming changes from
121-
* a particular plug-in for a particular database/schema. The server uses this slot to
122-
* stream events to the connector that you are configuring. Default is "flink".
120+
* Unique name for the connector. Attempting to register again with the same name will fail.
121+
* This property is required by all Kafka Connect connectors. Default is "flink".
123122
*/
124-
public Builder<T> slotName(String slotName) {
125-
this.slotName = slotName;
123+
public Builder<T> name(String name) {
124+
this.name = name;
126125
return this;
127126
}
128127

@@ -191,7 +190,7 @@ public DebeziumSourceFunction<T> build() {
191190
Properties props = new Properties();
192191
props.setProperty("connector.class", VitessConnector.class.getCanonicalName());
193192
props.setProperty("plugin.name", pluginName);
194-
props.setProperty("slot.name", slotName);
193+
props.setProperty("name", name);
195194
// hard code server name, because we don't need to distinguish it, docs:
196195
// Logical name that identifies and provides a namespace for the particular Vitess
197196
// Vtgate server/cluster being monitored. The logical name should be unique across

flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableFactory.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121
import org.apache.flink.configuration.ConfigOption;
2222
import org.apache.flink.configuration.ConfigOptions;
2323
import org.apache.flink.configuration.ReadableConfig;
24-
import org.apache.flink.table.api.TableSchema;
24+
import org.apache.flink.table.catalog.ResolvedSchema;
2525
import org.apache.flink.table.connector.source.DynamicTableSource;
2626
import org.apache.flink.table.factories.DynamicTableFactory;
2727
import org.apache.flink.table.factories.DynamicTableSourceFactory;
2828
import org.apache.flink.table.factories.FactoryUtil;
29-
import org.apache.flink.table.utils.TableSchemaUtils;
3029

3130
import com.ververica.cdc.connectors.vitess.config.TabletType;
3231
import com.ververica.cdc.connectors.vitess.config.VtctldConfig;
@@ -117,14 +116,14 @@ public class VitessTableFactory implements DynamicTableSourceFactory {
117116
.withDescription(
118117
"The name of the Vitess logical decoding plug-in installed on the server.");
119118

120-
private static final ConfigOption<String> SLOT_NAME =
121-
ConfigOptions.key("slot.name")
119+
private static final ConfigOption<String> NAME =
120+
ConfigOptions.key("name")
122121
.stringType()
123122
.defaultValue("flink")
124123
.withDescription(
125-
"The name of the Vitess logical decoding slot that was created for streaming changes "
126-
+ "from a particular plug-in for a particular database/schema. The server uses this slot "
127-
+ "to stream events to the connector that you are configuring. Default is \"flink\".");
124+
"Unique name for the connector."
125+
+ " Attempting to register again with the same name will fail. "
126+
+ "This property is required by all Kafka Connect connectors. Default is flink.");
128127

129128
@Override
130129
public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
@@ -148,9 +147,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
148147
.build();
149148
TabletType tabletType = TabletType.valueOf(config.get(TABLET_TYPE));
150149
String pluginName = config.get(DECODING_PLUGIN_NAME);
151-
String slotName = config.get(SLOT_NAME);
152-
TableSchema physicalSchema =
153-
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
150+
String name = config.get(NAME);
151+
ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
154152

155153
return new VitessTableSource(
156154
physicalSchema,
@@ -163,7 +161,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
163161
vtctldConfig,
164162
tabletType,
165163
pluginName,
166-
slotName,
164+
name,
167165
getDebeziumProperties(context.getCatalogTable().getOptions()));
168166
}
169167

@@ -191,7 +189,7 @@ public Set<ConfigOption<?>> optionalOptions() {
191189
options.add(PASSWORD);
192190
options.add(TABLET_TYPE);
193191
options.add(DECODING_PLUGIN_NAME);
194-
options.add(SLOT_NAME);
192+
options.add(NAME);
195193
return options;
196194
}
197195
}

flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableSource.java

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package com.ververica.cdc.connectors.vitess.table;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22-
import org.apache.flink.table.api.TableSchema;
22+
import org.apache.flink.table.catalog.ResolvedSchema;
2323
import org.apache.flink.table.connector.ChangelogMode;
2424
import org.apache.flink.table.connector.source.DynamicTableSource;
2525
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -47,21 +47,21 @@
4747
*/
4848
public class VitessTableSource implements ScanTableSource {
4949

50-
private final TableSchema physicalSchema;
51-
private String pluginName;
52-
private String slotName;
53-
private int port;
54-
private String hostname;
55-
private String keyspace;
56-
private String username;
57-
private String password;
58-
private String tableName;
59-
private VtctldConfig vtctldConfig;
60-
private TabletType tabletType;
61-
private Properties dbzProperties;
50+
private final ResolvedSchema physicalSchema;
51+
private final String pluginName;
52+
private final String name;
53+
private final int port;
54+
private final String hostname;
55+
private final String keyspace;
56+
private final String username;
57+
private final String password;
58+
private final String tableName;
59+
private final VtctldConfig vtctldConfig;
60+
private final TabletType tabletType;
61+
private final Properties dbzProperties;
6262

6363
public VitessTableSource(
64-
TableSchema physicalSchema,
64+
ResolvedSchema physicalSchema,
6565
int port,
6666
String hostname,
6767
String keyspace,
@@ -71,7 +71,7 @@ public VitessTableSource(
7171
VtctldConfig vtctldConfig,
7272
TabletType tabletType,
7373
String pluginName,
74-
String slotName,
74+
String name,
7575
Properties dbzProperties) {
7676
this.physicalSchema = physicalSchema;
7777
this.port = port;
@@ -83,7 +83,7 @@ public VitessTableSource(
8383
this.vtctldConfig = checkNotNull(vtctldConfig);
8484
this.tabletType = checkNotNull(tabletType);
8585
this.pluginName = checkNotNull(pluginName);
86-
this.slotName = slotName;
86+
this.name = name;
8787
this.dbzProperties = dbzProperties;
8888
}
8989

@@ -102,7 +102,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
102102
RowType physicalDataType =
103103
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
104104
TypeInformation<RowData> typeInfo =
105-
scanContext.createTypeInformation(physicalSchema.toRowDataType());
105+
scanContext.createTypeInformation(physicalSchema.toPhysicalRowDataType());
106+
106107
DebeziumDeserializationSchema<RowData> deserializer =
107108
RowDataDebeziumDeserializeSchema.newBuilder()
108109
.setPhysicalRowType(physicalDataType)
@@ -121,7 +122,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
121122
.tabletType(tabletType)
122123
.decodingPluginName(pluginName)
123124
.vtctldConfig(vtctldConfig)
124-
.slotName(slotName)
125+
.name(name)
125126
.debeziumProperties(dbzProperties)
126127
.deserializer(deserializer)
127128
.build();
@@ -141,7 +142,7 @@ public DynamicTableSource copy() {
141142
vtctldConfig,
142143
tabletType,
143144
pluginName,
144-
slotName,
145+
name,
145146
dbzProperties);
146147
}
147148

@@ -157,7 +158,7 @@ public boolean equals(Object o) {
157158
return port == that.port
158159
&& Objects.equals(physicalSchema, that.physicalSchema)
159160
&& Objects.equals(pluginName, that.pluginName)
160-
&& Objects.equals(slotName, that.slotName)
161+
&& Objects.equals(name, that.name)
161162
&& Objects.equals(hostname, that.hostname)
162163
&& Objects.equals(keyspace, that.keyspace)
163164
&& Objects.equals(username, that.username)
@@ -173,7 +174,7 @@ public int hashCode() {
173174
return Objects.hash(
174175
physicalSchema,
175176
pluginName,
176-
slotName,
177+
name,
177178
port,
178179
hostname,
179180
keyspace,
@@ -193,8 +194,8 @@ public String toString() {
193194
+ ", pluginName='"
194195
+ pluginName
195196
+ '\''
196-
+ ", slotName='"
197-
+ slotName
197+
+ ", name='"
198+
+ name
198199
+ '\''
199200
+ ", port="
200201
+ port

flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessSourceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class VitessSourceTest extends VitessTestBase {
6161

6262
@Before
6363
public void before() {
64-
initializeMysqlTable("inventory");
64+
initializeTable("inventory");
6565
}
6666

6767
@Test

flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public Connection getJdbcConnection() throws SQLException {
7676
* Executes a JDBC statement using the default jdbc config without autocommitting the
7777
* connection.
7878
*/
79-
protected void initializeMysqlTable(String sqlFile) {
79+
protected void initializeTable(String sqlFile) {
8080
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
8181
final URL ddlTestFile = VitessTestBase.class.getClassLoader().getResource(ddlFile);
8282
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);

flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessConnectorITCase.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void before() {
6262
@Test
6363
public void testConsumingAllEvents()
6464
throws SQLException, ExecutionException, InterruptedException {
65-
initializeMysqlTable("inventory");
65+
initializeTable("inventory");
6666
String sourceDDL =
6767
String.format(
6868
"CREATE TABLE debezium_source ("
@@ -152,9 +152,9 @@ public void testConsumingAllEvents()
152152
result.getJobClient().get().cancel().get();
153153
}
154154

155-
@Test // TODO add DATE, TIMESTAMP, TIME type mapping
155+
@Test
156156
public void testAllTypes() throws Throwable {
157-
initializeMysqlTable("column_type_test");
157+
initializeTable("column_type_test");
158158
String sourceDDL =
159159
String.format(
160160
"CREATE TABLE full_types (\n"
@@ -174,12 +174,6 @@ public void testAllTypes() throws Throwable {
174174
+ " decimal_c DECIMAL(8, 4),\n"
175175
+ " numeric_c DECIMAL(6, 0),\n"
176176
+ " boolean_c BOOLEAN,\n"
177-
// + " date_c DATE,\n"
178-
// + " time_c TIME(0),\n"
179-
// + " datetime3_c TIMESTAMP(3),\n"
180-
// + " datetime6_c TIMESTAMP(6),\n"
181-
// + " timestamp_c TIMESTAMP(0),\n"
182-
// + " file_uuid BYTES,\n"
183177
+ " primary key (`id`) not enforced"
184178
+ ") WITH ("
185179
+ " 'connector' = 'vitess-cdc',"

flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessTableFactoryTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.table.connector.source.DynamicTableSource;
3131
import org.apache.flink.table.factories.Factory;
3232
import org.apache.flink.table.factories.FactoryUtil;
33-
import org.apache.flink.table.utils.TableSchemaUtils;
3433
import org.apache.flink.util.ExceptionUtils;
3534

3635
import com.ververica.cdc.connectors.vitess.config.TabletType;
@@ -64,6 +63,7 @@ public class VitessTableFactoryTest {
6463
new ArrayList<>(),
6564
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
6665

66+
private static final String MY_SCHEMA = "public";
6767
private static final String MY_LOCALHOST = "localhost";
6868
private static final String MY_USERNAME = "flinkuser";
6969
private static final String MY_PASSWORD = "flinkpw";
@@ -79,7 +79,7 @@ public void testCommonProperties() {
7979
DynamicTableSource actualSource = createTableSource(properties);
8080
VitessTableSource expectedSource =
8181
new VitessTableSource(
82-
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
82+
SCHEMA,
8383
15991,
8484
MY_LOCALHOST,
8585
MY_KEYSPACE,
@@ -101,7 +101,7 @@ public void testOptionalProperties() {
101101
options.put("vtctl.port", "5445");
102102
options.put("decoding.plugin.name", "wal2json");
103103
options.put("debezium.snapshot.mode", "never");
104-
options.put("slot.name", "flink");
104+
options.put("name", "flink");
105105
options.put("tablet-type", "MASTER");
106106
options.put("username", MY_USERNAME);
107107
options.put("password", MY_PASSWORD);
@@ -111,7 +111,7 @@ public void testOptionalProperties() {
111111
dbzProperties.put("snapshot.mode", "never");
112112
VitessTableSource expectedSource =
113113
new VitessTableSource(
114-
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
114+
SCHEMA,
115115
5444,
116116
MY_LOCALHOST,
117117
MY_KEYSPACE,
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.vitess;
19+
package com.ververica.cdc.connectors.postgres;
2020

2121
/** This is used to generate a dummy docs jar for this module to pass OSS repository rule. */
2222
public class DummyDocs {}

0 commit comments

Comments
 (0)