Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.base.source.reader.external;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -202,6 +203,11 @@ public void close() {
}
}

@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}

private void assertLowWatermark(SourceRecord lowWatermark) {
checkState(
isLowWatermarkEvent(lowWatermark),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public LsnOffset(Lsn lsn) {
}

public Lsn getLcn() {
return Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY));
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.ArrayList;
import java.util.Map;

import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.buildSplitScanQuery;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.currentLsn;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.readTableSplitDataStatement;
Expand Down Expand Up @@ -90,6 +89,7 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getDatabaseSchema(),
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
sourceFetchContext.getSnapshotReceiver(),
split);
SnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SnapshotSplitChangeEventSourceContext();
Expand All @@ -114,11 +114,15 @@ public void execute(Context context) throws Exception {
}
// execute stream read task
if (snapshotResult.isCompletedOrSkipped()) {
final SqlServerOffsetContext.Loader loader =
new SqlServerOffsetContext.Loader(sourceFetchContext.getDbzConnectorConfig());
final SqlServerOffsetContext streamOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());

final LsnSplitReadTask backfillBinlogReadTask =
createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
sourceFetchContext.getOffsetContext());
new SnapshotBinlogSplitChangeEventSourceContext(), streamOffsetContext);
} else {
taskRunning = false;
throw new IllegalStateException(
Expand Down Expand Up @@ -163,8 +167,8 @@ private LsnSplitReadTask createBackFillLsnSplitReadTask(
.build();
// task to read binlog and backfill for current split
return new LsnSplitReadTask(
new SqlServerConnectorConfig(dezConf),
createSqlServerConnection(context.getSourceConfig().getDbzConfiguration()),
context.getDbzConnectorConfig(),
context.getConnection(),
context.getMetaDataConnection(),
context.getDispatcher(),
context.getErrorHandler(),
Expand Down Expand Up @@ -199,6 +203,7 @@ public static class SqlServerSnapshotSplitReadTask extends AbstractSnapshotChang
private final SnapshotSplit snapshotSplit;
private final SqlServerOffsetContext offsetContext;
private final SnapshotProgressListener snapshotProgressListener;
private final EventDispatcher.SnapshotReceiver snapshotReceiver;

public SqlServerSnapshotSplitReadTask(
SqlServerConnectorConfig connectorConfig,
Expand All @@ -207,6 +212,7 @@ public SqlServerSnapshotSplitReadTask(
SqlServerDatabaseSchema databaseSchema,
SqlServerConnection jdbcConnection,
JdbcSourceEventDispatcher dispatcher,
EventDispatcher.SnapshotReceiver snapshotReceiver,
SnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotProgressListener);
this.offsetContext = previousOffset;
Expand All @@ -217,6 +223,7 @@ public SqlServerSnapshotSplitReadTask(
this.clock = Clock.SYSTEM;
this.snapshotSplit = snapshotSplit;
this.snapshotProgressListener = snapshotProgressListener;
this.snapshotReceiver = snapshotReceiver;
}

@Override
Expand Down Expand Up @@ -269,7 +276,7 @@ protected SnapshotResult doExecute(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
return SnapshotResult.completed(ctx.offset);
Expand All @@ -290,8 +297,6 @@ private void createDataEvents(
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
TableId tableId)
throws Exception {
EventDispatcher.SnapshotReceiver snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
LOG.debug("Snapshotting table {}", tableId);
createDataEventsForTable(
snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.debezium.data.Envelope.FieldName;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
private ChangeEventQueue<DataChangeEvent> queue;
private SqlServerTaskContext taskContext;
private TopicSelector<TableId> topicSelector;
private EventDispatcher.SnapshotReceiver snapshotReceiver;
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
private StreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;

Expand Down Expand Up @@ -143,6 +145,8 @@ public void configure(SourceSplitBase sourceSplitBase) {
metadataProvider,
schemaNameAdjuster);

this.snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();

final DefaultChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
new DefaultChangeEventSourceMetricsFactory();
this.snapshotChangeEventSourceMetrics =
Expand Down Expand Up @@ -217,6 +221,10 @@ public JdbcSourceEventDispatcher getDispatcher() {
return dispatcher;
}

public EventDispatcher.SnapshotReceiver getSnapshotReceiver() {
return snapshotReceiver;
}

@Override
public SqlServerOffsetContext getOffsetContext() {
return offsetContext;
Expand Down
Loading