|
| 1 | +/* |
| 2 | + * Copyright 2022 Ververica Inc. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package com.ververica.cdc.connectors.oracle.source.read.fetch; |
| 18 | + |
| 19 | +import org.apache.flink.table.api.DataTypes; |
| 20 | +import org.apache.flink.table.types.DataType; |
| 21 | + |
| 22 | +import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; |
| 23 | +import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect; |
| 24 | +import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; |
| 25 | +import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit; |
| 26 | +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; |
| 27 | +import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher; |
| 28 | +import com.ververica.cdc.connectors.oracle.source.OracleDialect; |
| 29 | +import com.ververica.cdc.connectors.oracle.source.OracleSourceTestBase; |
| 30 | +import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig; |
| 31 | +import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory; |
| 32 | +import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask; |
| 33 | +import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext; |
| 34 | +import com.ververica.cdc.connectors.oracle.utils.OracleTestUtils; |
| 35 | +import com.ververica.cdc.connectors.oracle.utils.RecordsFormatter; |
| 36 | +import io.debezium.connector.oracle.OracleConnection; |
| 37 | +import io.debezium.data.Envelope; |
| 38 | +import io.debezium.jdbc.JdbcConnection; |
| 39 | +import io.debezium.pipeline.EventDispatcher; |
| 40 | +import io.debezium.pipeline.spi.OffsetContext; |
| 41 | +import io.debezium.relational.TableId; |
| 42 | +import io.debezium.schema.DataCollectionSchema; |
| 43 | +import org.apache.kafka.connect.data.Struct; |
| 44 | +import org.apache.kafka.connect.header.ConnectHeaders; |
| 45 | +import org.apache.kafka.connect.source.SourceRecord; |
| 46 | +import org.junit.Test; |
| 47 | + |
| 48 | +import java.sql.SQLException; |
| 49 | +import java.util.ArrayList; |
| 50 | +import java.util.Arrays; |
| 51 | +import java.util.Collection; |
| 52 | +import java.util.Iterator; |
| 53 | +import java.util.List; |
| 54 | +import java.util.Properties; |
| 55 | +import java.util.function.Supplier; |
| 56 | +import java.util.stream.Collectors; |
| 57 | + |
| 58 | +import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection; |
| 59 | +import static org.junit.Assert.assertNotNull; |
| 60 | +import static org.junit.Assert.assertTrue; |
| 61 | + |
| 62 | +/** Tests for {@link OracleScanFetchTask}. */ |
| 63 | +public class OracleScanFetchTaskTest extends OracleSourceTestBase { |
| 64 | + |
| 65 | + @Test |
| 66 | + public void testChangingDataInSnapshotScan() throws Exception { |
| 67 | + OracleTestUtils.createAndInitialize(OracleTestUtils.ORACLE_CONTAINER, "customer.sql"); |
| 68 | + |
| 69 | + String tableName = ORACLE_SCHEMA + ".CUSTOMERS"; |
| 70 | + |
| 71 | + OracleSourceConfigFactory sourceConfigFactory = |
| 72 | + getConfigFactory(new String[] {tableName}, 10); |
| 73 | + OracleSourceConfig sourceConfig = sourceConfigFactory.create(0); |
| 74 | + OracleDialect oracleDialect = new OracleDialect(sourceConfigFactory); |
| 75 | + |
| 76 | + String[] changingDataSql = |
| 77 | + new String[] { |
| 78 | + "UPDATE " + tableName + " SET address = 'Hangzhou' where id = 103", |
| 79 | + "DELETE FROM " + tableName + " where id = 102", |
| 80 | + "INSERT INTO " + tableName + " VALUES(102, 'user_2','Shanghai','123567891234')", |
| 81 | + "UPDATE " + tableName + " SET address = 'Shanghai' where id = 103", |
| 82 | + "UPDATE " + tableName + " SET address = 'Hangzhou' where id = 110", |
| 83 | + "UPDATE " + tableName + " SET address = 'Hangzhou' where id = 111", |
| 84 | + }; |
| 85 | + |
| 86 | + MakeChangeEventTaskContext makeChangeEventTaskContext = |
| 87 | + new MakeChangeEventTaskContext( |
| 88 | + sourceConfig, |
| 89 | + oracleDialect, |
| 90 | + createOracleConnection( |
| 91 | + sourceConfig.getDbzConnectorConfig().getJdbcConfig()), |
| 92 | + () -> executeSql(sourceConfig, changingDataSql)); |
| 93 | + |
| 94 | + final DataType dataType = |
| 95 | + DataTypes.ROW( |
| 96 | + DataTypes.FIELD("ID", DataTypes.BIGINT()), |
| 97 | + DataTypes.FIELD("NAME", DataTypes.STRING()), |
| 98 | + DataTypes.FIELD("ADDRESS", DataTypes.STRING()), |
| 99 | + DataTypes.FIELD("PHONE_NUMBER", DataTypes.STRING())); |
| 100 | + |
| 101 | + List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, oracleDialect); |
| 102 | + |
| 103 | + String[] expected = |
| 104 | + new String[] { |
| 105 | + "+I[101, user_1, Shanghai, 123567891234]", |
| 106 | + "+I[102, user_2, Shanghai, 123567891234]", |
| 107 | + "+I[103, user_3, Shanghai, 123567891234]", |
| 108 | + "+I[109, user_4, Shanghai, 123567891234]", |
| 109 | + "+I[110, user_5, Hangzhou, 123567891234]", |
| 110 | + "+I[111, user_6, Hangzhou, 123567891234]", |
| 111 | + "+I[118, user_7, Shanghai, 123567891234]", |
| 112 | + "+I[121, user_8, Shanghai, 123567891234]", |
| 113 | + "+I[123, user_9, Shanghai, 123567891234]", |
| 114 | + }; |
| 115 | + |
| 116 | + List<String> actual = |
| 117 | + readTableSnapshotSplits(snapshotSplits, makeChangeEventTaskContext, 1, dataType); |
| 118 | + assertEqualsInAnyOrder(Arrays.asList(expected), actual); |
| 119 | + } |
| 120 | + |
| 121 | + private List<String> readTableSnapshotSplits( |
| 122 | + List<SnapshotSplit> snapshotSplits, |
| 123 | + OracleSourceFetchTaskContext taskContext, |
| 124 | + int scanSplitsNum, |
| 125 | + DataType dataType) |
| 126 | + throws Exception { |
| 127 | + IncrementalSourceScanFetcher sourceScanFetcher = |
| 128 | + new IncrementalSourceScanFetcher(taskContext, 0); |
| 129 | + |
| 130 | + List<SourceRecord> result = new ArrayList<>(); |
| 131 | + for (int i = 0; i < scanSplitsNum; i++) { |
| 132 | + SnapshotSplit sqlSplit = snapshotSplits.get(i); |
| 133 | + if (sourceScanFetcher.isFinished()) { |
| 134 | + sourceScanFetcher.submitTask( |
| 135 | + taskContext.getDataSourceDialect().createFetchTask(sqlSplit)); |
| 136 | + } |
| 137 | + Iterator<SourceRecords> res; |
| 138 | + while ((res = sourceScanFetcher.pollSplitRecords()) != null) { |
| 139 | + while (res.hasNext()) { |
| 140 | + SourceRecords sourceRecords = res.next(); |
| 141 | + result.addAll(sourceRecords.getSourceRecordList()); |
| 142 | + } |
| 143 | + } |
| 144 | + } |
| 145 | + |
| 146 | + sourceScanFetcher.close(); |
| 147 | + |
| 148 | + assertNotNull(sourceScanFetcher.getExecutorService()); |
| 149 | + assertTrue(sourceScanFetcher.getExecutorService().isTerminated()); |
| 150 | + |
| 151 | + return formatResult(result, dataType); |
| 152 | + } |
| 153 | + |
| 154 | + private List<String> formatResult(List<SourceRecord> records, DataType dataType) { |
| 155 | + final RecordsFormatter formatter = new RecordsFormatter(dataType); |
| 156 | + return formatter.format(records); |
| 157 | + } |
| 158 | + |
| 159 | + private List<SnapshotSplit> getSnapshotSplits( |
| 160 | + OracleSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) { |
| 161 | + String databaseName = sourceConfig.getDatabaseList().get(0); |
| 162 | + List<TableId> tableIdList = |
| 163 | + sourceConfig.getTableList().stream() |
| 164 | + .map(tableId -> TableId.parse(databaseName + "." + tableId)) |
| 165 | + .collect(Collectors.toList()); |
| 166 | + final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig); |
| 167 | + |
| 168 | + List<SnapshotSplit> snapshotSplitList = new ArrayList<>(); |
| 169 | + for (TableId table : tableIdList) { |
| 170 | + Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table); |
| 171 | + snapshotSplitList.addAll(snapshotSplits); |
| 172 | + } |
| 173 | + return snapshotSplitList; |
| 174 | + } |
| 175 | + |
| 176 | + public static OracleSourceConfigFactory getConfigFactory( |
| 177 | + String[] captureTables, int splitSize) { |
| 178 | + Properties debeziumProperties = new Properties(); |
| 179 | + debeziumProperties.setProperty("log.mining.strategy", "online_catalog"); |
| 180 | + debeziumProperties.setProperty("log.mining.continuous.mine", "true"); |
| 181 | + |
| 182 | + return (OracleSourceConfigFactory) |
| 183 | + new OracleSourceConfigFactory() |
| 184 | + .hostname(ORACLE_CONTAINER.getHost()) |
| 185 | + .port(ORACLE_CONTAINER.getOraclePort()) |
| 186 | + .username(ORACLE_CONTAINER.getUsername()) |
| 187 | + .password(ORACLE_CONTAINER.getPassword()) |
| 188 | + .databaseList(ORACLE_DATABASE) |
| 189 | + .tableList(captureTables) |
| 190 | + .debeziumProperties(debeziumProperties) |
| 191 | + .splitSize(splitSize); |
| 192 | + } |
| 193 | + |
| 194 | + private boolean executeSql(OracleSourceConfig sourceConfig, String[] sqlStatements) { |
| 195 | + JdbcConnection connection = |
| 196 | + createOracleConnection(sourceConfig.getDbzConnectorConfig().getJdbcConfig()); |
| 197 | + try { |
| 198 | + connection.setAutoCommit(false); |
| 199 | + connection.execute(sqlStatements); |
| 200 | + connection.commit(); |
| 201 | + } catch (SQLException e) { |
| 202 | + LOG.error("Failed to execute sql statements.", e); |
| 203 | + return false; |
| 204 | + } |
| 205 | + return true; |
| 206 | + } |
| 207 | + |
| 208 | + class MakeChangeEventTaskContext extends OracleSourceFetchTaskContext { |
| 209 | + |
| 210 | + private Supplier<Boolean> makeChangeEventFunction; |
| 211 | + |
| 212 | + public MakeChangeEventTaskContext( |
| 213 | + JdbcSourceConfig jdbcSourceConfig, |
| 214 | + OracleDialect oracleDialect, |
| 215 | + OracleConnection connection, |
| 216 | + Supplier<Boolean> makeChangeEventFunction) { |
| 217 | + super(jdbcSourceConfig, oracleDialect, connection); |
| 218 | + this.makeChangeEventFunction = makeChangeEventFunction; |
| 219 | + } |
| 220 | + |
| 221 | + @Override |
| 222 | + public EventDispatcher.SnapshotReceiver getSnapshotReceiver() { |
| 223 | + EventDispatcher.SnapshotReceiver snapshotReceiver = super.getSnapshotReceiver(); |
| 224 | + return new EventDispatcher.SnapshotReceiver() { |
| 225 | + |
| 226 | + @Override |
| 227 | + public void changeRecord( |
| 228 | + DataCollectionSchema schema, |
| 229 | + Envelope.Operation operation, |
| 230 | + Object key, |
| 231 | + Struct value, |
| 232 | + OffsetContext offset, |
| 233 | + ConnectHeaders headers) |
| 234 | + throws InterruptedException { |
| 235 | + snapshotReceiver.changeRecord(schema, operation, key, value, offset, headers); |
| 236 | + } |
| 237 | + |
| 238 | + @Override |
| 239 | + public void completeSnapshot() throws InterruptedException { |
| 240 | + snapshotReceiver.completeSnapshot(); |
| 241 | + // make change events |
| 242 | + makeChangeEventFunction.get(); |
| 243 | + Thread.sleep(120 * 1000); |
| 244 | + } |
| 245 | + }; |
| 246 | + } |
| 247 | + } |
| 248 | +} |
0 commit comments