Skip to content

Commit fa3934d

Browse files
authored
[feature](paimon)support paimon with dlf (#41247)
## Proposed changes

Doris can now read Paimon tables whose metadata is stored in DLF. Create a Paimon catalog backed by DLF as follows:

```
CREATE CATALOG `dlf_paimon` PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "dlf",
    "warehouse" = "oss://xx/yy/",
    "dlf.proxy.mode" = "DLF_ONLY",
    "dlf.uid" = "xxxxx",
    "dlf.region" = "cn-beijing",
    "dlf.access_key" = "ak",
    "dlf.secret_key" = "sk"

    -- "dlf.endpoint" = "dlf.cn-beijing.aliyuncs.com", -- optional
    -- "dlf.catalog.id" = "xxxx", -- optional
);
```

`dlf.region` is required. If `dlf.endpoint` is omitted, it defaults to `dlf-vpc.<region>.aliyuncs.com`.
1 parent 401be16 commit fa3934d

File tree

9 files changed

+151
-7
lines changed

9 files changed

+151
-7
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.paimon;
19+
20+
import org.apache.doris.datasource.property.constants.PaimonProperties;
21+
22+
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
25+
26+
import java.util.Map;
27+
28+
public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
29+
private static final Logger LOG = LogManager.getLogger(PaimonDLFExternalCatalog.class);
30+
31+
public PaimonDLFExternalCatalog(long catalogId, String name, String resource,
32+
Map<String, String> props, String comment) {
33+
super(catalogId, name, resource, props, comment);
34+
}
35+
36+
@Override
37+
protected void initLocalObjectsImpl() {
38+
super.initLocalObjectsImpl();
39+
catalogType = PAIMON_DLF;
40+
catalog = createCatalog();
41+
}
42+
43+
@Override
44+
protected void setPaimonCatalogOptions(Map<String, String> properties, Map<String, String> options) {
45+
options.put(PaimonProperties.PAIMON_CATALOG_TYPE, PaimonProperties.PAIMON_HMS_CATALOG);
46+
options.put(PaimonProperties.PAIMON_METASTORE_CLIENT, ProxyMetaStoreClient.class.getName());
47+
options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
48+
properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
49+
options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
50+
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
51+
options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
52+
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
53+
}
54+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
4949
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
5050
public static final String PAIMON_FILESYSTEM = "filesystem";
5151
public static final String PAIMON_HMS = "hms";
52+
public static final String PAIMON_DLF = "dlf";
5253
protected String catalogType;
5354
protected Catalog catalog;
5455
protected AuthenticationConfig authConf;

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalogFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.common.DdlException;
2121
import org.apache.doris.datasource.ExternalCatalog;
22+
import org.apache.doris.datasource.property.constants.HMSProperties;
2223

2324
import org.apache.commons.lang3.StringUtils;
2425

@@ -38,6 +39,9 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String
3839
return new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment);
3940
case PaimonExternalCatalog.PAIMON_FILESYSTEM:
4041
return new PaimonFileExternalCatalog(catalogId, name, resource, props, comment);
42+
case PaimonExternalCatalog.PAIMON_DLF:
43+
props.put(HMSProperties.HIVE_METASTORE_TYPE, HMSProperties.DLF_TYPE);
44+
return new PaimonDLFExternalCatalog(catalogId, name, resource, props, comment);
4145
default:
4246
throw new DdlException("Unknown " + PaimonExternalCatalog.PAIMON_CATALOG_TYPE
4347
+ " value: " + metastoreType);

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
165165
@Override
166166
public TTableDescriptor toThrift() {
167167
List<Column> schema = getFullSchema();
168-
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType()) || PaimonExternalCatalog.PAIMON_FILESYSTEM
169-
.equals(getPaimonCatalogType())) {
168+
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType())
169+
|| PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(getPaimonCatalogType())
170+
|| PaimonExternalCatalog.PAIMON_DLF.equals(getPaimonCatalogType())) {
170171
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
171172
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
172173
getName(), dbName);

fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public static Map<String, String> convertToMetaProperties(Map<String, String> pr
8888
}
8989
metaProperties = convertToGlueProperties(props, credential);
9090
} else if (props.containsKey(DLFProperties.ENDPOINT)
91+
|| props.containsKey(DLFProperties.REGION)
9192
|| props.containsKey(DataLakeConfig.CATALOG_ENDPOINT)) {
9293
metaProperties = convertToDLFProperties(props, DLFProperties.getCredential(props));
9394
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
@@ -444,10 +445,18 @@ private static void getPropertiesFromDLFProps(Map<String, String> props,
444445
if (Strings.isNullOrEmpty(uid)) {
445446
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.UID);
446447
}
447-
String endpoint = props.get(DLFProperties.ENDPOINT);
448-
props.put(DataLakeConfig.CATALOG_ENDPOINT, endpoint);
449-
props.put(DataLakeConfig.CATALOG_REGION_ID, props.getOrDefault(DLFProperties.REGION,
450-
S3Properties.getRegionOfEndpoint(endpoint)));
448+
449+
// region
450+
String region = props.get(DLFProperties.REGION);
451+
if (Strings.isNullOrEmpty(region)) {
452+
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.REGION);
453+
}
454+
props.put(DataLakeConfig.CATALOG_REGION_ID, region);
455+
456+
// endpoint
457+
props.put(DataLakeConfig.CATALOG_ENDPOINT,
458+
props.getOrDefault(DLFProperties.ENDPOINT, getDlfEndpointByRegion(region)));
459+
451460
props.put(DataLakeConfig.CATALOG_PROXY_MODE, props.getOrDefault(DLFProperties.PROXY_MODE, "DLF_ONLY"));
452461
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_ID, credential.getAccessKey());
453462
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET, credential.getSecretKey());
@@ -508,6 +517,10 @@ private static String getOssEndpoint(String region, boolean publicAccess) {
508517
return prefix + region + suffix;
509518
}
510519

520+
private static String getDlfEndpointByRegion(String region) {
521+
return "dlf-vpc." + region + ".aliyuncs.com";
522+
}
523+
511524
private static Map<String, String> convertToGlueProperties(Map<String, String> props, CloudCredential credential) {
512525
// convert doris glue property to glue properties, s3 client property and BE property
513526
String metastoreType = props.get(HMSProperties.HIVE_METASTORE_TYPE);

fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class PaimonProperties {
3838
public static final String PAIMON_OSS_SECRET_KEY = org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
3939
public static final String PAIMON_HMS_CATALOG = "hive";
4040
public static final String PAIMON_FILESYSTEM_CATALOG = "filesystem";
41+
public static final String PAIMON_METASTORE_CLIENT = "metastore.client.class";
4142

4243

4344
public static Map<String, String> convertToS3Properties(Map<String, String> properties,

fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@
158158
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
159159
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
160160
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
161+
import org.apache.doris.datasource.paimon.PaimonDLFExternalCatalog;
161162
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
162163
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
163164
import org.apache.doris.datasource.paimon.PaimonExternalTable;
@@ -410,7 +411,8 @@ public class GsonUtils {
410411
.registerSubtype(
411412
TrinoConnectorExternalCatalog.class, TrinoConnectorExternalCatalog.class.getSimpleName())
412413
.registerSubtype(LakeSoulExternalCatalog.class, LakeSoulExternalCatalog.class.getSimpleName())
413-
.registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName());
414+
.registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName())
415+
.registerSubtype(PaimonDLFExternalCatalog.class, PaimonDLFExternalCatalog.class.getSimpleName());
414416
if (Config.isNotCloudMode()) {
415417
dsTypeAdapterFactory
416418
.registerSubtype(InternalCatalog.class, InternalCatalog.class.getSimpleName());
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !c1 --
3+
1 a
4+
2 b
5+
6+
-- !c2 --
7+
1 a
8+
2 b
9+
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Regression suite for a Paimon catalog backed by DLF: creates the catalog from
// credentials in the regression config, then reads the same table with
// force_jni_scanner switched off and on.
suite("test_paimon_dlf_catalog", "p2,external,paimon,external_remote,external_remote_paimon") {
    def cfg = context.config.otherConfigs

    // Opt-in suite: skipped unless enablePaimonTest is "true" (case-insensitive).
    String enabled = cfg.get("enablePaimonTest")
    if (!"true".equalsIgnoreCase(enabled)) {
        return
    }

    try {
        String catalog = "test_paimon_dlf_catalog"

        // DLF account settings come from the regression config, not from the test itself.
        String uid = cfg.get("dlf_uid")
        String region = cfg.get("dlf_region")
        String catalog_id = cfg.get("dlf_catalog_id")
        String access_key = cfg.get("dlf_access_key")
        String secret_key = cfg.get("dlf_secret_key")

        // Start from a clean slate so a stale catalog definition cannot be reused.
        sql """drop catalog if exists ${catalog};"""
        sql """
            create catalog if not exists ${catalog} properties (
            "type" = "paimon",
            "paimon.catalog.type" = "dlf",
            "warehouse" = "oss://selectdb-qa-datalake-test/p2_regression_case",
            "dlf.proxy.mode" = "DLF_ONLY",
            "dlf.uid" = "${uid}",
            "dlf.region" = "${region}",
            "dlf.catalog.id" = "${catalog_id}",
            "dlf.access_key" = "${access_key}",
            "dlf.secret_key" = "${secret_key}"
            );
        """

        sql """ use ${catalog}.regression_paimon """

        // Same query under both scanner settings; results are checked as c1 and c2.
        sql """set force_jni_scanner=false"""
        qt_c1 """ select * from tb_simple order by id """
        sql """set force_jni_scanner=true"""
        qt_c2 """ select * from tb_simple order by id """

    } finally {
        // Always restore the session variable, even when a query above fails.
        sql """set force_jni_scanner=false"""
    }
}
59+

0 commit comments

Comments
 (0)