Skip to content

Commit cc5d897

Browse files
committed
[oracle] Fix Number and Float type#552
1 parent 0bd39e5 commit cc5d897

File tree

2 files changed

+243
-0
lines changed

2 files changed

+243
-0
lines changed
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.ververica.cdc.connectors.oracle.table;
20+
21+
import org.apache.flink.table.types.logical.LogicalType;
22+
23+
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
24+
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
25+
import io.debezium.data.SpecialValueDecimal;
26+
import io.debezium.data.VariableScaleDecimal;
27+
import org.apache.kafka.connect.data.Schema;
28+
import org.apache.kafka.connect.data.Struct;
29+
30+
import java.math.BigDecimal;
31+
import java.time.ZoneId;
32+
import java.util.Optional;
33+
34+
/** Used to create {@link DeserializationRuntimeConverterFactory} specified to Oracle. */
35+
public class OracleDeserializationConverterFactory {
36+
37+
public static DeserializationRuntimeConverterFactory instance() {
38+
return new DeserializationRuntimeConverterFactory() {
39+
40+
private static final long serialVersionUID = 1L;
41+
42+
@Override
43+
public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
44+
LogicalType logicalType, ZoneId serverTimeZone) {
45+
return wrapNumericConverter(createNumericConverter(logicalType));
46+
}
47+
};
48+
}
49+
50+
/** Creates a runtime converter which assuming input object is not null. */
51+
private static Optional<DeserializationRuntimeConverter> createNumericConverter(
52+
LogicalType type) {
53+
switch (type.getTypeRoot()) {
54+
case BOOLEAN:
55+
return createBooleanConverter();
56+
case TINYINT:
57+
return createByteConverter();
58+
case SMALLINT:
59+
return createShortConverter();
60+
case INTEGER:
61+
return createIntegerConverter();
62+
case BIGINT:
63+
return createLongConverter();
64+
case FLOAT:
65+
return createFloatConverter();
66+
case DOUBLE:
67+
return createDoubleConverter();
68+
default:
69+
// fallback to default converter
70+
return Optional.empty();
71+
}
72+
}
73+
74+
private static Optional<DeserializationRuntimeConverter> wrapNumericConverter(
75+
Optional<DeserializationRuntimeConverter> converterOptional) {
76+
return converterOptional.map(
77+
converter ->
78+
new DeserializationRuntimeConverter() {
79+
private static final long serialVersionUID = 1L;
80+
81+
@Override
82+
public Object convert(Object dbzObj, Schema schema) throws Exception {
83+
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
84+
SpecialValueDecimal decimal =
85+
VariableScaleDecimal.toLogical((Struct) dbzObj);
86+
return converter.convert(
87+
decimal.getDecimalValue().orElse(BigDecimal.ZERO),
88+
schema);
89+
}
90+
return converter.convert(dbzObj, schema);
91+
}
92+
});
93+
}
94+
95+
private static Optional<DeserializationRuntimeConverter> createBooleanConverter() {
96+
return Optional.of(
97+
new DeserializationRuntimeConverter() {
98+
99+
private static final long serialVersionUID = 1L;
100+
101+
@Override
102+
public Object convert(Object dbzObj, Schema schema) {
103+
if (dbzObj instanceof Boolean) {
104+
return dbzObj;
105+
} else if (dbzObj instanceof Byte) {
106+
return (byte) dbzObj != 0;
107+
} else if (dbzObj instanceof Short) {
108+
return (short) dbzObj != 0;
109+
} else if (dbzObj instanceof BigDecimal) {
110+
return ((BigDecimal) dbzObj).shortValue() != 0;
111+
} else {
112+
return Boolean.parseBoolean(dbzObj.toString());
113+
}
114+
}
115+
});
116+
}
117+
118+
private static Optional<DeserializationRuntimeConverter> createByteConverter() {
119+
return Optional.of(
120+
new DeserializationRuntimeConverter() {
121+
122+
private static final long serialVersionUID = 1L;
123+
124+
@Override
125+
public Object convert(Object dbzObj, Schema schema) {
126+
if (dbzObj instanceof Byte) {
127+
return dbzObj;
128+
} else if (dbzObj instanceof BigDecimal) {
129+
return ((BigDecimal) dbzObj).byteValue();
130+
} else {
131+
return Byte.parseByte(dbzObj.toString());
132+
}
133+
}
134+
});
135+
}
136+
137+
private static Optional<DeserializationRuntimeConverter> createShortConverter() {
138+
return Optional.of(
139+
new DeserializationRuntimeConverter() {
140+
141+
private static final long serialVersionUID = 1L;
142+
143+
@Override
144+
public Object convert(Object dbzObj, Schema schema) {
145+
if (dbzObj instanceof Byte) {
146+
return ((Byte) dbzObj).shortValue();
147+
} else if (dbzObj instanceof Short) {
148+
return dbzObj;
149+
} else if (dbzObj instanceof BigDecimal) {
150+
return ((BigDecimal) dbzObj).shortValue();
151+
} else {
152+
return Short.parseShort(dbzObj.toString());
153+
}
154+
}
155+
});
156+
}
157+
158+
private static Optional<DeserializationRuntimeConverter> createIntegerConverter() {
159+
return Optional.of(
160+
new DeserializationRuntimeConverter() {
161+
162+
private static final long serialVersionUID = 1L;
163+
164+
@Override
165+
public Object convert(Object dbzObj, Schema schema) {
166+
if (dbzObj instanceof Integer) {
167+
return dbzObj;
168+
} else if (dbzObj instanceof Long) {
169+
return ((Long) dbzObj).intValue();
170+
} else if (dbzObj instanceof BigDecimal) {
171+
return ((BigDecimal) dbzObj).intValue();
172+
} else {
173+
return Integer.parseInt(dbzObj.toString());
174+
}
175+
}
176+
});
177+
}
178+
179+
private static Optional<DeserializationRuntimeConverter> createLongConverter() {
180+
return Optional.of(
181+
new DeserializationRuntimeConverter() {
182+
183+
private static final long serialVersionUID = 1L;
184+
185+
@Override
186+
public Object convert(Object dbzObj, Schema schema) {
187+
if (dbzObj instanceof Integer) {
188+
return ((Integer) dbzObj).longValue();
189+
} else if (dbzObj instanceof Long) {
190+
return dbzObj;
191+
} else if (dbzObj instanceof BigDecimal) {
192+
return ((BigDecimal) dbzObj).longValue();
193+
} else {
194+
return Long.parseLong(dbzObj.toString());
195+
}
196+
}
197+
});
198+
}
199+
200+
private static Optional<DeserializationRuntimeConverter> createFloatConverter() {
201+
return Optional.of(
202+
new DeserializationRuntimeConverter() {
203+
204+
private static final long serialVersionUID = 1L;
205+
206+
@Override
207+
public Object convert(Object dbzObj, Schema schema) {
208+
if (dbzObj instanceof Float) {
209+
return dbzObj;
210+
} else if (dbzObj instanceof Double) {
211+
return ((Double) dbzObj).floatValue();
212+
} else if (dbzObj instanceof BigDecimal) {
213+
return ((BigDecimal) dbzObj).floatValue();
214+
} else {
215+
return Float.parseFloat(dbzObj.toString());
216+
}
217+
}
218+
});
219+
}
220+
221+
private static Optional<DeserializationRuntimeConverter> createDoubleConverter() {
222+
return Optional.of(
223+
new DeserializationRuntimeConverter() {
224+
225+
private static final long serialVersionUID = 1L;
226+
227+
@Override
228+
public Object convert(Object dbzObj, Schema schema) {
229+
if (dbzObj instanceof Float) {
230+
return ((Float) dbzObj).doubleValue();
231+
} else if (dbzObj instanceof Double) {
232+
return dbzObj;
233+
} else if (dbzObj instanceof BigDecimal) {
234+
return ((BigDecimal) dbzObj).doubleValue();
235+
} else {
236+
return Double.parseDouble(dbzObj.toString());
237+
}
238+
}
239+
});
240+
}
241+
}

flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
9898
RowDataDebeziumDeserializeSchema.newBuilder()
9999
.setPhysicalRowType(rowType)
100100
.setResultTypeInfo(typeInfo)
101+
.setUserDefinedConverterFactory(
102+
OracleDeserializationConverterFactory.instance())
101103
.build();
102104
OracleSource.Builder<RowData> builder =
103105
OracleSource.<RowData>builder()

0 commit comments

Comments
 (0)