Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader;
Expand All @@ -35,6 +36,7 @@
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getFetchTimestamp;
Expand All @@ -52,7 +54,7 @@
* emit records rather than emit the records directly.
*/
public class JdbcSourceRecordEmitter<T>
implements RecordEmitter<SourceRecord, T, SourceSplitState> {
implements RecordEmitter<SourceRecords, T, SourceSplitState> {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceRecordEmitter.class);
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
Expand All @@ -78,6 +80,15 @@ public JdbcSourceRecordEmitter(

@Override
public void emitRecord(
        SourceRecords sourceRecords, SourceOutput<T> output, SourceSplitState splitState)
        throws Exception {
    // Unpack the batch and hand every contained change event to the
    // per-record processor, preserving the original record order.
    for (final Iterator<SourceRecord> it = sourceRecords.iterator(); it.hasNext(); ) {
        processElement(it.next(), output, splitState);
    }
}

private void processElement(
SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
throws Exception {
if (isWatermarkEvent(element)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState;
import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.JdbcIncrementalSourceReader;
import com.ververica.cdc.connectors.base.source.reader.JdbcSourceSplitReader;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.function.Supplier;
Expand Down Expand Up @@ -101,7 +101,7 @@ public Boundedness getBoundedness() {
public SourceReader createReader(SourceReaderContext readerContext) {
// create source config for the given subtask (e.g. unique server id)
JdbcSourceConfig sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();
final SourceReaderMetrics sourceReaderMetrics =
new SourceReaderMetrics(readerContext.metricGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

import org.apache.kafka.connect.source.SourceRecord;

import javax.annotation.Nullable;

import java.util.Collections;
Expand All @@ -29,11 +27,11 @@
/**
* An implementation of {@link RecordsWithSplitIds} which contains the records of one table split.
*/
public final class ChangeEventRecords implements RecordsWithSplitIds<SourceRecord> {
public final class ChangeEventRecords implements RecordsWithSplitIds<SourceRecords> {

@Nullable private String splitId;
@Nullable private Iterator<SourceRecord> recordsForCurrentSplit;
@Nullable private final Iterator<SourceRecord> recordsForSplit;
@Nullable private Iterator<SourceRecords> recordsForCurrentSplit;
@Nullable private final Iterator<SourceRecords> recordsForSplit;
private final Set<String> finishedSnapshotSplits;

public ChangeEventRecords(
Expand All @@ -59,8 +57,8 @@ public String nextSplit() {

@Nullable
@Override
public SourceRecord nextRecordFromSplit() {
final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
public SourceRecords nextRecordFromSplit() {
final Iterator<SourceRecords> recordsForSplit = this.recordsForCurrentSplit;
if (recordsForSplit != null) {
if (recordsForSplit.hasNext()) {
return recordsForSplit.next();
Expand All @@ -78,7 +76,7 @@ public Set<String> finishedSplits() {
}

public static ChangeEventRecords forRecords(
final String splitId, final Iterator<SourceRecord> recordsForSplit) {
final String splitId, final Iterator<SourceRecords> recordsForSplit) {
return new ChangeEventRecords(splitId, recordsForSplit, Collections.emptySet());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.base.source.meta.split;

import org.apache.kafka.connect.source.SourceRecord;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Data structure to describe a set of {@link SourceRecord}. */
public final class SourceRecords {

    // The wrapped change events; never null (enforced by the constructor).
    private final List<SourceRecord> sourceRecords;

    /**
     * Wraps the given list of change events.
     *
     * @param sourceRecords the records to wrap, must not be {@code null}
     * @throws NullPointerException if {@code sourceRecords} is {@code null} — failing fast here
     *     instead of later inside {@link #iterator()} keeps the stack trace at the fault site
     */
    public SourceRecords(List<SourceRecord> sourceRecords) {
        if (sourceRecords == null) {
            throw new NullPointerException("sourceRecords must not be null");
        }
        this.sourceRecords = sourceRecords;
    }

    /** Returns the wrapped record list itself (the live list, not a defensive copy). */
    public List<SourceRecord> getSourceRecordList() {
        return sourceRecords;
    }

    /** Returns an iterator over the wrapped records, in list order. */
    public Iterator<SourceRecord> iterator() {
        return sourceRecords.iterator();
    }

    /**
     * Creates a {@code SourceRecords} holding exactly the given record.
     *
     * @param record the single record to wrap (a {@code null} element is permitted, matching the
     *     behavior of wrapping a list that contains {@code null})
     */
    public static SourceRecords fromSingleRecord(SourceRecord record) {
        // Size the backing list for its single element.
        final List<SourceRecord> records = new ArrayList<>(1);
        records.add(record);
        return new SourceRecords(records);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,7 +68,7 @@
@Experimental
public class JdbcIncrementalSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
SourceRecord, T, SourceSplitBase, SourceSplitState> {
SourceRecords, T, SourceSplitBase, SourceSplitState> {

private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);

Expand All @@ -80,9 +80,9 @@ public class JdbcIncrementalSourceReader<T>
private final JdbcDataSourceDialect dialect;

public JdbcIncrementalSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
Supplier<JdbcSourceSplitReader> splitReaderSupplier,
RecordEmitter<SourceRecord, T, SourceSplitState> recordEmitter,
RecordEmitter<SourceRecords, T, SourceSplitState> recordEmitter,
Configuration config,
SourceReaderContext context,
JdbcSourceConfig sourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.Fetcher;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
Expand All @@ -43,13 +44,13 @@

/** Basic class that reads a {@link SourceSplitBase} and returns {@link SourceRecords}. */
@Experimental
public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSplitBase> {
public class JdbcSourceSplitReader implements SplitReader<SourceRecords, SourceSplitBase> {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);
private final Queue<SourceSplitBase> splits;
private final int subtaskId;

@Nullable private Fetcher<SourceRecord, SourceSplitBase> currentFetcher;
@Nullable private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;
@Nullable private String currentSplitId;
private final JdbcDataSourceDialect dataSourceDialect;
private final JdbcSourceConfig sourceConfig;
Expand All @@ -63,9 +64,9 @@ public JdbcSourceSplitReader(
}

@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
checkSplitOrStartNext();
Iterator<SourceRecord> dataIt = null;
Iterator<SourceRecords> dataIt = null;
try {
dataIt = currentFetcher.pollSplitRecords();
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;

import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
Expand Down Expand Up @@ -55,7 +56,7 @@
/**
 * Fetcher to fetch data from a table split; the split is the snapshot split {@link SnapshotSplit}.
 */
public class JdbcSourceScanFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
public class JdbcSourceScanFetcher implements Fetcher<SourceRecords, SourceSplitBase> {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceScanFetcher.class);

Expand Down Expand Up @@ -115,7 +116,7 @@ public boolean isFinished() {

@Nullable
@Override
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
checkReadException();

if (hasNextElement.get()) {
Expand Down Expand Up @@ -168,7 +169,10 @@ public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
normalizedRecords.add(lowWatermark);
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
normalizedRecords.add(highWatermark);
return normalizedRecords.iterator();

final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
sourceRecordsSet.add(new SourceRecords(normalizedRecords));
return sourceRecordsSet.iterator();
}
// the data has been polled, no more data
reachEnd.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
Expand Down Expand Up @@ -50,7 +51,7 @@
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.splitKeyRangeContains;

/** Fetcher to fetch data from a table split; the split is the stream split {@link StreamSplit}. */
public class JdbcSourceStreamFetcher implements Fetcher<SourceRecord, SourceSplitBase> {
public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceStreamFetcher.class);

private final JdbcSourceFetchTaskContext taskContext;
Expand Down Expand Up @@ -102,7 +103,7 @@ public boolean isFinished() {

@Nullable
@Override
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
if (streamFetchTask.isRunning()) {
Expand All @@ -113,7 +114,9 @@ public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
}
}
}
return sourceRecords.iterator();
List<SourceRecords> sourceRecordsSet = new ArrayList<>();
sourceRecordsSet.add(new SourceRecords(sourceRecords));
return sourceRecordsSet.iterator();
}

private void checkReadException() {
Expand Down