Skip to content

Commit ea480fb

Browse files
author
yexianxun
committed
[WIP] Abstract common components into the connector base module: rename ParallelSourceReader, make ChunkSplitter a concrete class, and add new APIs to the Dialect interface. TODO: abstract SnapshotReader.
1 parent 13b3dbe commit ea480fb

22 files changed

+768
-68
lines changed
Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,116 @@
11
package com.ververica.cdc.connectors.base.source;
22

3+
import com.ververica.cdc.connectors.base.source.assigners.state.PendingSplitsState;
4+
import com.ververica.cdc.connectors.base.source.config.SourceConfig;
5+
import com.ververica.cdc.connectors.base.source.config.SourceConfigFactory;
6+
import com.ververica.cdc.connectors.base.source.dialect.Dialect;
7+
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
8+
import com.ververica.cdc.connectors.base.source.offset.OffsetFactory;
9+
import com.ververica.cdc.connectors.base.source.reader.BaseRecordEmitter;
10+
import com.ververica.cdc.connectors.base.source.reader.BaseSplitReader;
11+
import com.ververica.cdc.connectors.base.source.reader.ParallelSourceReader;
12+
import com.ververica.cdc.connectors.base.source.split.SourceSplitBase;
13+
import com.ververica.cdc.connectors.base.source.split.SourceSplitSerializer;
14+
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
15+
import org.apache.flink.api.common.typeinfo.TypeInformation;
16+
import org.apache.flink.api.connector.source.Boundedness;
17+
import org.apache.flink.api.connector.source.Source;
18+
import org.apache.flink.api.connector.source.SourceReader;
19+
import org.apache.flink.api.connector.source.SourceReaderContext;
20+
import org.apache.flink.api.connector.source.SplitEnumerator;
21+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
22+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
23+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
24+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
25+
import org.apache.flink.core.io.SimpleVersionedSerializer;
26+
import org.apache.kafka.connect.source.SourceRecord;
27+
28+
import java.util.function.Supplier;
29+
330
/**
431
* Created by [email protected] on 2022/1/7.
532
*/
6-
public class ChangeEventHybridSource {
33+
public class ChangeEventHybridSource<T>
34+
implements Source<T, SourceSplitBase, PendingSplitsState>, ResultTypeQueryable<T> {
35+
36+
private static final long serialVersionUID = 1L;
37+
38+
private final SourceConfigFactory configFactory;
39+
private final DebeziumDeserializationSchema<T> deserializationSchema;
40+
private final OffsetFactory offsetFactory;
41+
private final Dialect dialect;
42+
43+
public ChangeEventHybridSource(
44+
SourceConfigFactory configFactory,
45+
DebeziumDeserializationSchema<T> deserializationSchema,
46+
OffsetFactory offsetFactory,
47+
Dialect dialect) {
48+
this.configFactory = configFactory;
49+
this.deserializationSchema = deserializationSchema;
50+
this.offsetFactory = offsetFactory;
51+
this.dialect = dialect;
52+
}
53+
54+
@Override
55+
public Boundedness getBoundedness() {
56+
return Boundedness.CONTINUOUS_UNBOUNDED;
57+
}
58+
59+
@Override
60+
public SourceReader<T, SourceSplitBase> createReader(SourceReaderContext readerContext) throws Exception {
61+
// create source config for the given subtask (e.g. unique server id)
62+
SourceConfig sourceConfig =
63+
configFactory.createConfig(readerContext.getIndexOfSubtask());
64+
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
65+
new FutureCompletingBlockingQueue<>();
66+
final SourceReaderMetrics sourceReaderMetrics =
67+
new SourceReaderMetrics(readerContext.metricGroup());
68+
sourceReaderMetrics.registerMetrics();
69+
Supplier<BaseSplitReader> splitReaderSupplier =
70+
() -> new BaseSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
71+
SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer() {
72+
@Override
73+
public OffsetFactory getOffsetFactory() {
74+
return offsetFactory;
75+
}
76+
};
77+
return new ParallelSourceReader<>(
78+
elementsQueue,
79+
splitReaderSupplier,
80+
new BaseRecordEmitter<>(
81+
deserializationSchema,
82+
sourceReaderMetrics,
83+
sourceConfig.isIncludeSchemaChanges(),
84+
offsetFactory),
85+
readerContext.getConfiguration(),
86+
readerContext,
87+
sourceConfig,
88+
sourceSplitSerializer,
89+
dialect);
90+
}
91+
92+
@Override
93+
public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator(SplitEnumeratorContext<SourceSplitBase> enumContext) throws Exception {
94+
return null;
95+
}
96+
97+
@Override
98+
public SplitEnumerator<SourceSplitBase, PendingSplitsState> restoreEnumerator(SplitEnumeratorContext<SourceSplitBase> enumContext, PendingSplitsState checkpoint) throws Exception {
99+
return null;
100+
}
101+
102+
@Override
103+
public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
104+
return null;
105+
}
106+
107+
@Override
108+
public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() {
109+
return null;
110+
}
111+
112+
@Override
113+
public TypeInformation<T> getProducedType() {
114+
return null;
115+
}
7116
}

flink-connector-base/src/main/java/com/ververica/cdc/connectors/base/source/assigners/ChunkSplitter.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
import static com.ververica.cdc.connectors.base.source.utils.StatementUtils.queryNextChunkMax;
5353
import static java.math.BigDecimal.ROUND_CEILING;
5454

55-
public abstract class ChunkSplitter {
55+
public class ChunkSplitter {
5656

5757
private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class);
5858

@@ -69,11 +69,6 @@ public ChunkSplitter(
6969
this.dialect = dialect;
7070
}
7171

72-
public abstract RowType getSplitType(Column splitColumn);
73-
74-
/** Checks whether split column is evenly distributed across its range. */
75-
public abstract boolean isEvenlySplitColumn(Column splitColumn);
76-
7772
/** Generates all snapshot splits (chunks) for the give table path. */
7873
public Collection<SnapshotSplit> generateSplits(TableId tableId) {
7974
try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
@@ -92,7 +87,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
9287

9388
// convert chunks into splits
9489
List<SnapshotSplit> splits = new ArrayList<>();
95-
RowType splitType = getSplitType(splitColumn);
90+
RowType splitType = dialect.getSplitType(splitColumn);
9691
for (int i = 0; i < chunks.size(); i++) {
9792
ChunkRange chunk = chunks.get(i);
9893
SnapshotSplit split =
@@ -143,7 +138,7 @@ private List<ChunkRange> splitTableIntoChunks(
143138
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
144139
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
145140

146-
if (isEvenlySplitColumn(splitColumn)) {
141+
if (dialect.isEvenlySplitColumn(splitColumn)) {
147142
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
148143
double distributionFactor =
149144
calculateDistributionFactor(tableId, min, max, approximateRowCnt);

flink-connector-base/src/main/java/com/ververica/cdc/connectors/base/source/assigners/StreamSplitAssigner.java

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818

1919
package com.ververica.cdc.connectors.base.source.assigners;
2020

21-
import com.ververica.cdc.connectors.base.source.assigners.state.IncrementalPendingSplitsState;
21+
import com.ververica.cdc.connectors.base.source.assigners.state.StreamPendingSplitsState;
2222
import com.ververica.cdc.connectors.base.source.assigners.state.PendingSplitsState;
2323
import com.ververica.cdc.connectors.base.source.config.SourceConfig;
24+
import com.ververica.cdc.connectors.base.source.dialect.Dialect;
2425
import com.ververica.cdc.connectors.base.source.internal.connection.JdbcConnectionFactory;
2526
import com.ververica.cdc.connectors.base.source.offset.Offset;
2627
import com.ververica.cdc.connectors.base.source.split.FinishedSnapshotSplitInfo;
@@ -29,46 +30,50 @@
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
3132

33+
import java.util.ArrayList;
3234
import java.util.Collection;
3335
import java.util.Collections;
36+
import java.util.HashMap;
3437
import java.util.List;
3538
import java.util.Map;
3639
import java.util.Optional;
3740

38-
public abstract class StreamSplitAssigner implements SplitAssigner {
41+
public class StreamSplitAssigner implements SplitAssigner {
3942

4043
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitAssigner.class);
4144
private static final String BINLOG_SPLIT_ID = "binlog-split";
4245

4346
private final SourceConfig sourceConfig;
4447

45-
private boolean isBinlogSplitAssigned;
48+
private boolean isStreamSplitAssigned;
4649

4750
private JdbcConnectionFactory jdbcConnectionFactory;
51+
private final Dialect dialect;
4852

49-
public StreamSplitAssigner(SourceConfig sourceConfig) {
50-
this(sourceConfig, false);
53+
public StreamSplitAssigner(SourceConfig sourceConfig, Dialect dialect) {
54+
this(sourceConfig, false, dialect);
5155
}
5256

5357
public StreamSplitAssigner(
54-
SourceConfig sourceConfig, IncrementalPendingSplitsState checkpoint) {
55-
this(sourceConfig, checkpoint.isBinlogSplitAssigned());
58+
SourceConfig sourceConfig, StreamPendingSplitsState checkpoint, Dialect dialect) {
59+
this(sourceConfig, checkpoint.isStreamSplitAssigned(), dialect);
5660
}
5761

58-
private StreamSplitAssigner(SourceConfig sourceConfig, boolean isBinlogSplitAssigned) {
62+
private StreamSplitAssigner(SourceConfig sourceConfig, boolean isStreamSplitAssigned, Dialect dialect) {
5963
this.sourceConfig = sourceConfig;
60-
this.isBinlogSplitAssigned = isBinlogSplitAssigned;
64+
this.isStreamSplitAssigned = isStreamSplitAssigned;
65+
this.dialect = dialect;
6166
}
6267

6368
@Override
6469
public void open() {}
6570

6671
@Override
6772
public Optional<SourceSplitBase> getNext() {
68-
if (isBinlogSplitAssigned) {
73+
if (isStreamSplitAssigned) {
6974
return Optional.empty();
7075
} else {
71-
isBinlogSplitAssigned = true;
76+
isStreamSplitAssigned = true;
7277
return Optional.of(createBinlogSplit());
7378
}
7479
}
@@ -91,12 +96,12 @@ public void onFinishedSplits(Map<String, Offset> splitFinishedOffsets) {
9196
@Override
9297
public void addSplits(Collection<SourceSplitBase> splits) {
9398
// we don't store the split, but will re-create binlog split later
94-
isBinlogSplitAssigned = false;
99+
isStreamSplitAssigned = false;
95100
}
96101

97102
@Override
98103
public PendingSplitsState snapshotState(long checkpointId) {
99-
return new IncrementalPendingSplitsState(isBinlogSplitAssigned);
104+
return new StreamPendingSplitsState(isStreamSplitAssigned);
100105
}
101106

102107
@Override
@@ -109,5 +114,14 @@ public void close() {}
109114

110115
// ------------------------------------------------------------------------------------------
111116

112-
public abstract StreamSplit createBinlogSplit();
117+
public StreamSplit createBinlogSplit() {
118+
return new StreamSplit(
119+
BINLOG_SPLIT_ID,
120+
dialect.createCurrentOffset(sourceConfig),
121+
dialect.getInitialOffset(),
122+
new ArrayList<>(),
123+
new HashMap<>(),
124+
0
125+
);
126+
}
113127
}

flink-connector-base/src/main/java/com/ververica/cdc/connectors/base/source/assigners/state/PendingSplitsStateSerializer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
7777
if (state instanceof SnapshotPendingSplitsState) {
7878
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
7979
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
80-
} else if (state instanceof IncrementalPendingSplitsState) {
80+
} else if (state instanceof StreamPendingSplitsState) {
8181
out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
82-
serializeBinlogPendingSplitsState((IncrementalPendingSplitsState) state, out);
82+
serializeBinlogPendingSplitsState((StreamPendingSplitsState) state, out);
8383
} else if (state instanceof HybridPendingSplitsState) {
8484
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
8585
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
@@ -165,8 +165,8 @@ private void serializeHybridPendingSplitsState(
165165
}
166166

167167
private void serializeBinlogPendingSplitsState(
168-
IncrementalPendingSplitsState state, DataOutputSerializer out) throws IOException {
169-
out.writeBoolean(state.isBinlogSplitAssigned());
168+
StreamPendingSplitsState state, DataOutputSerializer out) throws IOException {
169+
out.writeBoolean(state.isStreamSplitAssigned());
170170
}
171171

172172
// ------------------------------------------------------------------------------------------
@@ -229,9 +229,9 @@ private HybridPendingSplitsState deserializeHybridPendingSplitsState(
229229
return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
230230
}
231231

232-
private IncrementalPendingSplitsState deserializeBinlogPendingSplitsState(
232+
private StreamPendingSplitsState deserializeBinlogPendingSplitsState(
233233
DataInputDeserializer in) throws IOException {
234-
return new IncrementalPendingSplitsState(in.readBoolean());
234+
return new StreamPendingSplitsState(in.readBoolean());
235235
}
236236

237237
// ------------------------------------------------------------------------------------------
Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
import java.util.Objects;
2222

2323
/** A {@link PendingSplitsState} for pending binlog splits. */
24-
public class IncrementalPendingSplitsState extends PendingSplitsState {
24+
public class StreamPendingSplitsState extends PendingSplitsState {
2525

26-
private final boolean isBinlogSplitAssigned;
26+
private final boolean isStreamSplitAssigned;
2727

28-
public IncrementalPendingSplitsState(boolean isBinlogSplitAssigned) {
29-
this.isBinlogSplitAssigned = isBinlogSplitAssigned;
28+
public StreamPendingSplitsState(boolean isStreamSplitAssigned) {
29+
this.isStreamSplitAssigned = isStreamSplitAssigned;
3030
}
3131

32-
public boolean isBinlogSplitAssigned() {
33-
return isBinlogSplitAssigned;
32+
public boolean isStreamSplitAssigned() {
33+
return isStreamSplitAssigned;
3434
}
3535

3636
@Override
@@ -41,17 +41,17 @@ public boolean equals(Object o) {
4141
if (o == null || getClass() != o.getClass()) {
4242
return false;
4343
}
44-
IncrementalPendingSplitsState that = (IncrementalPendingSplitsState) o;
45-
return isBinlogSplitAssigned == that.isBinlogSplitAssigned;
44+
StreamPendingSplitsState that = (StreamPendingSplitsState) o;
45+
return isStreamSplitAssigned == that.isStreamSplitAssigned;
4646
}
4747

4848
@Override
4949
public int hashCode() {
50-
return Objects.hash(isBinlogSplitAssigned);
50+
return Objects.hash(isStreamSplitAssigned);
5151
}
5252

5353
@Override
5454
public String toString() {
55-
return "BinlogPendingSplitsState{" + "isBinlogSplitAssigned=" + isBinlogSplitAssigned + '}';
55+
return "StreamPendingSplitsState{" + "isStreamSplitAssigned=" + isStreamSplitAssigned + '}';
5656
}
5757
}

0 commit comments

Comments
 (0)