Skip to content

Commit 5ec35a8

Browse files
authored
[common] Fix NPE of JsonConverter when job run on cluster(#470)
1 parent 625ad26 commit 5ec35a8

File tree

1 file changed

+19
-7
lines changed

1 file changed

+19
-7
lines changed

flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/JsonDebeziumDeserializationSchema.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,35 @@
3737
public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
3838

3939
private static final long serialVersionUID = 1L;
40-
private static final JsonConverter CONVERTER = new JsonConverter();
40+
41+
private transient JsonConverter jsonConverter;
42+
43+
/**
44+
* Configuration whether to enable {@link JsonConverterConfig.SCHEMAS_ENABLE_CONFIG} to include
45+
* schema in messages.
46+
*/
47+
private final Boolean includeSchema;
4148

4249
public JsonDebeziumDeserializationSchema() {
4350
this(false);
4451
}
4552

46-
public JsonDebeziumDeserializationSchema(boolean includeSchema) {
47-
final HashMap<String, Object> configs = new HashMap<>();
48-
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
49-
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
50-
CONVERTER.configure(configs);
53+
public JsonDebeziumDeserializationSchema(Boolean includeSchema) {
54+
this.includeSchema = includeSchema;
5155
}
5256

5357
@Override
5458
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
59+
if (jsonConverter == null) {
60+
// initialize jsonConverter
61+
jsonConverter = new JsonConverter();
62+
final HashMap<String, Object> configs = new HashMap<>(2);
63+
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
64+
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
65+
jsonConverter.configure(configs);
66+
}
5567
byte[] bytes =
56-
CONVERTER.fromConnectData(record.topic(), record.valueSchema(), record.value());
68+
jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
5769
out.collect(new String(bytes));
5870
}
5971

0 commit comments

Comments
 (0)