
Commit 046fd8f

feat(config): update topic schema type configuration and enhance documentation for converters and transforms
1 parent ffcbb51

7 files changed: +38 -32 lines

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 23 additions & 8 deletions
@@ -266,25 +266,40 @@ public class TopicConfig {
     public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";

     public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
-    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless (deprecated), schema (deprecated)";
+    public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "[DEPRECATED] The table topic schema type configuration. " +
+        "This configuration is deprecated and will be removed in a future release. " +
+        "Please use the new separate converter and transform configurations instead. " +
+        "Supported values: 'schemaless' (maps to convert.value.type=raw, transform.value.type=none), " +
+        "'schema' (maps to convert.value.type=by_schema_id, transform.value.type=flatten).";

     public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_CONFIG = "automq.table.topic.convert.value.type";
-    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_DOC = "The convert value type for table topic, support: raw, by_schema_id, by_subject_name.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_DOC = "How to parse Kafka record values. " +
+        "Supported: 'raw', 'string', 'by_schema_id', 'by_latest_schema'. " +
+        "Schema Registry URL required for 'by_schema_id' and 'by_latest_schema'.";
     public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_CONFIG = "automq.table.topic.convert.key.type";
-    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_DOC = "The convert key type for table topic, support: raw, by_schema_id, by_subject_name.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_DOC = "How to parse Kafka record keys. " +
+        "Supported: 'raw', 'string', 'by_schema_id', 'by_latest_schema'. " +
+        "Schema Registry URL required for 'by_schema_id' and 'by_latest_schema'.";

     public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_SUBJECT_CONFIG = "automq.table.topic.convert.value.subject";
-    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_SUBJECT_DOC = "The subject name for schema registry, default is topic-value.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_SUBJECT_DOC = "The Schema Registry subject name for value schemas. " +
+        "Defaults to '{topic-name}-value' if not specified.";
     public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_MESSAGE_FULL_NAME_CONFIG = "automq.table.topic.convert.value.message.full.name";
-    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_MESSAGE_FULL_NAME_DOC = "The message full name for schema registry, default is the first message in schema.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_MESSAGE_FULL_NAME_DOC = "The fully qualified message name for Protobuf schemas. " +
+        "Used when schema contains multiple message types. Defaults to first message type.";

     public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_CONFIG = "automq.table.topic.convert.key.subject";
-    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_DOC = "The subject name for schema registry, default is topic-key.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_SUBJECT_DOC = "The Schema Registry subject name for key schemas. " +
+        "Defaults to '{topic-name}-key' if not specified.";
     public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_CONFIG = "automq.table.topic.convert.key.message.full.name";
-    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_DOC = "The message full name for schema registry, default is the first message in schema.";
+    public static final String AUTOMQ_TABLE_TOPIC_CONVERT_KEY_MESSAGE_FULL_NAME_DOC = "The fully qualified message name for Protobuf key schemas. " +
+        "Used when schema contains multiple message types. Defaults to first message type.";

     public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_CONFIG = "automq.table.topic.transform.value.type";
-    public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_DOC = "transformation to apply to the record value, support: none, flatten, flatten_debezium.";
+    public static final String AUTOMQ_TABLE_TOPIC_TRANSFORM_VALUE_TYPE_DOC = "Transformation to apply to the record value after conversion. " +
+        "Supported: 'none', 'flatten' (extract fields from structured records), " +
+        "'flatten_debezium' (process Debezium CDC events). " +
+        "Note: 'flatten_debezium' requires schema-based conversion.";

     public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
     public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
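
Taken together, these doc strings describe the replacement for the deprecated schema type switch. A minimal sketch of creating a table topic with the new converter/transform configs via the Kafka Admin client follows; the broker address, topic name, and a reachable Schema Registry are assumptions, not part of this commit.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateTableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("orders", 1, (short) 1)
                .configs(Map.of(
                    "automq.table.topic.enable", "true",
                    // replaces the deprecated schema.type=schema
                    // (by_schema_id conversion + flatten transform)
                    "automq.table.topic.convert.value.type", "by_schema_id",
                    "automq.table.topic.transform.value.type", "flatten"));
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}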

core/src/main/java/kafka/automq/table/process/TransformContext.java

Lines changed: 1 addition & 10 deletions
@@ -33,23 +33,14 @@ public TransformContext(Record kafkaRecord, String topicName, int partition) {
         this.partition = partition;
     }

-    /**
-     * Get the original Kafka record (if any)
-     */
     public Record getKafkaRecord() {
         return kafkaRecord;
     }
-
-    /**
-     * Get the topic name
-     */
+
     public String getTopicName() {
         return topicName;
     }

-    /**
-     * Get the partition number
-     */
     public int getPartition() {
         return partition;
     }

core/src/main/java/kafka/automq/table/process/convert/ConverterFactory.java

Lines changed: 0 additions & 3 deletions
@@ -78,9 +78,6 @@ public ConverterFactory(String registryUrl) {
     }

     public ConverterFactory(String registryUrl, SchemaRegistryClient client) {
-        if (client != null && (registryUrl == null || registryUrl.trim().isEmpty())) {
-            throw new IllegalArgumentException("Registry URL cannot be null or empty when client is provided");
-        }
         this.schemaRegistryUrl = registryUrl;
         this.client = client;
     }
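
The dropped guard means a SchemaRegistryClient may now be supplied without a registry URL, which is exactly what the updated tests do with MockSchemaRegistryClient. A hedged sketch of that wiring, with the constructor shape taken from the diff and the surrounding test class assumed:

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

import kafka.automq.table.process.convert.ConverterFactory;

class ConverterFactoryWiring {
    ConverterFactory newTestFactory() {
        SchemaRegistryClient mock = new MockSchemaRegistryClient();
        // Before this commit, a null/empty URL with a non-null client threw
        // IllegalArgumentException; now the pair is accepted as-is.
        return new ConverterFactory(null, mock);
    }
}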

core/src/test/java/kafka/automq/table/process/DefaultRecordProcessorTest.java

Lines changed: 10 additions & 8 deletions
@@ -20,23 +20,21 @@

 package kafka.automq.table.process;

-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import kafka.automq.table.process.convert.AvroRegistryConverter;
 import kafka.automq.table.process.convert.RawConverter;
 import kafka.automq.table.process.convert.StringConverter;
 import kafka.automq.table.process.exception.ConverterException;
 import kafka.automq.table.process.transform.FlattenTransform;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
+
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;

@@ -45,6 +43,10 @@
 import java.util.List;
 import java.util.Map;

+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;

core/src/test/java/kafka/automq/table/process/RecordProcessorFactoryTest.java

Lines changed: 2 additions & 2 deletions
@@ -187,7 +187,7 @@ void testBySchemaIdWithAvro() throws Exception {
         assertTrue(finalRecord.hasField("_kafka_metadata"));

         assertEquals("test-key", finalRecord.get("_kafka_key"));
-        assertEquals("test123", (finalRecord.get("_kafka_value")));
+        assertEquals("test123", finalRecord.get("_kafka_value"));

         GenericRecord metadataRecord = (GenericRecord) finalRecord.get("_kafka_metadata");
         assertEquals(TEST_PARTITION, metadataRecord.get("partition"));

@@ -634,7 +634,7 @@ void testKeyConversionAsString() {

         // Check value remains as ByteBuffer
         assertTrue(finalRecord.hasField("_kafka_value"));
-        assertEquals(ByteBuffer.wrap(value), ByteBuffer.wrap((byte[])finalRecord.get("_kafka_value")));
+        assertEquals(ByteBuffer.wrap(value), ByteBuffer.wrap((byte[]) finalRecord.get("_kafka_value")));
     }

     // --- Test Group 6: Deprecated Configs ---

server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@
 import static java.util.Arrays.asList;

 public enum TableTopicSchemaType {
+    NONE("none"),
     SCHEMALESS("schemaless"),
     SCHEMA("schema");

storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

Lines changed: 1 addition & 1 deletion
@@ -346,7 +346,7 @@ public Optional<String> serverConfigName(String configName) {
             .define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
             .define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
             .define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
-            .define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
+            .define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.NONE.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
             .define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_CONFIG, STRING, RAW.name, in(TableTopicConvertType.names().toArray(new String[0])), MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_TYPE_DOC)
             .define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_CONFIG, STRING, TableTopicConvertType.STRING.name, in(TableTopicConvertType.names().toArray(new String[0])), MEDIUM, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_KEY_TYPE_DOC)
             .define(TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_SUBJECT_CONFIG, STRING, null, null, LOW, TopicConfig.AUTOMQ_TABLE_TOPIC_CONVERT_VALUE_SUBJECT_DOC)
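
The only change here flips the default from SCHEMALESS to NONE, so topics that never set the deprecated config fall through to the new convert/transform defaults (raw values, string keys, per the defines above). A standalone sketch of the same define/parse behavior using Kafka's ConfigDef, with literal values in place of the enum:

import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class SchemaTypeDefault {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("automq.table.topic.schema.type", ConfigDef.Type.STRING,
                "none", // was "schemaless" before this commit
                ConfigDef.ValidString.in("none", "schemaless", "schema"),
                ConfigDef.Importance.MEDIUM, "deprecated schema type");
        // A topic created without the deprecated config now resolves to "none".
        System.out.println(def.parse(Map.of()).get("automq.table.topic.schema.type"));
    }
}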
