Skip to content

Commit 7fb7012

Browse files
committed
[cdc-core] Introduce experimental module flink-cdc-connector-mysql-new
1 parent 785fe85 commit 7fb7012

File tree

85 files changed

+435
-393
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+435
-393
lines changed

flink-connector-base/pom.xml renamed to flink-cdc-core/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@ under the License.
2525
</parent>
2626
<modelVersion>4.0.0</modelVersion>
2727

28-
<artifactId>flink-connector-base</artifactId>
28+
<artifactId>flink-cdc-core</artifactId>
2929

3030
<dependencies>
31-
<!-- Debezium dependencies -->
3231
<dependency>
3332
<groupId>com.ververica</groupId>
3433
<artifactId>flink-connector-debezium</artifactId>
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners;
19+
package com.ververica.cdc.connectors.core.assigner;
2020

2121
import javax.annotation.Nullable;
2222

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners;
19+
package com.ververica.cdc.connectors.core.assigner;
2020

2121
import org.apache.flink.table.api.ValidationException;
2222
import org.apache.flink.table.types.logical.RowType;
2323
import org.apache.flink.util.FlinkRuntimeException;
2424

25-
import com.ververica.cdc.connectors.base.schema.BaseSchema;
26-
import com.ververica.cdc.connectors.base.source.config.SourceConfig;
27-
import com.ververica.cdc.connectors.base.source.dialect.SnapshotEventDialect;
28-
import com.ververica.cdc.connectors.base.source.split.SnapshotSplit;
29-
import com.ververica.cdc.connectors.base.source.utils.ObjectUtils;
25+
import com.ververica.cdc.connectors.core.config.SourceConfig;
26+
import com.ververica.cdc.connectors.core.dialect.SnapshotEventDialect;
27+
import com.ververica.cdc.connectors.core.schema.BaseSchema;
28+
import com.ververica.cdc.connectors.core.split.SnapshotSplit;
29+
import com.ververica.cdc.connectors.core.utils.ObjectUtils;
3030
import io.debezium.jdbc.JdbcConnection;
3131
import io.debezium.relational.Column;
3232
import io.debezium.relational.Table;
@@ -45,7 +45,7 @@
4545
import java.util.Map;
4646
import java.util.Objects;
4747

48-
import static com.ververica.cdc.connectors.base.source.utils.ObjectUtils.doubleCompare;
48+
import static com.ververica.cdc.connectors.core.utils.ObjectUtils.doubleCompare;
4949
import static java.math.BigDecimal.ROUND_CEILING;
5050

5151
/** The {@code ChunkSplitter}'s task is to split table into a set of chunks or called splits. */
Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners;
20-
21-
import com.ververica.cdc.connectors.base.schema.BaseSchema;
22-
import com.ververica.cdc.connectors.base.source.assigners.state.HybridPendingSplitsState;
23-
import com.ververica.cdc.connectors.base.source.assigners.state.PendingSplitsState;
24-
import com.ververica.cdc.connectors.base.source.config.SourceConfig;
25-
import com.ververica.cdc.connectors.base.source.dialect.SnapshotEventDialect;
26-
import com.ververica.cdc.connectors.base.source.offset.Offset;
27-
import com.ververica.cdc.connectors.base.source.offset.OffsetFactory;
28-
import com.ververica.cdc.connectors.base.source.split.FinishedSnapshotSplitInfo;
29-
import com.ververica.cdc.connectors.base.source.split.SnapshotSplit;
30-
import com.ververica.cdc.connectors.base.source.split.SourceSplitBase;
31-
import com.ververica.cdc.connectors.base.source.split.StreamSplit;
19+
package com.ververica.cdc.connectors.core.assigner;
20+
21+
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
22+
import com.ververica.cdc.connectors.core.assigner.state.HybridPendingSplitsState;
23+
import com.ververica.cdc.connectors.core.config.SourceConfig;
24+
import com.ververica.cdc.connectors.core.dialect.SnapshotEventDialect;
25+
import com.ververica.cdc.connectors.core.offset.Offset;
26+
import com.ververica.cdc.connectors.core.offset.OffsetFactory;
27+
import com.ververica.cdc.connectors.core.schema.BaseSchema;
28+
import com.ververica.cdc.connectors.core.split.FinishedSnapshotSplitInfo;
29+
import com.ververica.cdc.connectors.core.split.SnapshotSplit;
30+
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
31+
import com.ververica.cdc.connectors.core.split.StreamSplit;
3232
import io.debezium.relational.TableId;
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
@@ -93,7 +93,7 @@ public HybridSplitAssigner(
9393
dialect,
9494
offsetFactory,
9595
baseSchema),
96-
checkpoint.isBinlogSplitAssigned(),
96+
checkpoint.isStreamSplitAssigned(),
9797
sourceConfig.getSplitMetaGroupSize());
9898
}
9999

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners;
19+
package com.ververica.cdc.connectors.core.assigner;
2020

2121
import org.apache.flink.util.FlinkRuntimeException;
2222

23-
import com.ververica.cdc.connectors.base.schema.BaseSchema;
24-
import com.ververica.cdc.connectors.base.source.assigners.state.SnapshotPendingSplitsState;
25-
import com.ververica.cdc.connectors.base.source.config.SourceConfig;
26-
import com.ververica.cdc.connectors.base.source.dialect.SnapshotEventDialect;
27-
import com.ververica.cdc.connectors.base.source.offset.Offset;
28-
import com.ververica.cdc.connectors.base.source.offset.OffsetFactory;
29-
import com.ververica.cdc.connectors.base.source.split.FinishedSnapshotSplitInfo;
30-
import com.ververica.cdc.connectors.base.source.split.SnapshotSplit;
31-
import com.ververica.cdc.connectors.base.source.split.SourceSplitBase;
23+
import com.ververica.cdc.connectors.core.assigner.state.SnapshotPendingSplitsState;
24+
import com.ververica.cdc.connectors.core.config.SourceConfig;
25+
import com.ververica.cdc.connectors.core.dialect.SnapshotEventDialect;
26+
import com.ververica.cdc.connectors.core.offset.Offset;
27+
import com.ververica.cdc.connectors.core.offset.OffsetFactory;
28+
import com.ververica.cdc.connectors.core.schema.BaseSchema;
29+
import com.ververica.cdc.connectors.core.split.FinishedSnapshotSplitInfo;
30+
import com.ververica.cdc.connectors.core.split.SnapshotSplit;
31+
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
3232
import io.debezium.relational.TableId;
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,22 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners;
19+
package com.ververica.cdc.connectors.core.assigner;
2020

2121
import org.apache.flink.api.common.state.CheckpointListener;
2222

23-
import com.ververica.cdc.connectors.base.source.assigners.state.PendingSplitsState;
24-
import com.ververica.cdc.connectors.base.source.offset.Offset;
25-
import com.ververica.cdc.connectors.base.source.split.FinishedSnapshotSplitInfo;
26-
import com.ververica.cdc.connectors.base.source.split.SourceSplitBase;
23+
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
24+
import com.ververica.cdc.connectors.core.offset.Offset;
25+
import com.ververica.cdc.connectors.core.split.FinishedSnapshotSplitInfo;
26+
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
2727

2828
import java.util.Collection;
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.Optional;
3232

3333
/**
34-
* The {@code MySqlSplitAssigner} is responsible for deciding what split should be processed. It
34+
* The {@code SplitAssigner} is responsible for deciding what split should be processed. It
3535
* determines split processing order.
3636
*/
3737
public interface SplitAssigner {
Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners;
20-
21-
import com.ververica.cdc.connectors.base.source.assigners.state.PendingSplitsState;
22-
import com.ververica.cdc.connectors.base.source.assigners.state.StreamPendingSplitsState;
23-
import com.ververica.cdc.connectors.base.source.config.SourceConfig;
24-
import com.ververica.cdc.connectors.base.source.dialect.Dialect;
25-
import com.ververica.cdc.connectors.base.source.internal.connection.JdbcConnectionFactory;
26-
import com.ververica.cdc.connectors.base.source.offset.Offset;
27-
import com.ververica.cdc.connectors.base.source.offset.OffsetFactory;
28-
import com.ververica.cdc.connectors.base.source.split.FinishedSnapshotSplitInfo;
29-
import com.ververica.cdc.connectors.base.source.split.SourceSplitBase;
30-
import com.ververica.cdc.connectors.base.source.split.StreamSplit;
19+
package com.ververica.cdc.connectors.core.assigner;
20+
21+
import com.ververica.cdc.connectors.core.assigner.state.PendingSplitsState;
22+
import com.ververica.cdc.connectors.core.assigner.state.StreamPendingSplitsState;
23+
import com.ververica.cdc.connectors.core.internal.connection.JdbcConnectionFactory;
24+
import com.ververica.cdc.connectors.core.config.SourceConfig;
25+
import com.ververica.cdc.connectors.core.dialect.Dialect;
26+
import com.ververica.cdc.connectors.core.offset.Offset;
27+
import com.ververica.cdc.connectors.core.offset.OffsetFactory;
28+
import com.ververica.cdc.connectors.core.split.FinishedSnapshotSplitInfo;
29+
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
30+
import com.ververica.cdc.connectors.core.split.StreamSplit;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,27 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners.state;
19+
package com.ververica.cdc.connectors.core.assigner.state;
2020

2121
import java.util.Objects;
2222

2323
/** A {@link PendingSplitsState} for pending hybrid (snapshot & binlog) splits. */
2424
public class HybridPendingSplitsState extends PendingSplitsState {
2525
private final SnapshotPendingSplitsState snapshotPendingSplits;
26-
private final boolean isBinlogSplitAssigned;
26+
private final boolean isStreamSplitAssigned;
2727

2828
public HybridPendingSplitsState(
29-
SnapshotPendingSplitsState snapshotPendingSplits, boolean isBinlogSplitAssigned) {
29+
SnapshotPendingSplitsState snapshotPendingSplits, boolean isStreamSplitAssigned) {
3030
this.snapshotPendingSplits = snapshotPendingSplits;
31-
this.isBinlogSplitAssigned = isBinlogSplitAssigned;
31+
this.isStreamSplitAssigned = isStreamSplitAssigned;
3232
}
3333

3434
public SnapshotPendingSplitsState getSnapshotPendingSplits() {
3535
return snapshotPendingSplits;
3636
}
3737

38-
public boolean isBinlogSplitAssigned() {
39-
return isBinlogSplitAssigned;
38+
public boolean isStreamSplitAssigned() {
39+
return isStreamSplitAssigned;
4040
}
4141

4242
@Override
@@ -48,13 +48,13 @@ public boolean equals(Object o) {
4848
return false;
4949
}
5050
HybridPendingSplitsState that = (HybridPendingSplitsState) o;
51-
return isBinlogSplitAssigned == that.isBinlogSplitAssigned
51+
return isStreamSplitAssigned == that.isStreamSplitAssigned
5252
&& Objects.equals(snapshotPendingSplits, that.snapshotPendingSplits);
5353
}
5454

5555
@Override
5656
public int hashCode() {
57-
return Objects.hash(snapshotPendingSplits, isBinlogSplitAssigned);
57+
return Objects.hash(snapshotPendingSplits, isStreamSplitAssigned);
5858
}
5959

6060
@Override
@@ -63,7 +63,7 @@ public String toString() {
6363
+ "snapshotPendingSplits="
6464
+ snapshotPendingSplits
6565
+ ", isBinlogSplitAssigned="
66-
+ isBinlogSplitAssigned
66+
+ isStreamSplitAssigned
6767
+ '}';
6868
}
6969
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners.state;
19+
package com.ververica.cdc.connectors.core.assigner.state;
2020

2121
import javax.annotation.Nullable;
2222

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.ververica.cdc.connectors.base.source.assigners.state;
19+
package com.ververica.cdc.connectors.core.assigner.state;
2020

2121
import org.apache.flink.core.io.SimpleVersionedSerializer;
2222
import org.apache.flink.core.memory.DataInputDeserializer;
2323
import org.apache.flink.core.memory.DataOutputSerializer;
2424

25-
import com.ververica.cdc.connectors.base.source.offset.Offset;
26-
import com.ververica.cdc.connectors.base.source.split.SnapshotSplit;
27-
import com.ververica.cdc.connectors.base.source.split.SourceSplitBase;
28-
import com.ververica.cdc.connectors.base.source.split.SourceSplitSerializer;
25+
import com.ververica.cdc.connectors.core.offset.Offset;
26+
import com.ververica.cdc.connectors.core.split.SnapshotSplit;
27+
import com.ververica.cdc.connectors.core.split.SourceSplitBase;
28+
import com.ververica.cdc.connectors.core.split.SourceSplitSerializer;
2929
import io.debezium.relational.TableId;
3030

3131
import java.io.IOException;
@@ -156,7 +156,7 @@ private void serializeSnapshotPendingSplitsState(
156156
private void serializeHybridPendingSplitsState(
157157
HybridPendingSplitsState state, DataOutputSerializer out) throws IOException {
158158
serializeSnapshotPendingSplitsState(state.getSnapshotPendingSplits(), out);
159-
out.writeBoolean(state.isBinlogSplitAssigned());
159+
out.writeBoolean(state.isStreamSplitAssigned());
160160
}
161161

162162
private void serializeBinlogPendingSplitsState(

0 commit comments

Comments
 (0)