Skip to content

Commit 1446ad8

Browse files
committed
Accept startup options in table factory
1 parent 724bc84 commit 1446ad8

File tree

1 file changed

+7
-20
lines changed

1 file changed

+7
-20
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.table.connector.source.DynamicTableSource;
2424
import org.apache.flink.table.factories.DynamicTableSourceFactory;
2525
import org.apache.flink.table.factories.FactoryUtil;
26-
import org.apache.flink.util.Preconditions;
2726

2827
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
2928
import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
@@ -109,7 +108,6 @@ public DynamicTableSource createDynamicTableSource(Context context) {
109108
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
110109
if (enableParallelRead) {
111110
validatePrimaryKeyIfEnableParallel(physicalSchema);
112-
validateStartupOptionIfEnableParallel(startupOptions);
113111
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
114112
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
115113
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@@ -204,15 +202,15 @@ private static StartupOptions getStartupOptions(ReadableConfig config) {
204202
return StartupOptions.latest();
205203

206204
case SCAN_STARTUP_MODE_VALUE_EARLIEST:
205+
return StartupOptions.earliest();
206+
207207
case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
208+
return StartupOptions.specificOffset(
209+
config.get(SCAN_STARTUP_SPECIFIC_OFFSET_FILE),
210+
config.get(SCAN_STARTUP_SPECIFIC_OFFSET_POS));
211+
208212
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
209-
throw new ValidationException(
210-
String.format(
211-
"Unsupported option value '%s', the options [%s, %s, %s] are not supported correctly, please do not use them until they're correctly supported",
212-
modeString,
213-
SCAN_STARTUP_MODE_VALUE_EARLIEST,
214-
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
215-
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
213+
return StartupOptions.timestamp(config.get(SCAN_STARTUP_TIMESTAMP_MILLIS));
216214

217215
default:
218216
throw new ValidationException(
@@ -234,17 +232,6 @@ private void validatePrimaryKeyIfEnableParallel(ResolvedSchema physicalSchema) {
234232
}
235233
}
236234

237-
private void validateStartupOptionIfEnableParallel(StartupOptions startupOptions) {
238-
// validate mode
239-
Preconditions.checkState(
240-
startupOptions.startupMode == StartupMode.INITIAL
241-
|| startupOptions.startupMode == StartupMode.LATEST_OFFSET,
242-
String.format(
243-
"MySql Parallel Source only supports startup mode 'initial' and 'latest-offset',"
244-
+ " but actual is %s",
245-
startupOptions.startupMode));
246-
}
247-
248235
private String validateAndGetServerId(ReadableConfig configuration) {
249236
final String serverIdValue = configuration.get(MySqlSourceOptions.SERVER_ID);
250237
if (serverIdValue != null) {

0 commit comments

Comments
 (0)