Skip to content

Commit 1e9b23d

Browse files
committed
Rename TableConfig to TableCreateConfig
1 parent 4bed098 commit 1e9b23d

File tree

6 files changed

+23
-21
lines changed

6 files changed

+23
-21
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ public class StarRocksDataSink implements DataSink, Serializable {
3737
private final StarRocksSinkOptions sinkOptions;
3838

3939
/** Configurations for creating a StarRocks table. */
40-
private final TableConfig tableConfig;
40+
private final TableCreateConfig tableCreateConfig;
4141

4242
/** Configurations for schema change. */
4343
private final SchemaChangeConfig schemaChangeConfig;
4444

4545
public StarRocksDataSink(
4646
StarRocksSinkOptions sinkOptions,
47-
TableConfig tableConfig,
47+
TableCreateConfig tableCreateConfig,
4848
SchemaChangeConfig schemaChangeConfig) {
4949
this.sinkOptions = sinkOptions;
50-
this.tableConfig = tableConfig;
50+
this.tableCreateConfig = tableCreateConfig;
5151
this.schemaChangeConfig = schemaChangeConfig;
5252
}
5353

@@ -65,6 +65,6 @@ public MetadataApplier getMetadataApplier() {
6565
sinkOptions.getJdbcUrl(),
6666
sinkOptions.getUsername(),
6767
sinkOptions.getPassword());
68-
return new StarRocksMetadataApplier(catalog, tableConfig, schemaChangeConfig);
68+
return new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
6969
}
7070
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public class StarRocksDataSinkFactory implements DataSinkFactory {
5252
@Override
5353
public DataSink createDataSink(Context context) {
5454
StarRocksSinkOptions sinkOptions = buildSinkConnectorOptions(context.getConfiguration());
55-
TableConfig tableConfig = TableConfig.from(context.getConfiguration());
55+
TableCreateConfig tableCreateConfig = TableCreateConfig.from(context.getConfiguration());
5656
SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(context.getConfiguration());
57-
return new StarRocksDataSink(sinkOptions, tableConfig, schemaChangeConfig);
57+
return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig);
5858
}
5959

6060
private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,16 @@ public class StarRocksMetadataApplier implements MetadataApplier {
4646
private static final Logger LOG = LoggerFactory.getLogger(StarRocksMetadataApplier.class);
4747

4848
private final StarRocksCatalog catalog;
49-
private final TableConfig tableConfig;
49+
private final TableCreateConfig tableCreateConfig;
5050
private final SchemaChangeConfig schemaChangeConfig;
5151
private boolean isOpened;
5252

5353
public StarRocksMetadataApplier(
5454
StarRocksCatalog catalog,
55-
TableConfig tableConfig,
55+
TableCreateConfig tableCreateConfig,
5656
SchemaChangeConfig schemaChangeConfig) {
5757
this.catalog = catalog;
58-
this.tableConfig = tableConfig;
58+
this.tableCreateConfig = tableCreateConfig;
5959
this.schemaChangeConfig = schemaChangeConfig;
6060
this.isOpened = false;
6161
}
@@ -86,7 +86,9 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
8686
private void applyCreateTable(CreateTableEvent createTableEvent) {
8787
StarRocksTable starRocksTable =
8888
StarRocksUtils.toStarRocksTable(
89-
createTableEvent.tableId(), createTableEvent.getSchema(), tableConfig);
89+
createTableEvent.tableId(),
90+
createTableEvent.getSchema(),
91+
tableCreateConfig);
9092
if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
9193
catalog.createDatabase(starRocksTable.getDatabaseName(), true);
9294
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class StarRocksUtils {
5353

5454
/** Convert a source table to {@link StarRocksTable}. */
5555
public static StarRocksTable toStarRocksTable(
56-
TableId tableId, Schema schema, TableConfig tableConfig) {
56+
TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) {
5757
if (schema.primaryKeys().isEmpty()) {
5858
throw new RuntimeException(
5959
String.format(
@@ -96,10 +96,10 @@ public static StarRocksTable toStarRocksTable(
9696
// use primary keys as distribution keys by default
9797
.setDistributionKeys(schema.primaryKeys())
9898
.setComment(schema.comment());
99-
if (tableConfig.getNumBuckets().isPresent()) {
100-
tableBuilder.setNumBuckets(tableConfig.getNumBuckets().get());
99+
if (tableCreateConfig.getNumBuckets().isPresent()) {
100+
tableBuilder.setNumBuckets(tableCreateConfig.getNumBuckets().get());
101101
}
102-
tableBuilder.setTableProperties(tableConfig.getProperties());
102+
tableBuilder.setTableProperties(tableCreateConfig.getProperties());
103103
return tableBuilder.build();
104104
}
105105

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* primary_key_table/#create-a-table">StarRocks Documentation</a> for how to create a StarRocks
3737
* primary key table.
3838
*/
39-
public class TableConfig implements Serializable {
39+
public class TableCreateConfig implements Serializable {
4040

4141
private static final long serialVersionUID = 1L;
4242

@@ -46,7 +46,7 @@ public class TableConfig implements Serializable {
4646
/** Properties for the table. */
4747
private final Map<String, String> properties;
4848

49-
public TableConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
49+
public TableCreateConfig(@Nullable Integer numBuckets, Map<String, String> properties) {
5050
this.numBuckets = numBuckets;
5151
this.properties = new HashMap<>(properties);
5252
}
@@ -59,7 +59,7 @@ public Map<String, String> getProperties() {
5959
return Collections.unmodifiableMap(properties);
6060
}
6161

62-
public static TableConfig from(Configuration config) {
62+
public static TableCreateConfig from(Configuration config) {
6363
Integer numBuckets = config.get(TABLE_CREATE_NUM_BUCKETS);
6464
Map<String, String> tableProperties =
6565
config.toMap().entrySet().stream()
@@ -73,6 +73,6 @@ public static TableConfig from(Configuration config) {
7373
.length())
7474
.toLowerCase(),
7575
Map.Entry::getValue));
76-
return new TableConfig(numBuckets, tableProperties);
76+
return new TableCreateConfig(numBuckets, tableProperties);
7777
}
7878
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import com.ververica.cdc.common.types.TimestampType;
3535
import com.ververica.cdc.connectors.starrocks.sink.SchemaChangeConfig;
3636
import com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier;
37-
import com.ververica.cdc.connectors.starrocks.sink.TableConfig;
37+
import com.ververica.cdc.connectors.starrocks.sink.TableCreateConfig;
3838
import org.junit.Before;
3939
import org.junit.Test;
4040

@@ -64,10 +64,10 @@ public void setup() {
6464
.put("table.create.properties.replication_num", "5")
6565
.build());
6666
SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(configuration);
67-
TableConfig tableConfig = TableConfig.from(configuration);
67+
TableCreateConfig tableCreateConfig = TableCreateConfig.from(configuration);
6868
this.catalog = new MockStarRocksCatalog();
6969
this.metadataApplier =
70-
new StarRocksMetadataApplier(catalog, tableConfig, schemaChangeConfig);
70+
new StarRocksMetadataApplier(catalog, tableCreateConfig, schemaChangeConfig);
7171
}
7272

7373
@Test

0 commit comments

Comments
 (0)