Skip to content

Commit 921d987

Browse files
committed
merge converters for change event and snapshot and use real encoding for change event
1 parent 76bf651 commit 921d987

File tree

5 files changed

+73
-139
lines changed

5 files changed

+73
-139
lines changed

docs/content/connectors/oceanbase-cdc(ZH).md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,10 @@ public class OceanBaseSourceExample {
252252
.tableName("test_table")
253253
.hostname("127.0.0.1")
254254
.port(2881)
255+
.jdbcDriver("com.mysql.jdbc.Driver")
255256
.logProxyHost("127.0.0.1")
256257
.logProxyPort(2983)
257-
.serverTimeZone(serverTimezone)
258+
.serverTimeZone(serverTimeZone)
258259
.deserializer(deserializer)
259260
.build();
260261

docs/content/connectors/oceanbase-cdc.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,10 @@ public class OceanBaseSourceExample {
430430
.tableName("test_table")
431431
.hostname("127.0.0.1")
432432
.port(2881)
433+
.jdbcDriver("com.mysql.jdbc.Driver")
433434
.logProxyHost("127.0.0.1")
434435
.logProxyPort(2983)
435-
.serverTimeZone(serverTimezone)
436+
.serverTimeZone(serverTimeZone)
436437
.deserializer(deserializer)
437438
.build();
438439

flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseDeserializationRuntimeConverter.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,13 @@
1616

1717
package com.ververica.cdc.connectors.oceanbase.source;
1818

19-
import com.oceanbase.oms.logmessage.ByteString;
20-
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
21-
2219
import java.io.Serializable;
23-
import java.nio.charset.StandardCharsets;
2420

2521
/**
2622
* Runtime converter that converts objects of OceanBase into objects of Flink Table & SQL internal
2723
* data structures.
2824
*/
2925
public interface OceanBaseDeserializationRuntimeConverter extends Serializable {
3026

31-
default Object convert(Object object) throws Exception {
32-
if (object instanceof ByteString) {
33-
return convertChangeEvent(
34-
((ByteString) object).toString(StandardCharsets.UTF_8.name()));
35-
} else {
36-
return convertSnapshotEvent(object);
37-
}
38-
}
39-
40-
default Object convertSnapshotEvent(Object object) throws Exception {
41-
throw new NotImplementedException();
42-
}
43-
44-
default Object convertChangeEvent(String string) throws Exception {
45-
throw new NotImplementedException();
46-
}
27+
Object convert(Object object);
4728
}

0 commit comments

Comments
 (0)