feat(process): introduce record processor factory and enrich conversion/transform pipeline #2796
Conversation
Force-pushed from 5211a93 to 80502e9.
… conversion/transform pipeline

- Add RecordProcessorFactory to support dynamic creation of processors based on schema and transform configs
- Refactor RegistryConverterFactory for improved schema format handling and converter instantiation
- Implement SchemaFormat, TableTopicConvertType, and TableTopicTransformType enums for config-driven processing
- Enhance the Converter interface and conversion records to include key, value, and timestamp fields
- Refactor AvroRegistryConverter and ProtobufRegistryConverter to return unified RecordData objects
- Add ProtoToAvroConverter for robust Protobuf-to-Avro conversion
- Update the transform chain: add KafkaMetadataTransform for metadata enrichment, refactor DebeziumUnwrapTransform
- Update DefaultRecordProcessor and TransformContext to support partition-aware processing
- Improve error handling and code clarity across conversion and transform modules
Force-pushed from 80502e9 to 0b078da.
core/src/main/java/kafka/automq/table/process/convert/RegistrySchemaConverter.java
```java
List<Schema.Field> originalFields = new ArrayList<>();
for (Schema.Field field : originalSchema.getFields()) {
    // Create a new Schema.Field instance for each original field
    originalFields.add(new Schema.Field(field.name(), field.schema(),
        field.doc(), field.defaultVal(), field.order()));
}
```
Why do we need to create a new Schema.Field here instead of reusing the old Schema.Field?
Here the newly created Field must be used; reusing the existing one fails because its position is already set (`f.position != -1`). From Avro's `Schema.setFields`:

```java
for (Field f : fields) {
    if (f.position != -1) {
        throw new AvroRuntimeException("Field already used: " + f);
    }
    ...
}
```
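For context: Avro stamps each `Schema.Field` with its position when the field is attached to a record schema, so a field taken from `originalSchema` already has `position != -1` and trips the check above if reattached. A minimal sketch of the copy pattern (the class and method names here are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;

public final class SchemaFieldCopy {
    // Rebuild a record schema from an existing one. Each Schema.Field is
    // copied because a Field that already belongs to originalSchema has its
    // position set and cannot be attached to a second schema.
    public static Schema copyRecord(Schema originalSchema, String newName) {
        List<Schema.Field> copied = new ArrayList<>();
        for (Schema.Field field : originalSchema.getFields()) {
            copied.add(new Schema.Field(field.name(), field.schema(),
                field.doc(), field.defaultVal(), field.order()));
        }
        return Schema.createRecord(newName, originalSchema.getDoc(),
            originalSchema.getNamespace(), originalSchema.isError(), copied);
    }
}
```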
…ters for improved record processing
Force-pushed from 6301e2f to 046fd8f.
…mentation for converters and transforms
Force-pushed from 046fd8f to e1e6ebe.
…in ConversionResult
Pull Request Overview
This PR introduces a comprehensive record processing factory and transformation pipeline that replaces the previous simplified schema-based approach with a flexible converter/transform architecture. The changes add support for granular configuration of key/value conversion and transformation types while maintaining backward compatibility with deprecated schema-based configurations.
Key changes:
- Introduced a `RecordProcessorFactory` for dynamic processor creation based on configuration
- Added new converter types (RAW, STRING, BY_SCHEMA_ID, BY_LATEST_SCHEMA) and transform types (NONE, FLATTEN, FLATTEN_DEBEZIUM)
- Replaced the monolithic schema type configuration with separate key/value conversion and transformation settings
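The factory's exact API is not shown on this page, so the sketch below is an assumption pieced together from the class and enum names the PR introduces (`RecordProcessorFactory`, `TableTopicConvertType`, `TableTopicTransformType`); the `create` method name is hypothetical:

```java
// Hypothetical usage; the real signatures may differ.
RecordProcessorFactory factory = new RecordProcessorFactory();

// Pick conversion and transformation per topic config rather than a single
// monolithic schema type: here, resolve the value schema via the registry
// and flatten nested fields to the top level.
RecordProcessor processor = factory.create(
    TableTopicConvertType.BY_SCHEMA_ID,
    TableTopicTransformType.FLATTEN);
```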
Reviewed Changes

Copilot reviewed 42 out of 42 changed files in this pull request and generated 3 comments.

Summary per file:
| File | Description |
|---|---|
| LogConfig.java | Added new table topic converter and transform configuration options |
| TableTopicConvertType.java | Enum defining converter types (raw, string, schema-based) |
| TableTopicTransformType.java | Enum defining transformation types (none, flatten, debezium) |
| TableTopicSchemaType.java | Added NONE option to the existing schema type enum |
| RecordProcessorFactoryTest.java | Comprehensive test suite for the new processor factory |
| Various converter classes | New converter implementations for different data types |
| Various transform classes | Updated and new transform implementations |
| Test classes | Updated test classes to work with the new architecture |
Comments suppressed due to low confidence (1)

core/src/test/java/kafka/automq/table/process/RecordProcessorFactoryTest.java:1

[nitpick] Inconsistent case handling for enum names. Other test methods use the enum name directly (e.g., `TableTopicTransformType.NONE.name()`) while this line applies `toLowerCase()`. Consider using a consistent approach throughout the test class.
…ey and value fields
…or key and value fields
Force-pushed from 4e2c8e3 to 58e78af.
…ions from TopicConfig and LogConfig
…on/transform pipeline (#2796)

* feat(process): introduce flexible record processor factory and enrich conversion/transform pipeline

- Add RecordProcessorFactory to support dynamic creation of processors based on schema and transform configs
- Refactor RegistryConverterFactory for improved schema format handling and converter instantiation
- Implement SchemaFormat, TableTopicConvertType, and TableTopicTransformType enums for config-driven processing
- Enhance the Converter interface and conversion records to include key, value, and timestamp fields
- Refactor AvroRegistryConverter and ProtobufRegistryConverter to return unified RecordData objects
- Add ProtoToAvroConverter for robust Protobuf-to-Avro conversion
- Update the transform chain: add KafkaMetadataTransform for metadata enrichment, refactor DebeziumUnwrapTransform
- Update DefaultRecordProcessor and TransformContext to support partition-aware processing
- Improve error handling and code clarity across conversion and transform modules
Config

- (Add) `automq.table.topic.convert.value.type` (String): one of `raw`, `string`, `by_schema_id`, and `by_latest_schema`. A Schema Registry URL is required for `by_schema_id` and `by_latest_schema`.
- (Add) `automq.table.topic.convert.key.type` (String): one of `raw`, `string`, `by_schema_id`, and `by_latest_schema`. A Schema Registry URL is required for `by_schema_id` and `by_latest_schema`.
- (Add) `automq.table.topic.convert.value.subject` (String, Optional): defaults to `{topic-name}-value` if not specified.
- (Add) `automq.table.topic.convert.value.message.full.name` (String, Optional)
- (Add) `automq.table.topic.convert.key.subject` (String, Optional): defaults to `{topic-name}-key` if not specified.
- (Add) `automq.table.topic.convert.key.message.full.name` (String, Optional)
- (Add) `automq.table.topic.transform.value.type` (String): one of `none`, `flatten`, and `flatten_debezium` (see the unwrap sketch after this list).
  - `none`: no transformation applied.
  - `flatten`: extract fields from structured records, promoting nested fields to the top level.
  - `flatten_debezium`: process Debezium CDC events, extracting before/after states based on the operation type.
- (Deprecated) `automq.table.topic.schema.type` (String): `schema` is equivalent to `convert.value.type=by_schema_id` plus `transform.value.type=flatten`.
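To make `flatten_debezium` concrete, here is an illustrative unwrap in the style of a Debezium single-message transform; the PR's actual `DebeziumUnwrapTransform` may handle naming and edge cases differently:

```java
import org.apache.avro.generic.GenericRecord;

public final class DebeziumUnwrapSketch {
    // A Debezium change event envelope carries "op", "before", and "after".
    // Which state to emit depends on the operation type.
    static GenericRecord unwrap(GenericRecord envelope) {
        String op = envelope.get("op").toString();
        switch (op) {
            case "c": // create
            case "r": // snapshot read
            case "u": // update -> the new row state lives in "after"
                return (GenericRecord) envelope.get("after");
            case "d": // delete -> only "before" is populated
                return (GenericRecord) envelope.get("before");
            default:
                throw new IllegalArgumentException("Unknown Debezium op: " + op);
        }
    }
}
```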
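Since these are ordinary per-topic configs, they can be supplied at topic creation with the standard Kafka admin API; the topic name and bootstrap address below are placeholders:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public final class CreateTableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Convert the value by schema id and unwrap Debezium envelopes;
            // by_schema_id also needs a Schema Registry URL configured on the broker.
            NewTopic topic = new NewTopic("orders", 3, (short) 1).configs(Map.of(
                "automq.table.topic.convert.value.type", "by_schema_id",
                "automq.table.topic.transform.value.type", "flatten_debezium"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```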
changelist: