Skip to content

Commit 0567bd3

Browse files
author
yexianxun
committed
[cdc-core] Fix BaseSchema and PooledDataSourceFactory classes Serialization exception
1 parent 7fb7012 commit 0567bd3

File tree

19 files changed

+95
-54
lines changed

19 files changed

+95
-54
lines changed

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/assigner/HybridSplitAssigner.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package com.ververica.cdc.connectors.core.assigner;
2020

21-
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
2221
import com.ververica.cdc.connectors.core.assigner.state.HybridPendingSplitsState;
22+
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
2323
import com.ververica.cdc.connectors.core.config.SourceConfig;
2424
import com.ververica.cdc.connectors.core.dialect.SnapshotEventDialect;
2525
import com.ververica.cdc.connectors.core.offset.Offset;
@@ -63,7 +63,7 @@ public HybridSplitAssigner(
6363
boolean isTableIdCaseSensitive,
6464
SnapshotEventDialect dialect,
6565
OffsetFactory offsetFactory,
66-
BaseSchema baseSchema) {
66+
BaseSchema.SchemaFactory schemaFactory) {
6767
this(
6868
new SnapshotSplitAssigner(
6969
sourceConfig,
@@ -72,7 +72,7 @@ public HybridSplitAssigner(
7272
isTableIdCaseSensitive,
7373
dialect,
7474
offsetFactory,
75-
baseSchema),
75+
schemaFactory),
7676
false,
7777
sourceConfig.getSplitMetaGroupSize());
7878
this.offsetFactory = offsetFactory;
@@ -84,15 +84,15 @@ public HybridSplitAssigner(
8484
HybridPendingSplitsState checkpoint,
8585
SnapshotEventDialect dialect,
8686
OffsetFactory offsetFactory,
87-
BaseSchema baseSchema) {
87+
BaseSchema.SchemaFactory schemaFactory) {
8888
this(
8989
new SnapshotSplitAssigner(
9090
sourceConfig,
9191
currentParallelism,
9292
checkpoint.getSnapshotPendingSplits(),
9393
dialect,
9494
offsetFactory,
95-
baseSchema),
95+
schemaFactory),
9696
checkpoint.isStreamSplitAssigned(),
9797
sourceConfig.getSplitMetaGroupSize());
9898
}

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/assigner/SnapshotSplitAssigner.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class SnapshotSplitAssigner implements SplitAssigner {
6767
@Nullable private Long checkpointIdToFinish;
6868
private final SnapshotEventDialect dialect;
6969
private OffsetFactory offsetFactory;
70-
private final BaseSchema baseSchema;
70+
private final BaseSchema.SchemaFactory schemaFactory;
7171

7272
public SnapshotSplitAssigner(
7373
SourceConfig sourceConfig,
@@ -76,7 +76,7 @@ public SnapshotSplitAssigner(
7676
boolean isTableIdCaseSensitive,
7777
SnapshotEventDialect dialect,
7878
OffsetFactory offsetFactory,
79-
BaseSchema baseSchema) {
79+
BaseSchema.SchemaFactory schemaFactory) {
8080
this(
8181
sourceConfig,
8282
currentParallelism,
@@ -90,7 +90,7 @@ public SnapshotSplitAssigner(
9090
true,
9191
dialect,
9292
offsetFactory,
93-
baseSchema);
93+
schemaFactory);
9494
}
9595

9696
public SnapshotSplitAssigner(
@@ -99,7 +99,7 @@ public SnapshotSplitAssigner(
9999
SnapshotPendingSplitsState checkpoint,
100100
SnapshotEventDialect dialect,
101101
OffsetFactory offsetFactory,
102-
BaseSchema baseSchema) {
102+
BaseSchema.SchemaFactory schemaFactory) {
103103
this(
104104
sourceConfig,
105105
currentParallelism,
@@ -113,7 +113,7 @@ public SnapshotSplitAssigner(
113113
checkpoint.isRemainingTablesCheckpointed(),
114114
dialect,
115115
offsetFactory,
116-
baseSchema);
116+
schemaFactory);
117117
}
118118

119119
private SnapshotSplitAssigner(
@@ -129,7 +129,7 @@ private SnapshotSplitAssigner(
129129
boolean isRemainingTablesCheckpointed,
130130
SnapshotEventDialect dialect,
131131
OffsetFactory offsetFactory,
132-
BaseSchema baseSchema) {
132+
BaseSchema.SchemaFactory schemaFactory) {
133133
this.sourceConfig = sourceConfig;
134134
this.currentParallelism = currentParallelism;
135135
this.alreadyProcessedTables = alreadyProcessedTables;
@@ -142,7 +142,7 @@ private SnapshotSplitAssigner(
142142
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
143143
this.dialect = dialect;
144144
this.offsetFactory = offsetFactory;
145-
this.baseSchema = baseSchema;
145+
this.schemaFactory = schemaFactory;
146146
}
147147

148148
@Override
@@ -313,6 +313,7 @@ private boolean allSplitsFinished() {
313313
}
314314

315315
private ChunkSplitter createChunkSplitter(SourceConfig sourceConfig) {
316+
BaseSchema baseSchema = schemaFactory.create(sourceConfig, isTableIdCaseSensitive);
316317
return new ChunkSplitter(baseSchema, sourceConfig, dialect);
317318
}
318319
}

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/assigner/StreamSplitAssigner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
2222
import com.ververica.cdc.connectors.core.assigner.state.StreamPendingSplitsState;
23-
import com.ververica.cdc.connectors.core.internal.connection.JdbcConnectionFactory;
2423
import com.ververica.cdc.connectors.core.config.SourceConfig;
2524
import com.ververica.cdc.connectors.core.dialect.Dialect;
25+
import com.ververica.cdc.connectors.core.internal.connection.JdbcConnectionFactory;
2626
import com.ververica.cdc.connectors.core.offset.Offset;
2727
import com.ververica.cdc.connectors.core.offset.OffsetFactory;
2828
import com.ververica.cdc.connectors.core.split.FinishedSnapshotSplitInfo;

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/dialect/Dialect.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import org.apache.flink.table.types.logical.RowType;
2525
import org.apache.flink.util.FlinkRuntimeException;
2626

27+
import com.ververica.cdc.connectors.core.config.SourceConfig;
2728
import com.ververica.cdc.connectors.core.internal.connection.JdbcConnectionFactory;
2829
import com.ververica.cdc.connectors.core.internal.connection.PooledDataSourceFactory;
2930
import com.ververica.cdc.connectors.core.internal.converter.JdbcSourceRecordConverter;
30-
import com.ververica.cdc.connectors.core.config.SourceConfig;
3131
import com.ververica.cdc.connectors.core.offset.Offset;
3232
import io.debezium.connector.base.ChangeEventQueue;
3333
import io.debezium.jdbc.JdbcConnection;

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/dialect/SnapshotEventDialect.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020

2121
import org.apache.flink.table.types.logical.RowType;
2222

23+
import com.ververica.cdc.connectors.core.config.SourceConfig;
2324
import com.ververica.cdc.connectors.core.internal.converter.JdbcSourceRecordConverter;
25+
import com.ververica.cdc.connectors.core.offset.Offset;
2426
import com.ververica.cdc.connectors.core.reader.split.SnapshotReader;
2527
import com.ververica.cdc.connectors.core.reader.split.dispatcher.SignalEventDispatcher;
26-
import com.ververica.cdc.connectors.core.config.SourceConfig;
27-
import com.ververica.cdc.connectors.core.offset.Offset;
2828
import com.ververica.cdc.connectors.core.split.SnapshotSplit;
2929
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
3030
import io.debezium.DebeziumException;

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/internal/connection/PooledDataSourceFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import com.zaxxer.hikari.HikariConfig;
2323
import com.zaxxer.hikari.HikariDataSource;
2424

25+
import java.io.Serializable;
26+
2527
/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
26-
public abstract class PooledDataSourceFactory {
28+
public abstract class PooledDataSourceFactory implements Serializable {
29+
private static final long serialVersionUID = 1L;
2730

2831
public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
2932
public static final String SERVER_TIMEZONE_KEY = "serverTimezone";

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/reader/BaseSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
2424
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
2525

26+
import com.ververica.cdc.connectors.core.config.SourceConfig;
2627
import com.ververica.cdc.connectors.core.dialect.SnapshotEventDialect;
2728
import com.ververica.cdc.connectors.core.dialect.StreamingEventDialect;
2829
import com.ververica.cdc.connectors.core.reader.split.Reader;
2930
import com.ververica.cdc.connectors.core.reader.split.SnapshotReader;
3031
import com.ververica.cdc.connectors.core.reader.split.StreamReader;
31-
import com.ververica.cdc.connectors.core.config.SourceConfig;
3232
import com.ververica.cdc.connectors.core.split.ChangeEventRecords;
3333
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
3434
import org.apache.kafka.connect.source.SourceRecord;

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/schema/BaseSchema.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919
package com.ververica.cdc.connectors.core.schema;
2020

21+
import com.ververica.cdc.connectors.core.config.SourceConfig;
2122
import io.debezium.jdbc.JdbcConnection;
2223
import io.debezium.relational.TableId;
2324
import io.debezium.relational.history.TableChanges.TableChange;
2425

26+
import java.io.Serializable;
27+
2528
/** Provides as a tool class to obtain table schema information. */
2629
public interface BaseSchema {
2730

@@ -33,4 +36,9 @@ public interface BaseSchema {
3336
* @return An abstract representation of the structure to the tables of a relational database.
3437
*/
3538
TableChange getTableSchema(JdbcConnection jdbc, TableId tableId);
39+
40+
/** A Schema Factory create a {@link BaseSchema}. */
41+
interface SchemaFactory extends Serializable {
42+
BaseSchema create(SourceConfig sourceConfig, boolean isTableIdCaseSensitive);
43+
}
3644
}

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/source/ChangeEventHybridSource.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@
3434
import com.ververica.cdc.connectors.core.assigner.HybridSplitAssigner;
3535
import com.ververica.cdc.connectors.core.assigner.SplitAssigner;
3636
import com.ververica.cdc.connectors.core.assigner.StreamSplitAssigner;
37+
import com.ververica.cdc.connectors.core.assigner.state.HybridPendingSplitsState;
3738
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
3839
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsStateSerializer;
3940
import com.ververica.cdc.connectors.core.assigner.state.StreamPendingSplitsState;
40-
import com.ververica.cdc.connectors.core.dialect.StreamingEventDialect;
41-
import com.ververica.cdc.connectors.core.enumerator.SourceEnumerator;
42-
import com.ververica.cdc.connectors.core.assigner.state.HybridPendingSplitsState;
4341
import com.ververica.cdc.connectors.core.config.SourceConfig;
4442
import com.ververica.cdc.connectors.core.config.SourceConfigFactory;
4543
import com.ververica.cdc.connectors.core.config.StartupMode;
4644
import com.ververica.cdc.connectors.core.dialect.SnapshotEventDialect;
45+
import com.ververica.cdc.connectors.core.dialect.StreamingEventDialect;
46+
import com.ververica.cdc.connectors.core.enumerator.SourceEnumerator;
4747
import com.ververica.cdc.connectors.core.metrics.SourceReaderMetrics;
4848
import com.ververica.cdc.connectors.core.offset.OffsetFactory;
4949
import com.ververica.cdc.connectors.core.reader.BaseRecordEmitter;
@@ -76,23 +76,23 @@ public class ChangeEventHybridSource<T>
7676
private final Validator validator;
7777
private final SnapshotEventDialect snapshotEventDialect;
7878
private final StreamingEventDialect streamingEventDialect;
79-
private final BaseSchema baseSchema;
79+
private final BaseSchema.SchemaFactory schemaFactory;
8080

8181
public ChangeEventHybridSource(
8282
SourceConfigFactory configFactory,
8383
DebeziumDeserializationSchema<T> deserializationSchema,
8484
OffsetFactory offsetFactory,
8585
SnapshotEventDialect snapshotEventDialect,
8686
StreamingEventDialect streamingEventDialect,
87-
BaseSchema baseSchema) {
87+
BaseSchema.SchemaFactory schemaFactory) {
8888
this(
8989
configFactory,
9090
deserializationSchema,
9191
offsetFactory,
9292
snapshotEventDialect,
9393
streamingEventDialect,
9494
Validator.getDefaultValidator(),
95-
baseSchema);
95+
schemaFactory);
9696
}
9797

9898
public ChangeEventHybridSource(
@@ -102,7 +102,7 @@ public ChangeEventHybridSource(
102102
SnapshotEventDialect snapshotEventDialect,
103103
StreamingEventDialect streamingEventDialect,
104104
Validator validator,
105-
BaseSchema baseSchema) {
105+
BaseSchema.SchemaFactory schemaFactory) {
106106
this.configFactory = configFactory;
107107
this.deserializationSchema = deserializationSchema;
108108
this.offsetFactory = offsetFactory;
@@ -116,7 +116,7 @@ public OffsetFactory getOffsetFactory() {
116116
}
117117
};
118118
this.validator = validator;
119-
this.baseSchema = baseSchema;
119+
this.schemaFactory = schemaFactory;
120120
}
121121

122122
@Override
@@ -177,7 +177,7 @@ public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(
177177
isTableIdCaseSensitive,
178178
snapshotEventDialect,
179179
offsetFactory,
180-
baseSchema);
180+
schemaFactory);
181181
} catch (Exception e) {
182182
throw new FlinkRuntimeException(
183183
"Failed to discover captured tables for enumerator", e);
@@ -204,7 +204,7 @@ public SplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(
204204
(HybridPendingSplitsState) checkpoint,
205205
snapshotEventDialect,
206206
offsetFactory,
207-
baseSchema);
207+
schemaFactory);
208208
} else if (checkpoint instanceof StreamPendingSplitsState) {
209209
splitAssigner =
210210
new StreamSplitAssigner(

flink-cdc-core/src/main/java/com/ververica/cdc/connectors/core/split/SourceSplitSerializer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import java.util.List;
4343
import java.util.Map;
4444

45-
import static com.ververica.cdc.connectors.core.utils.SerializerUtils.rowToSerializedString;
46-
4745
/** A serializer for the {@link SourceSplitBase}. */
4846
public abstract class SourceSplitSerializer
4947
implements SimpleVersionedSerializer<SourceSplitBase>, OffsetDeserializerSerializer {

0 commit comments

Comments
 (0)