Skip to content

Commit cd19d56

Browse files
committed
Rename EventRecordSerializationSchema
1 parent 0cdefda commit cd19d56

File tree

4 files changed

+24
-11
lines changed

4 files changed

+24
-11
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.starrocks.connector.flink.table.data.DefaultStarRocksRowData;
2020
import com.starrocks.connector.flink.table.data.StarRocksRowData;
21-
import com.starrocks.connector.flink.table.sink.v2.RecordSerializer;
21+
import com.starrocks.connector.flink.table.sink.v2.RecordSerializationSchema;
2222
import com.starrocks.connector.flink.tools.JsonWrapper;
2323
import com.ververica.cdc.common.data.RecordData;
2424
import com.ververica.cdc.common.event.CreateTableEvent;
@@ -38,7 +38,7 @@
3838
import static com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.createFieldGetter;
3939

4040
/** Serializer for the input {@link Event}. It will serialize a row to a json string. */
41-
public class EventRecordSerializer implements RecordSerializer<Event> {
41+
public class EventRecordSerializationSchema implements RecordSerializationSchema<Event> {
4242

4343
private static final long serialVersionUID = 1L;
4444

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public StarRocksDataSink(
5454
@Override
5555
public EventSinkProvider getEventSinkProvider() {
5656
StarRocksSink<Event> starRocksSink =
57-
SinkFunctionFactory.createSink(sinkOptions, new EventRecordSerializer());
57+
SinkFunctionFactory.createSink(sinkOptions, new EventRecordSerializationSchema());
5858
return FlinkSinkProvider.of(starRocksSink);
5959
}
6060

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,14 @@ private void applyCreateTable(CreateTableEvent createTableEvent) {
8383
if (!catalog.databaseExists(starRocksTable.getDatabaseName())) {
8484
catalog.createDatabase(starRocksTable.getDatabaseName(), true);
8585
}
86-
catalog.createTable(starRocksTable, true);
86+
87+
try {
88+
catalog.createTable(starRocksTable, true);
89+
LOG.info("Successful to create table, event: {}", createTableEvent);
90+
} catch (StarRocksCatalogException e) {
91+
LOG.error("Failed to create table, event: {}", createTableEvent.tableId(), e);
92+
throw new RuntimeException("Failed to create table, event: " + createTableEvent, e);
93+
}
8794
}
8895

8996
private void applyAddColumn(AddColumnEvent addColumnEvent) {
@@ -119,7 +126,10 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) {
119126
alterException = e;
120127
}
121128

122-
// Check the actual table
129+
// Check whether the columns have been actually added to the table.
130+
// This is useful for duplicate schema change after failover. Adding
131+
// same columns will fail on StarRocks side, but it should be successful
132+
// on CDC side
123133
StarRocksTable table = null;
124134
try {
125135
table = catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
@@ -184,7 +194,10 @@ private void applyDropColumn(DropColumnEvent dropColumnEvent) {
184194
alterException = e;
185195
}
186196

187-
// Check the actual table
197+
// Check whether the columns have been actually dropped from the table.
198+
// This is useful for duplicate schema change after failover. Drop
199+
// non-existed columns will fail on StarRocks side, but it should be
200+
// successful on CDC side
188201
StarRocksTable table = null;
189202
try {
190203
table = catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import com.ververica.cdc.common.types.TimestampType;
4141
import com.ververica.cdc.common.types.VarCharType;
4242
import com.ververica.cdc.common.utils.SchemaUtils;
43-
import com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializer;
43+
import com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema;
4444
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
4545
import org.junit.After;
4646
import org.junit.Before;
@@ -56,15 +56,15 @@
5656
import static org.junit.Assert.assertEquals;
5757
import static org.junit.Assert.assertNull;
5858

59-
/** Tests for {@link EventRecordSerializer}. */
60-
public class EventRecordSerializerTest {
59+
/** Tests for {@link EventRecordSerializationSchema}. */
60+
public class EventRecordSerializationSchemaTest {
6161

62-
private EventRecordSerializer serializer;
62+
private EventRecordSerializationSchema serializer;
6363
private ObjectMapper objectMapper;
6464

6565
@Before
6666
public void setup() {
67-
this.serializer = new EventRecordSerializer();
67+
this.serializer = new EventRecordSerializationSchema();
6868
this.serializer.open();
6969
this.objectMapper = new ObjectMapper();
7070
}

0 commit comments

Comments
 (0)