Commit 7f969d7 (1 parent: 26b4445)

[oracle] close the oracle connection after the backfill task finished


1 file changed: flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java (21 additions, 20 deletions)
@@ -37,7 +37,6 @@
 import io.debezium.pipeline.source.spi.ChangeEventSource;
 import io.debezium.pipeline.source.spi.SnapshotProgressListener;
 import io.debezium.pipeline.spi.ChangeRecordEmitter;
-import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.pipeline.spi.SnapshotResult;
 import io.debezium.relational.RelationalSnapshotChangeEventSource;
 import io.debezium.relational.SnapshotChangeRecordEmitter;
@@ -130,19 +129,23 @@ public void execute(Context context) throws Exception {
         }
         // execute redoLog read task
         if (snapshotResult.isCompletedOrSkipped()) {
-            final RedoLogSplitReadTask backfillBinlogReadTask =
-                    createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext);
-
-            final LogMinerOracleOffsetContextLoader loader =
-                    new LogMinerOracleOffsetContextLoader(
-                            ((OracleSourceFetchTaskContext) context).getDbzConnectorConfig());
-            final OracleOffsetContext oracleOffsetContext =
-                    loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
-            backfillBinlogReadTask.execute(
-                    new SnapshotBinlogSplitChangeEventSourceContext(),
-                    sourceFetchContext.getPartition(),
-                    oracleOffsetContext);
-            taskRunning = false;
+            try (OracleConnection connection =
+                    createOracleConnection(
+                            sourceFetchContext.getSourceConfig().getDbzConfiguration())) {
+                final RedoLogSplitReadTask backfillBinlogReadTask =
+                        createBackfillRedoLogReadTask(
+                                backfillBinlogSplit, sourceFetchContext, connection);
+                final LogMinerOracleOffsetContextLoader loader =
+                        new LogMinerOracleOffsetContextLoader(
+                                ((OracleSourceFetchTaskContext) context).getDbzConnectorConfig());
+                final OracleOffsetContext oracleOffsetContext =
+                        loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
+                backfillBinlogReadTask.execute(
+                        new SnapshotBinlogSplitChangeEventSourceContext(),
+                        sourceFetchContext.getPartition(),
+                        oracleOffsetContext);
+                taskRunning = false;
+            }
         } else {
             taskRunning = false;
             throw new IllegalStateException(
@@ -162,11 +165,9 @@ private StreamSplit createBackfillRedoLogSplit(
     }
 
     private RedoLogSplitReadTask createBackfillRedoLogReadTask(
-            StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
-        OracleConnectorConfig oracleConnectorConfig =
-                context.getSourceConfig().getDbzConnectorConfig();
-        final OffsetContext.Loader<OracleOffsetContext> loader =
-                new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
+            StreamSplit backfillBinlogSplit,
+            OracleSourceFetchTaskContext context,
+            OracleConnection connection) {
         // we should only capture events for the current table,
         // otherwise, we may can't find corresponding schema
         Configuration dezConf =
@@ -180,7 +181,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask(
         // task to read binlog and backfill for current split
         return new RedoLogSplitReadTask(
                 new OracleConnectorConfig(dezConf),
-                createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
+                connection,
                 context.getDispatcher(),
                 context.getErrorHandler(),
                 context.getDatabaseSchema(),

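The change works by moving the OracleConnection into a try-with-resources block, so the connection opened for the backfill is closed automatically once the redo-log read task finishes, even if it throws. Below is a minimal, self-contained sketch of that pattern; BackfillConnection and runBackfill are illustrative stand-ins, not connector APIs.

// Minimal sketch of the try-with-resources pattern used by the fix.
// BackfillConnection and runBackfill are illustrative stand-ins, not connector APIs.
public class TryWithResourcesSketch {

    // A stand-in for an expensive resource such as OracleConnection.
    static class BackfillConnection implements AutoCloseable {
        BackfillConnection() {
            System.out.println("connection opened");
        }

        void runBackfill() {
            System.out.println("backfill running");
        }

        @Override
        public void close() {
            // Invoked automatically when the try block exits, even on exceptions.
            System.out.println("connection closed");
        }
    }

    public static void main(String[] args) {
        // The resource is scoped to the backfill; no explicit close() call is needed.
        try (BackfillConnection connection = new BackfillConnection()) {
            connection.runBackfill();
        }
    }
}

Compared with calling close() explicitly after execute(), the try-with-resources form also releases the connection on the exception path, which is what keeps a failed backfill task from leaking an Oracle session.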