
Commit a3c0edd

committed: address comment
1 parent: c048f32

6 files changed (+21, -16 lines)

docs/content/connectors/sqlserver-cdc.md

Lines changed: 1 addition & 8 deletions
@@ -138,19 +138,12 @@ Connector Options
     <td>String</td>
     <td>Database name of the SQLServer database to monitor.</td>
   </tr>
-  <tr>
-    <td>schema-name</td>
-    <td>required</td>
-    <td style="word-wrap: break-word;">(none)</td>
-    <td>String</td>
-    <td>Schema name of the SQLServer database to monitor.</td>
-  </tr>
   <tr>
     <td>table-name</td>
     <td>required</td>
     <td style="word-wrap: break-word;">(none)</td>
     <td>String</td>
-    <td>Table name of the SQLServer database to monitor.</td>
+    <td>Table name of the SQLServer database to monitor, e.g.: "db1.table1"</td>
   </tr>
   <tr>
     <td>port</td>
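
For readers of this page, a minimal usage sketch (not part of the commit) showing the documented options from a Java program, following the executeSql pattern used in the connector's tests. The table schema, connection values, and whether the prefix in "db1.table1" denotes a schema or a database are assumptions here; only the option names and the "db1.table1" example come from the documentation above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlServerCdcUsageSketch {
    public static void main(String[] args) {
        // Streaming TableEnvironment; the CDC source produces an unbounded changelog.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical table definition: columns and connection values are placeholders;
        // 'table-name' reuses the "db1.table1" example from the documentation change above.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + " ID INT NOT NULL,"
                        + " NAME STRING,"
                        + " PRIMARY KEY (ID) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'sqlserver-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '1433',"
                        + " 'username' = 'sa',"
                        + " 'password' = 'Password!',"
                        + " 'database-name' = 'db1',"
                        + " 'table-name' = 'db1.table1'"
                        + ")");

        // Read the changelog; print() blocks and emits rows as changes arrive.
        tEnv.executeSql("SELECT * FROM orders").print();
    }
}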

flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@
  * Inspired from {@link io.debezium.relational.history.MemoryDatabaseHistory} but we will store the
  * HistoryRecords in Flink's state for persistence.
  *
- * <p>Note: This is not a clean solution because we depend on a global variable and all the history
+ * <p>Note: This is not a clean solution because we depends on a global variable and all the history
  * records will be stored in state (grow infinitely). We may need to come up with a
  * FileSystemDatabaseHistory in the future to store history in HDFS.
  */
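
The note touched above describes a known design smell: history records are shared through a static (global) registry and accumulate for the lifetime of the job. A minimal, self-contained sketch of that general pattern in plain Java; the class and method names are hypothetical and are not the connector's actual API.

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public final class SharedHistoryRegistry {
    // The "global variable": a static map keyed by an instance name, used because the
    // component that stores the records and the component that snapshots them are
    // created independently and cannot pass references to each other directly.
    private static final Map<String, List<String>> RECORDS = new ConcurrentHashMap<>();

    private SharedHistoryRegistry() {}

    // Producer side: every schema-history record is appended and never pruned,
    // which is why the note warns that the stored state grows without bound.
    public static void store(String instanceName, String record) {
        RECORDS.computeIfAbsent(instanceName, k -> new CopyOnWriteArrayList<>()).add(record);
    }

    // Consumer side: read e.g. when taking a state snapshot, so the records survive restarts.
    public static Collection<String> retrieve(String instanceName) {
        return RECORDS.getOrDefault(instanceName, Collections.emptyList());
    }
}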

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java

Lines changed: 10 additions & 3 deletions
@@ -30,6 +30,7 @@
 import io.debezium.connector.oracle.OracleDatabaseSchema;
 import io.debezium.connector.oracle.OracleOffsetContext;
 import io.debezium.connector.oracle.OracleValueConverters;
+import io.debezium.connector.oracle.logminer.LogMinerOracleOffsetContextLoader;
 import io.debezium.heartbeat.Heartbeat;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
@@ -152,6 +153,12 @@ private StreamSplit createBackfillRedoLogSplit(

     private RedoLogSplitReadTask createBackfillRedoLogReadTask(
             StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
+        OracleConnectorConfig oracleConnectorConfig =
+                context.getSourceConfig().getDbzConnectorConfig();
+        final OffsetContext.Loader<OracleOffsetContext> loader =
+                new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
+        final OracleOffsetContext oracleOffsetContext =
+                loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
         // we should only capture events for the current table,
         // otherwise, we may can't find corresponding schema
         Configuration dezConf =
@@ -287,13 +294,13 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
     @Override
     protected SnapshotContext prepare(ChangeEventSourceContext changeEventSourceContext)
             throws Exception {
-        return new OracleSnapshotContext();
+        return new MySqlSnapshotContext();
     }

-    private static class OracleSnapshotContext
+    private static class MySqlSnapshotContext
             extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext {

-        public OracleSnapshotContext() throws SQLException {
+        public MySqlSnapshotContext() throws SQLException {
             super("");
         }
     }

flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java

Lines changed: 2 additions & 2 deletions
@@ -129,7 +129,7 @@ private void testOracleParallelSource(

         String sourceDDL =
                 format(
-                        "CREATE TABLE customers ("
+                        "CREATE TABLE products ("
                                 + " ID INT NOT NULL,"
                                 + " NAME STRING,"
                                 + " ADDRESS STRING,"
@@ -183,7 +183,7 @@ private void testOracleParallelSource(
                     "+I[2000, user_21, Shanghai, 123567891234]"
                 };
         tEnv.executeSql(sourceDDL);
-        TableResult tableResult = tEnv.executeSql("select * from customers");
+        TableResult tableResult = tEnv.executeSql("select * from products");
         CloseableIterator<Row> iterator = tableResult.collect();
         JobID jobId = tableResult.getJobClient().get().getJobID();
         List<String> expectedSnapshotData = new ArrayList<>();

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java

Lines changed: 3 additions & 0 deletions
@@ -65,6 +65,9 @@ private static DataType convertFromColumn(Column column) {
                 return DataTypes.DATE();
             case Types.TIMESTAMP:
             case Types.TIMESTAMP_WITH_TIMEZONE:
+                return column.length() >= 0
+                        ? DataTypes.TIMESTAMP(column.length())
+                        : DataTypes.TIMESTAMP();
             case Types.BOOLEAN:
                 return DataTypes.BOOLEAN();
             default:
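
The three added lines map a SQL Server timestamp column to a Flink TIMESTAMP whose precision follows the column's recorded length, falling back to the default precision when no length is available. A standalone restatement of that mapping; the helper names are hypothetical, and it assumes, as the patch implies, that the column length carries the fractional-second precision (e.g. DATETIME2(3) would become TIMESTAMP(3)).

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

final class TimestampTypeMapping {
    // A non-negative column length is treated as the timestamp precision;
    // a negative length means "unknown", so the default TIMESTAMP precision is used.
    static DataType toFlinkTimestamp(int columnLength) {
        return columnLength >= 0 ? DataTypes.TIMESTAMP(columnLength) : DataTypes.TIMESTAMP();
    }

    private TimestampTypeMapping() {}
}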

flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java

Lines changed: 4 additions & 2 deletions
@@ -46,8 +46,10 @@
 import java.util.stream.Collectors;

 /**
- * A {@link StreamingChangeEventSource} based on SQL Server change data capture functionality. A
- * main loop polls database DDL change and change data tables and turns them into change events.
+ * Copied from Debezium project(1.6.4.final) to add method {@link
+ * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerOffsetContext)}. A {@link
+ * StreamingChangeEventSource} based on SQL Server change data capture functionality. A main loop
+ * polls database DDL change and change data tables and turns them into change events.
  *
  * <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
  * monitors source table and write changes from the table into the change table.
