Skip to content

Commit f62be30

Browse files
committed
[oracle] Use oracle connection in context for each reader subtask (apache#2254)
1 parent 4c1a19c commit f62be30

File tree

4 files changed

+7
-46
lines changed

4 files changed

+7
-46
lines changed

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,7 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
141141
@Override
142142
public OracleSourceFetchTaskContext createFetchTaskContext(
143143
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
144-
final OracleConnection jdbcConnection =
145-
createOracleConnection(taskSourceConfig.getDbzConfiguration());
146-
return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection);
144+
return new OracleSourceFetchTaskContext(taskSourceConfig, this);
147145
}
148146

149147
@Override

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import io.debezium.pipeline.source.spi.ChangeEventSource;
3838
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
3939
import io.debezium.pipeline.spi.ChangeRecordEmitter;
40-
import io.debezium.pipeline.spi.OffsetContext;
4140
import io.debezium.pipeline.spi.SnapshotResult;
4241
import io.debezium.relational.RelationalSnapshotChangeEventSource;
4342
import io.debezium.relational.SnapshotChangeRecordEmitter;
@@ -59,7 +58,6 @@
5958
import java.util.Map;
6059

6160
import static com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask.RedoLogSplitReadTask;
62-
import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
6361
import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.currentRedoLogOffset;
6462
import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.buildSplitScanQuery;
6563
import static com.ververica.cdc.connectors.oracle.source.utils.OracleUtils.readTableSplitDataStatement;
@@ -163,10 +161,6 @@ private StreamSplit createBackfillRedoLogSplit(
163161

164162
private RedoLogSplitReadTask createBackfillRedoLogReadTask(
165163
StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
166-
OracleConnectorConfig oracleConnectorConfig =
167-
context.getSourceConfig().getDbzConnectorConfig();
168-
final OffsetContext.Loader<OracleOffsetContext> loader =
169-
new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
170164
// we should only capture events for the current table,
171165
// otherwise, we may can't find corresponding schema
172166
Configuration dezConf =
@@ -180,7 +174,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask(
180174
// task to read binlog and backfill for current split
181175
return new RedoLogSplitReadTask(
182176
new OracleConnectorConfig(dezConf),
183-
createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
177+
context.getConnection(),
184178
context.getDispatcher(),
185179
context.getErrorHandler(),
186180
context.getDatabaseSchema(),

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.time.Instant;
6868
import java.util.Map;
6969

70+
import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection;
7071
import static com.ververica.cdc.connectors.oracle.util.ChunkUtils.getChunkKeyColumn;
7172

7273
/** The context for fetch task that fetching data of snapshot split from Oracle data source. */
@@ -90,11 +91,9 @@ public class OracleSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
9091
private OracleErrorHandler errorHandler;
9192

9293
public OracleSourceFetchTaskContext(
93-
JdbcSourceConfig sourceConfig,
94-
JdbcDataSourceDialect dataSourceDialect,
95-
OracleConnection connection) {
94+
JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
9695
super(sourceConfig, dataSourceDialect);
97-
this.connection = connection;
96+
this.connection = createOracleConnection(sourceConfig.getDbzConfiguration());
9897
this.metadataProvider = new OracleEventMetadataProvider();
9998
}
10099

@@ -108,7 +107,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
108107
.getDbzConfiguration()
109108
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
110109
sourceSplitBase.getTableSchemas().values());
111-
this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig);
110+
this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig, connection);
112111
// todo logMiner or xStream
113112
this.offsetContext =
114113
loadStartingOffsetState(

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
import org.apache.flink.table.types.logical.RowType;
2020

2121
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
22-
import io.debezium.config.Configuration;
2322
import io.debezium.connector.oracle.OracleConnection;
2423
import io.debezium.connector.oracle.OracleConnectorConfig;
2524
import io.debezium.connector.oracle.OracleDatabaseSchema;
2625
import io.debezium.connector.oracle.OracleDefaultValueConverter;
2726
import io.debezium.connector.oracle.OracleTopicSelector;
2827
import io.debezium.connector.oracle.OracleValueConverters;
2928
import io.debezium.connector.oracle.StreamingAdapter;
30-
import io.debezium.jdbc.JdbcConfiguration;
3129
import io.debezium.jdbc.JdbcConnection;
3230
import io.debezium.relational.TableId;
3331
import io.debezium.schema.TopicSelector;
@@ -243,12 +241,9 @@ public static PreparedStatement readTableSplitDataStatement(
243241

244242
/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
245243
public static OracleDatabaseSchema createOracleDatabaseSchema(
246-
OracleConnectorConfig dbzOracleConfig) {
244+
OracleConnectorConfig dbzOracleConfig, OracleConnection oracleConnection) {
247245
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
248246
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
249-
OracleConnection oracleConnection =
250-
OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig());
251-
// OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig);
252247
OracleValueConverters oracleValueConverters =
253248
new OracleValueConverters(dbzOracleConfig, oracleConnection);
254249
OracleDefaultValueConverter defaultValueConverter =
@@ -264,31 +259,6 @@ public static OracleDatabaseSchema createOracleDatabaseSchema(
264259
tableNameCaseSensitivity);
265260
}
266261

267-
/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
268-
public static OracleDatabaseSchema createOracleDatabaseSchema(
269-
OracleConnectorConfig dbzOracleConfig, boolean tableIdCaseInsensitive) {
270-
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
271-
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
272-
OracleConnection oracleConnection =
273-
OracleConnectionUtils.createOracleConnection(
274-
JdbcConfiguration.adapt((Configuration) dbzOracleConfig));
275-
OracleValueConverters oracleValueConverters =
276-
new OracleValueConverters(dbzOracleConfig, oracleConnection);
277-
OracleDefaultValueConverter defaultValueConverter =
278-
new OracleDefaultValueConverter(oracleValueConverters, oracleConnection);
279-
StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
280-
tableIdCaseInsensitive
281-
? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE
282-
: StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE;
283-
return new OracleDatabaseSchema(
284-
dbzOracleConfig,
285-
oracleValueConverters,
286-
defaultValueConverter,
287-
schemaNameAdjuster,
288-
topicSelector,
289-
tableNameCaseSensitivity);
290-
}
291-
292262
public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) {
293263
return getRedoLogPosition(dataRecord.sourceOffset());
294264
}

0 commit comments

Comments
 (0)