Skip to content

Commit e110448

Browse files
committed
filter change record with table name
1 parent 921d987 commit e110448

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public class OceanBaseRichSourceFunction<T> extends RichSourceFunction<T>
9393
private final OceanBaseDeserializationSchema<T> deserializer;
9494

9595
private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false);
96-
private final List<LogMessage> logMessageBuffer = new LinkedList<>();
96+
private final List<OceanBaseRecord> changeRecordBuffer = new LinkedList<>();
9797

9898
private transient Set<String> tableSet;
9999
private transient volatile long resolvedTimestamp;
@@ -329,21 +329,23 @@ public void notify(LogMessage message) {
329329
if (!started) {
330330
break;
331331
}
332-
logMessageBuffer.add(message);
332+
OceanBaseRecord record = getChangeRecord(message);
333+
if (record != null) {
334+
changeRecordBuffer.add(record);
335+
}
333336
break;
334337
case COMMIT:
335338
// flush buffer after snapshot completed
336339
if (!shouldReadSnapshot() || snapshotCompleted.get()) {
337-
logMessageBuffer.forEach(
338-
msg -> {
340+
changeRecordBuffer.forEach(
341+
r -> {
339342
try {
340-
deserializer.deserialize(
341-
getChangeRecord(msg), outputCollector);
343+
deserializer.deserialize(r, outputCollector);
342344
} catch (Exception e) {
343345
throw new FlinkRuntimeException(e);
344346
}
345347
});
346-
logMessageBuffer.clear();
348+
changeRecordBuffer.clear();
347349
long timestamp = getCheckpointTimestamp(message);
348350
if (timestamp > resolvedTimestamp) {
349351
resolvedTimestamp = timestamp;
@@ -379,6 +381,9 @@ public void onException(LogProxyClientException e) {
379381

380382
private OceanBaseRecord getChangeRecord(LogMessage message) {
381383
String databaseName = message.getDbName().replace(tenantName + ".", "");
384+
if (!tableSet.contains(String.format("%s.%s", databaseName, message.getTableName()))) {
385+
return null;
386+
}
382387
OceanBaseRecord.SourceInfo sourceInfo =
383388
new OceanBaseRecord.SourceInfo(
384389
tenantName,

0 commit comments

Comments
 (0)