Commit c5d0b84

feat(table_topic): add table topic support (#2511)

* feat(table_topic): add table topic support
  Signed-off-by: Robin Han <[email protected]>

* feat(table_topic): adapt to jdk11
  Signed-off-by: Robin Han <[email protected]>

* feat(table_topic): add unit test
  Signed-off-by: Robin Han <[email protected]>

---------

Signed-off-by: Robin Han <[email protected]>

1 parent cb3aa11 commit c5d0b84

File tree

131 files changed: +15651 −5 lines


automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java

Lines changed: 7 additions & 2 deletions

@@ -24,6 +24,7 @@
 import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo;
 import com.automq.stream.s3.operator.ObjectStorage.ObjectPath;
 import com.automq.stream.s3.operator.ObjectStorage.WriteOptions;
+import com.automq.stream.utils.Threads;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -161,12 +162,16 @@ public void run() {
                     CompletableFuture.allOf(deleteFutures).join();
                 }
             }
-
-            Thread.sleep(Duration.ofMinutes(1).toMillis());
+            if (Threads.sleep(Duration.ofMinutes(1).toMillis())) {
+                break;
+            }
         } catch (InterruptedException e) {
             break;
         } catch (Exception e) {
             LOGGER.error("Cleanup s3 metrics failed", e);
+            if (Threads.sleep(Duration.ofMinutes(1).toMillis())) {
+                break;
+            }
         }
     }
 }
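The change replaces a bare Thread.sleep with com.automq.stream.utils.Threads.sleep, whose boolean return value lets the cleanup loop exit when the exporter thread is interrupted, including on the error path. The utility's implementation is not part of this diff; a minimal sketch, assuming only the semantics implied by the call sites above (returns true when the sleep was interrupted), might look like this:

// Minimal sketch of an interrupt-aware sleep helper. Assumption: this mirrors
// the behavior the call sites above rely on, not the actual AutoMQ implementation.
public final class Threads {
    private Threads() { }

    // Sleeps for the given number of milliseconds and reports whether the
    // thread was interrupted, restoring the interrupt flag for the caller.
    public static boolean sleep(long millis) {
        try {
            Thread.sleep(millis);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        }
    }
}

// Caller pattern from the diff:
//     if (Threads.sleep(Duration.ofMinutes(1).toMillis())) { break; }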

build.gradle

Lines changed: 28 additions & 0 deletions

@@ -979,6 +979,33 @@ project(':core') {
     implementation libs.opentelemetryJmx
     implementation libs.awsSdkAuth

+    // table topic start
+    implementation ("org.apache.avro:avro:${versions.avro}")
+    implementation ("org.apache.avro:avro-protobuf:${versions.avro}")
+    implementation('com.google.protobuf:protobuf-java:3.25.5')
+    implementation ("org.apache.iceberg:iceberg-core:${versions.iceberg}")
+    implementation ("org.apache.iceberg:iceberg-api:${versions.iceberg}")
+    implementation ("org.apache.iceberg:iceberg-data:${versions.iceberg}")
+    implementation ("org.apache.iceberg:iceberg-parquet:${versions.iceberg}")
+    implementation ("org.apache.iceberg:iceberg-common:${versions.iceberg}")
+    implementation ("org.apache.iceberg:iceberg-aws:${versions.iceberg}")
+    implementation ("software.amazon.awssdk:glue:${versions.awsSdk}")
+    implementation ("software.amazon.awssdk:s3tables:${versions.awsSdk}")
+    implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.0'
+    implementation 'org.apache.hadoop:hadoop-common:3.4.1'
+    implementation (libs.kafkaAvroSerializer) {
+        exclude group: 'org.apache.kafka', module: 'kafka-clients'
+    }
+
+    // > Protobuf ext start
+    // Wire Runtime for schema handling
+    implementation ("com.squareup.wire:wire-schema:${versions.wire}")
+    implementation ("com.squareup.wire:wire-runtime:${versions.wire}")
+    implementation 'com.google.api.grpc:proto-google-common-protos:2.52.0'
+    // > Protobuf ext end
+
+    // table topic end
+
     implementation(libs.oshi) {
         exclude group: 'org.slf4j', module: '*'
     }
@@ -1255,6 +1282,7 @@ project(':core') {
     }
 }

+
 project(':metadata') {
     base {
         archivesName = "kafka-metadata"
CatalogFactory.java (new file, package kafka.automq.table)

Lines changed: 241 additions & 0 deletions

/*
 * Copyright 2025, AutoMQ HK Limited.
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.automq.table;

import kafka.server.KafkaConfig;

import com.automq.stream.s3.operator.AwsObjectStorage;
import com.automq.stream.s3.operator.BucketURI;
import com.automq.stream.utils.IdURI;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.PrivilegedAction;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;

public class CatalogFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(CatalogFactory.class);
    private static final String CATALOG_TYPE_CONFIG = "type";

    public Catalog newCatalog(KafkaConfig config) {
        return new Builder(config).build();
    }

    static class Builder {
        final KafkaConfig config;

        String catalogImpl;
        BucketURI bucketURI;
        final Map<String, Object> catalogConfigs;
        final Map<String, Object> hadoopConfigs;
        final Map<String, String> options = new HashMap<>();
        final Configuration hadoopConf = new Configuration();
        UserGroupInformation ugi = null;
        Catalog catalog = null;

        Builder(KafkaConfig config) {
            this.config = config;
            catalogConfigs = config.originalsWithPrefix("automq.table.topic.catalog.");
            hadoopConfigs = config.originalsWithPrefix("automq.table.topic.hadoop.");
            String catalogType = Optional.ofNullable(catalogConfigs.get(CATALOG_TYPE_CONFIG)).map(Object::toString).orElse(null);
            if (StringUtils.isBlank(catalogType)) {
                return;
            }
            bucketURI = config.automq().dataBuckets().get(0);
            CredentialProviderHolder.setup(bucketURI);
            options.put("ref", "main");
            options.put("client.region", bucketURI.region());
            options.put("client.credentials-provider", "kafka.automq.table.CredentialProviderHolder");
            switch (catalogType) {
                case "glue":
                    withGlue();
                    break;
                case "nessie":
                    withNessie();
                    break;
                case "tablebucket":
                    withTableBucket();
                    break;
                case "hive":
                    withHive();
                    break;
                case "rest":
                    withRest();
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported catalog type: " + catalogType);
            }
            catalogConfigs.forEach((k, v) -> options.put(k, v.toString()));
            hadoopConfigs.forEach((k, v) -> hadoopConf.set(k, v.toString()));
            options.remove(CATALOG_TYPE_CONFIG);
            LOGGER.info("[TABLE_MANAGER_START],catalog={},options={},hadoopConfig={}", catalogType, options, hadoopConf);
            this.catalog = runAs(() -> CatalogUtil.loadCatalog(catalogImpl, catalogType, options, hadoopConf));
        }

        public Catalog build() {
            return catalog;
        }

        private void withGlue() {
            catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
            if (StringUtils.isNotBlank(bucketURI.endpoint())) {
                options.put("glue.endpoint", bucketURI.endpoint().replaceFirst("s3", "glue"));
            }
            putDataBucketAsWarehouse(false);
        }

        private void withNessie() {
            // nessie config extension e.g.
            // automq.table.topic.catalog.uri=http://localhost:19120/api/v2
            catalogImpl = "org.apache.iceberg.nessie.NessieCatalog";
            putDataBucketAsWarehouse(false);
        }

        private void withTableBucket() {
            // table bucket config extension e.g.
            // automq.table.topic.catalog.warehouse=table bucket arn
            catalogImpl = "software.amazon.s3tables.iceberg.S3TablesCatalog";
        }

        private void withHive() {
            // hive config extension e.g.
            // automq.table.topic.catalog.uri=thrift://xxx:9083
            // kerberos authentication
            // - automq.table.topic.catalog.auth=kerberos://?principal=base64(clientPrincipal)&keytab=base64(keytabFile)&krb5conf=base64(krb5confFile)
            // - automq.table.topic.hadoop.metastore.kerberos.principal=serverPrincipal

            // simple authentication
            // - automq.table.topic.catalog.auth=simple://?username=xxx
            catalogImpl = "org.apache.iceberg.hive.HiveCatalog";
            putDataBucketAsWarehouse(true);

            IdURI uri = IdURI.parse("0@" + catalogConfigs.getOrDefault("auth", "none://?"));
            try {
                switch (uri.protocol()) {
                    case "kerberos": {
                        System.setProperty("sun.security.krb5.debug", "true");
                        String configBasePath = config.metadataLogDir();
                        System.setProperty(
                            "java.security.krb5.conf",
                            base64Config2file(uri.extensionString("krb5conf"), configBasePath, "krb5.conf")
                        );
                        Configuration configuration = new Configuration();
                        configuration.set("hadoop.security.authentication", "Kerberos");
                        UserGroupInformation.setConfiguration(configuration);
                        UserGroupInformation.loginUserFromKeytab(
                            decodeBase64(uri.extensionString("principal")),
                            base64Config2file(uri.extensionString("keytab"), configBasePath, "keytab")
                        );
                        ugi = UserGroupInformation.getCurrentUser();
                        hadoopConf.set("metastore.sasl.enabled", "true");
                        break;
                    }
                    case "simple": {
                        ugi = UserGroupInformation.createRemoteUser(uri.extensionString("username"));
                        UserGroupInformation.setLoginUser(ugi);
                        hadoopConf.set("metastore.sasl.enabled", "true");
                        break;
                    }
                    default: {
                    }
                }
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }

        private void withRest() {
            // rest config extension e.g.
            // automq.table.topic.catalog.uri=http://127.0.0.1:9001/iceberg
            // If a token is set, HTTP requests use the value as a bearer token in the HTTP Authorization header.
            // If credential is used, then the key and secret are used to fetch a token using the OAuth2 client credentials flow.
            // The resulting token is used as the bearer token for subsequent requests.
            // config ref. org.apache.iceberg.rest.RESTSessionCatalog#initialize
            // automq.table.topic.catalog.oauth2-server-uri=
            // automq.table.topic.catalog.credential=
            // automq.table.topic.catalog.token=
            // automq.table.topic.catalog.scope=
            catalogImpl = "org.apache.iceberg.rest.RESTCatalog";
            putDataBucketAsWarehouse(false);
        }

        private Catalog runAs(Supplier<Catalog> func) {
            if (ugi != null) {
                return ugi.doAs((PrivilegedAction<Catalog>) func::get);
            } else {
                return func.get();
            }
        }

        private void putDataBucketAsWarehouse(boolean s3a) {
            if (bucketURI.endpoint() != null) {
                options.put("s3.endpoint", bucketURI.endpoint());
            }
            if (bucketURI.extensionBool(AwsObjectStorage.PATH_STYLE_KEY, false)) {
                options.put("s3.path-style-access", "true");
            }
            options.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
            options.put("warehouse", String.format((s3a ? "s3a" : "s3") + "://%s/iceberg", bucketURI.bucket()));
        }

    }

    /**
     * Decode base64 str and save it to file
     *
     * @return the file path
     */
    private static String base64Config2file(String base64, String configPath, String configName) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        try {
            Path dir = Paths.get(configPath);
            if (!Files.exists(dir)) {
                Files.createDirectories(dir);
            }
            Path filePath = Paths.get(configPath + File.separator + configName);
            Files.write(filePath, bytes, CREATE, TRUNCATE_EXISTING);
            return filePath.toAbsolutePath().toString();
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    private static String decodeBase64(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.ISO_8859_1);
    }
}
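For context, this factory is the broker-side entry point for table topic catalogs: it reads the automq.table.topic.catalog.* and automq.table.topic.hadoop.* properties, maps the type value (glue, nessie, tablebucket, hive, rest) to an Iceberg catalog implementation, and loads it via CatalogUtil.loadCatalog, optionally under a Hadoop UserGroupInformation identity for Hive. A hedged usage sketch, assuming only the API shown above (the enclosing class name here is illustrative and not part of the commit):

import kafka.automq.table.CatalogFactory;
import kafka.server.KafkaConfig;

import org.apache.iceberg.catalog.Catalog;

// Illustrative caller, not part of this commit.
public class TableTopicCatalogBootstrap {

    // Obtains the Iceberg catalog configured via automq.table.topic.catalog.*.
    // When automq.table.topic.catalog.type is unset, Builder#build() returns null,
    // which a caller can treat as "table topic support disabled".
    public static Catalog openCatalog(KafkaConfig kafkaConfig) {
        return new CatalogFactory().newCatalog(kafkaConfig);
    }
}

With a REST catalog, for example, the broker configuration would carry automq.table.topic.catalog.type=rest plus automq.table.topic.catalog.uri (and optionally token, credential, oauth2-server-uri, scope), matching the comments in withRest() above.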
