
Commit 24bfa61

feat(process): introduce record processor factory and enrich conversion/transform pipeline (#2796)
* feat(process): introduce flexible record processor factory and enrich conversion/transform pipeline
  - Add RecordProcessorFactory to support dynamic creation of processors based on schema and transform configs
  - Refactor RegistryConverterFactory for improved schema format handling and converter instantiation
  - Implement SchemaFormat, TableTopicConvertType, and TableTopicTransformType enums for config-driven processing
  - Enhance Converter interface and conversion records to include key, value, and timestamp fields
  - Refactor AvroRegistryConverter and ProtobufRegistryConverter to return unified RecordData objects
  - Add ProtoToAvroConverter for robust Protobuf-to-Avro conversion
  - Update transform chain: add KafkaMetadataTransform for metadata enrichment, refactor DebeziumUnwrapTransform
  - Update DefaultRecordProcessor and TransformContext to support partition-aware processing
  - Improve error handling and code clarity across conversion and transform modules
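Taken together, these changes split per-record processing into a conversion step (raw bytes to an Avro-typed value plus schema) followed by a transform step. Below is a minimal sketch of how the pieces shown in this diff might compose; the transform hook is a hypothetical stand-in, since RecordProcessorFactory and the Transform interface are not part of the excerpted files.

```java
import kafka.automq.table.process.ConversionResult;
import kafka.automq.table.process.Converter;
import kafka.automq.table.process.exception.ConverterException;

import java.nio.ByteBuffer;
import java.util.function.UnaryOperator;

// Sketch only: wires a value Converter into a per-record processing step.
public final class SketchValueProcessor {
    private final Converter valueConverter;                  // raw / string / by_schema_id / by_latest_schema
    private final UnaryOperator<ConversionResult> transform; // stand-in for the configured transform chain

    public SketchValueProcessor(Converter valueConverter, UnaryOperator<ConversionResult> transform) {
        this.valueConverter = valueConverter;
        this.transform = transform;
    }

    public ConversionResult process(String topic, ByteBuffer valuePayload) throws ConverterException {
        // 1. Convert: bytes -> (value, Avro schema, schema identity).
        ConversionResult converted = valueConverter.convert(topic, valuePayload);
        // 2. Transform: 'none', 'flatten', or 'flatten_debezium' would be applied here.
        return transform.apply(converted);
    }
}
```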
1 parent 8cc67c4 commit 24bfa61


42 files changed: +5000 −767 lines

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
@@ -378,5 +378,6 @@
     <suppress id="dontUseSystemExit"
               files="(BenchTool|S3Utils|AutoMQCLI).java"/>
     <suppress checks="ClassDataAbstractionCoupling" files="(StreamControlManagerTest|ControllerStreamManager).java"/>
+    <suppress files="core[\/]src[\/]test[\/]java[\/]kafka[\/]automq[\/]table[\/]process[\/]proto[\/].*\.java$" checks=".*"/>

 </suppressions>

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 32 additions & 1 deletion
@@ -265,8 +265,39 @@ public class TopicConfig {
     public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
     public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
     public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
+
     public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
-    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "[DEPRECATED] The table topic schema type configuration. " +
+        "This configuration is deprecated and will be removed in a future release. " +
+        "Please use the new separate converter and transform configurations instead. " +
+        "Supported values: 'schemaless' (maps to convert.value.type=raw, transform.value.type=none), " +
+        "'schema' (maps to convert.value.type=by_schema_id, transform.value.type=flatten).";
+
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_CONFIG = "automq.table.topic.convert.value.type";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_DOC = "How to parse Kafka record values. " +
+        "Supported: 'raw', 'string', 'by_schema_id', 'by_latest_schema'. " +
+        "Schema Registry URL required for 'by_schema_id' and 'by_latest_schema'.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_CONFIG = "automq.table.topic.convert.key.type";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_DOC = "How to parse Kafka record keys. " +
+        "Supported: 'raw', 'string', 'by_schema_id', 'by_latest_schema'. " +
+        "Schema Registry URL required for 'by_schema_id' and 'by_latest_schema'.";
+
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_BY_LATEST_SCHEMA_PREFIX = "automq.table.topic.convert.value.by_latest_schema.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_BY_LATEST_SCHEMA_PREFIX = "automq.table.topic.convert.key.by_latest_schema.";
+
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_CONFIG = "automq.table.topic.convert.key.subject";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_DOC = "The Schema Registry subject name for key schemas. " +
+        "Defaults to '{topic-name}-key' if not specified.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_CONFIG = "automq.table.topic.convert.key.message.full.name";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_DOC = "The fully qualified message name for Protobuf key schemas. " +
+        "Used when schema contains multiple message types. Defaults to first message type.";
+
+    public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_CONFIG = "automq.table.topic.transform.value.type";
+    public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_DOC = "Transformation to apply to the record value after conversion. " +
+        "Supported: 'none', 'flatten' (extract fields from structured records), " +
+        "'flatten_debezium' (process Debezium CDC events). " +
+        "Note: 'flatten_debezium' requires schema-based conversion.";
+
     public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
     public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
         + "ex. [region, name]";

core/src/main/java/kafka/automq/table/deserializer/SchemaResolutionResolver.java

Lines changed: 1 addition & 3 deletions
@@ -21,8 +21,6 @@

 import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

-import org.apache.kafka.common.record.Record;
-
 import java.nio.ByteBuffer;


@@ -45,7 +43,7 @@ public interface SchemaResolutionResolver {
     SchemaResolution resolve(String topic, ByteBuffer payload);


-    int getSchemaId(String topic, Record record);
+    int getSchemaId(String topic, ByteBuffer payload);

     /**
      * Container class for resolved schema information.

core/src/main/java/kafka/automq/table/deserializer/proto/HeaderBasedSchemaResolutionResolver.java

Lines changed: 2 additions & 3 deletions
@@ -24,7 +24,6 @@
 import kafka.automq.table.process.exception.InvalidDataException;

 import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.record.Record;

 import java.nio.ByteBuffer;

@@ -59,9 +58,9 @@ public SchemaResolution resolve(String topic, ByteBuffer payload) {
     }

     @Override
-    public int getSchemaId(String topic, Record record) {
+    public int getSchemaId(String topic, ByteBuffer payload) {
         // io.confluent.kafka.serializers.DeserializationContext#constructor
-        return readSchemaId(record.value().duplicate());
+        return readSchemaId(payload.duplicate());
     }

     private int readSchemaId(ByteBuffer buffer) {
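
getSchemaId now reads the schema id straight from the value ByteBuffer instead of unwrapping a Record. The body of readSchemaId is not shown in this hunk; the comment points at Confluent's DeserializationContext, whose wire format is a magic byte followed by a 4-byte big-endian schema id. A rough sketch of that header read, with names and error messages that are illustrative rather than taken from the actual implementation:

```java
import org.apache.kafka.common.errors.SerializationException;

import java.nio.ByteBuffer;

final class WireFormatSchemaId {
    private static final byte MAGIC_BYTE = 0x0;

    // Illustrative only: Confluent wire format is [magic byte][4-byte schema id][payload].
    static int readSchemaId(ByteBuffer buffer) {
        if (buffer == null || buffer.remaining() < 5) {
            throw new SerializationException("Payload too short to contain a schema id");
        }
        if (buffer.get() != MAGIC_BYTE) {
            throw new SerializationException("Unknown magic byte");
        }
        return buffer.getInt(); // ByteBuffer is big-endian by default
    }
}
```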

core/src/main/java/kafka/automq/table/deserializer/proto/LatestSchemaResolutionResolver.java

Lines changed: 1 addition & 2 deletions
@@ -23,7 +23,6 @@
 import kafka.automq.table.deserializer.proto.schema.MessageIndexes;

 import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.record.Record;

 import com.automq.stream.utils.Time;

@@ -83,7 +82,7 @@ public SchemaResolution resolve(String topic, ByteBuffer payload) {
     }

     @Override
-    public int getSchemaId(String topic, Record record) {
+    public int getSchemaId(String topic, ByteBuffer payload) {
         LatestSchemaResolutionResolver.CachedSchemaInfo cachedInfo = getCachedSchemaInfo(subject);
         return cachedInfo.schemaId;
     }

core/src/main/java/kafka/automq/table/process/ConversionResult.java

Lines changed: 17 additions & 42 deletions
@@ -19,63 +19,38 @@

 package kafka.automq.table.process;

-import org.apache.kafka.common.record.Record;
-
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;

 import java.util.Objects;

-/**
- * Result of format conversion operations.
- * Contains either a converted GenericRecord or error information.
- */
+
 public final class ConversionResult {

-    private final Record kafkaRecord;
-    private final GenericRecord valueRecord;
+    private final Object value;
+    private final Schema schema;
     private final String schemaIdentity;

-    /**
-     * Creates a successful conversion result.
-     *
-     * @param kafkaRecord the original Kafka record
-     * @param value the successfully converted Avro GenericRecord, must not be null
-     * @param schemaIdentity unique identifier for the record's schema
-     */
-    public ConversionResult(Record kafkaRecord, GenericRecord value, String schemaIdentity) {
-        this.kafkaRecord = Objects.requireNonNull(kafkaRecord);
-        this.valueRecord = Objects.requireNonNull(value);
-        this.schemaIdentity = schemaIdentity;
+    public ConversionResult(Object value, Schema schema, String schemaIdentity) {
+        this.value = value;
+        this.schema = Objects.requireNonNull(schema, "schema cannot be null");
+        this.schemaIdentity = Objects.requireNonNull(schemaIdentity, "schemaIdentity cannot be null");
     }

+    public ConversionResult(GenericRecord record, String schemaIdentity) {
+        this.value = Objects.requireNonNull(record, "record cannot be null");
+        this.schema = record.getSchema();
+        this.schemaIdentity = Objects.requireNonNull(schemaIdentity, "schemaIdentity cannot be null");
+    }

-    public Record getKafkaRecord() {
-        return kafkaRecord;
+    public Object getValue() {
+        return value;
     }

-    /**
-     * Returns the converted Avro GenericRecord.
-     *
-     * <p>This record represents the input data in standardized Avro format,
-     * ready for processing by the Transform chain. The record's schema contains
-     * all necessary type information for subsequent operations.</p>
-     *
-     * @return the converted GenericRecord, or null if conversion failed
-     */
-    public GenericRecord getValueRecord() {
-        return valueRecord;
+    public Schema getSchema() {
+        return schema;
     }

-    /**
-     * Returns the schema identity for the converted record.
-     *
-     * <p>The schema identity is a unique identifier for the record's schema,
-     * which can be used for schema evolution tracking, caching, and optimization
-     * purposes. The format and meaning of this identifier depends on the specific
-     * converter implementation.</p>
-     *
-     * @return the schema identity string, or null if conversion failed
-     */
     public String getSchemaIdentity() {
         return schemaIdentity;
     }
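
ConversionResult is now decoupled from the Kafka Record and always carries an explicit Avro Schema, so the same shape can describe converted keys, values, or other components. A short usage sketch of the two constructors; the schemas and identity strings below are made-up examples:

```java
import kafka.automq.table.process.ConversionResult;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class ConversionResultExamples {
    public static void main(String[] args) {
        // Primitive value with an explicit schema (e.g. a key parsed as a plain string).
        ConversionResult keyResult =
            new ConversionResult("order-42", Schema.create(Schema.Type.STRING), "string");

        // Structured value: the schema is taken from the GenericRecord itself.
        Schema orderSchema = SchemaBuilder.record("Order").fields()
            .requiredString("id")
            .requiredLong("amountCents")
            .endRecord();
        GenericRecord order = new GenericData.Record(orderSchema);
        order.put("id", "order-42");
        order.put("amountCents", 1999L);
        ConversionResult valueResult = new ConversionResult(order, orderSchema.getFullName());

        System.out.println(keyResult.getSchema() + " / " + valueResult.getSchemaIdentity());
    }
}
```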

core/src/main/java/kafka/automq/table/process/Converter.java

Lines changed: 2 additions & 60 deletions
@@ -20,70 +20,12 @@
 package kafka.automq.table.process;

 import kafka.automq.table.process.exception.ConverterException;
-import kafka.automq.table.process.exception.InvalidDataException;

-import org.apache.kafka.common.record.Record;
+import java.nio.ByteBuffer;

-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;

-/**
- * Interface for a component that converts raw Kafka records into a standardized
- * format, typically Avro GenericRecord.
- *
- * <p>Implementations handle different data formats (e.g., Avro, JSON, Protobuf)
- * and may interact with services like a Schema Registry.</p>
- */
 public interface Converter {

-    String VALUE_FIELD_NAME = "value";
-    String RECORD_NAME = "ValueRecord";
-
-    /**
-     * Converts a raw Kafka record into a structured representation.
-     *
-     * <p>This method performs the core format conversion operation. It reads the
-     * byte[] payload from the Kafka record and converts it to an Avro GenericRecord
-     * that serves as the standardized internal representation for subsequent processing.</p>
-     *
-     * @param topic the name of the Kafka topic from which the record originated,
-     *              used for topic-specific schema resolution and routing
-     * @param record the Kafka record containing the byte[] payload to convert,
-     *               must not be null
-     * @return a {@code ConversionResult} containing the successfully converted data
-     * @throws IllegalArgumentException if topic or record is null
-     * @throws InvalidDataException if the input record data is invalid (e.g., null value)
-     * @throws ConverterException if a non-data-related error occurs during conversion (e.g., schema resolution failure)
-     */
-    ConversionResult convert(String topic, Record record) throws ConverterException;
+    ConversionResult convert(String topic, ByteBuffer buffer) throws ConverterException;

-    /**
-     * A convenience wrapper for {@link #buildValueRecord(Object, Schema)}.
-     * Extracts the schema from the given record.
-     *
-     * @param valueRecord the record to wrap.
-     * @return a new GenericRecord wrapping the value record.
-     */
-    static GenericRecord buildValueRecord(GenericRecord valueRecord) {
-        return buildValueRecord(valueRecord, valueRecord.getSchema());
-    }
-    /**
-     * A utility to build a standard record that wraps a value.
-     * The created record will have a single field named "value".
-     *
-     * @param value the value to wrap
-     * @param valueSchema the schema for the value
-     * @return a new GenericRecord wrapping the value
-     */
-    static GenericRecord buildValueRecord(Object value, Schema valueSchema) {
-        Schema schema = SchemaBuilder.record(RECORD_NAME)
-            .fields()
-            .name(VALUE_FIELD_NAME).type(valueSchema).noDefault()
-            .endRecord();
-        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
-        builder.set(VALUE_FIELD_NAME, value);
-        return builder.build();
-    }
 }
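
The interface shrinks to a single ByteBuffer-based method, and the static buildValueRecord helpers are dropped from it. A hypothetical implementation of the new signature, roughly what a 'string' convert type could look like; it is not code from this commit:

```java
import kafka.automq.table.process.ConversionResult;
import kafka.automq.table.process.Converter;
import kafka.automq.table.process.exception.ConverterException;

import org.apache.avro.Schema;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical example: decodes the payload as UTF-8 and reports it with a plain Avro string schema.
public class ExampleStringConverter implements Converter {
    private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);

    @Override
    public ConversionResult convert(String topic, ByteBuffer buffer) throws ConverterException {
        ByteBuffer copy = buffer.duplicate();        // avoid disturbing the caller's position
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new ConversionResult(new String(bytes, StandardCharsets.UTF_8), STRING_SCHEMA, "string");
    }
}
```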
