Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

import org.apache.doris.datasource.property.constants.PaimonProperties;

import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

public class PaimonDLFExternalCatalog extends PaimonExternalCatalog {
private static final Logger LOG = LogManager.getLogger(PaimonDLFExternalCatalog.class);

public PaimonDLFExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props, String comment) {
super(catalogId, name, resource, props, comment);
}

@Override
protected void initLocalObjectsImpl() {
super.initLocalObjectsImpl();
catalogType = PAIMON_DLF;
catalog = createCatalog();
}

@Override
protected void setPaimonCatalogOptions(Map<String, String> properties, Map<String, String> options) {
options.put(PaimonProperties.PAIMON_CATALOG_TYPE, PaimonProperties.PAIMON_HMS_CATALOG);
options.put(PaimonProperties.PAIMON_METASTORE_CLIENT, ProxyMetaStoreClient.class.getName());
options.put(PaimonProperties.PAIMON_OSS_ENDPOINT,
properties.get(PaimonProperties.PAIMON_OSS_ENDPOINT));
options.put(PaimonProperties.PAIMON_OSS_ACCESS_KEY,
properties.get(PaimonProperties.PAIMON_OSS_ACCESS_KEY));
options.put(PaimonProperties.PAIMON_OSS_SECRET_KEY,
properties.get(PaimonProperties.PAIMON_OSS_SECRET_KEY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public abstract class PaimonExternalCatalog extends ExternalCatalog {
public static final String PAIMON_CATALOG_TYPE = "paimon.catalog.type";
public static final String PAIMON_FILESYSTEM = "filesystem";
public static final String PAIMON_HMS = "hms";
public static final String PAIMON_DLF = "dlf";
protected String catalogType;
protected Catalog catalog;
protected AuthenticationConfig authConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -38,6 +39,9 @@ public static ExternalCatalog createCatalog(long catalogId, String name, String
return new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment);
case PaimonExternalCatalog.PAIMON_FILESYSTEM:
return new PaimonFileExternalCatalog(catalogId, name, resource, props, comment);
case PaimonExternalCatalog.PAIMON_DLF:
props.put(HMSProperties.HIVE_METASTORE_TYPE, HMSProperties.DLF_TYPE);
return new PaimonDLFExternalCatalog(catalogId, name, resource, props, comment);
default:
throw new DdlException("Unknown " + PaimonExternalCatalog.PAIMON_CATALOG_TYPE
+ " value: " + metastoreType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) {
@Override
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType()) || PaimonExternalCatalog.PAIMON_FILESYSTEM
.equals(getPaimonCatalogType())) {
if (PaimonExternalCatalog.PAIMON_HMS.equals(getPaimonCatalogType())
|| PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(getPaimonCatalogType())
|| PaimonExternalCatalog.PAIMON_DLF.equals(getPaimonCatalogType())) {
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
getName(), dbName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public static Map<String, String> convertToMetaProperties(Map<String, String> pr
}
metaProperties = convertToGlueProperties(props, credential);
} else if (props.containsKey(DLFProperties.ENDPOINT)
|| props.containsKey(DLFProperties.REGION)
|| props.containsKey(DataLakeConfig.CATALOG_ENDPOINT)) {
metaProperties = convertToDLFProperties(props, DLFProperties.getCredential(props));
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
Expand Down Expand Up @@ -444,10 +445,18 @@ private static void getPropertiesFromDLFProps(Map<String, String> props,
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.UID);
}
String endpoint = props.get(DLFProperties.ENDPOINT);
props.put(DataLakeConfig.CATALOG_ENDPOINT, endpoint);
props.put(DataLakeConfig.CATALOG_REGION_ID, props.getOrDefault(DLFProperties.REGION,
S3Properties.getRegionOfEndpoint(endpoint)));

// region
String region = props.get(DLFProperties.REGION);
if (Strings.isNullOrEmpty(region)) {
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.REGION);
}
props.put(DataLakeConfig.CATALOG_REGION_ID, region);

// endpoint
props.put(DataLakeConfig.CATALOG_ENDPOINT,
props.getOrDefault(DLFProperties.ENDPOINT, getDlfEndpointByRegion(region)));

props.put(DataLakeConfig.CATALOG_PROXY_MODE, props.getOrDefault(DLFProperties.PROXY_MODE, "DLF_ONLY"));
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_ID, credential.getAccessKey());
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET, credential.getSecretKey());
Expand Down Expand Up @@ -508,6 +517,10 @@ private static String getOssEndpoint(String region, boolean publicAccess) {
return prefix + region + suffix;
}

private static String getDlfEndpointByRegion(String region) {
return "dlf-vpc." + region + ".aliyuncs.com";
}

private static Map<String, String> convertToGlueProperties(Map<String, String> props, CloudCredential credential) {
// convert doris glue property to glue properties, s3 client property and BE property
String metastoreType = props.get(HMSProperties.HIVE_METASTORE_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class PaimonProperties {
public static final String PAIMON_OSS_SECRET_KEY = org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
public static final String PAIMON_HMS_CATALOG = "hive";
public static final String PAIMON_FILESYSTEM_CATALOG = "filesystem";
public static final String PAIMON_METASTORE_CLIENT = "metastore.client.class";


public static Map<String, String> convertToS3Properties(Map<String, String> properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.paimon.PaimonDLFExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
Expand Down Expand Up @@ -408,7 +409,8 @@ public class GsonUtils {
.registerSubtype(
TrinoConnectorExternalCatalog.class, TrinoConnectorExternalCatalog.class.getSimpleName())
.registerSubtype(LakeSoulExternalCatalog.class, LakeSoulExternalCatalog.class.getSimpleName())
.registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName());
.registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonDLFExternalCatalog.class, PaimonDLFExternalCatalog.class.getSimpleName());
if (Config.isNotCloudMode()) {
dsTypeAdapterFactory
.registerSubtype(InternalCatalog.class, InternalCatalog.class.getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !c1 --
1 a
2 b

-- !c2 --
1 a
2 b

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_paimon_dlf_catalog", "p2,external,paimon,external_remote,external_remote_paimon") {
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
return
}

try {
String catalog = "test_paimon_dlf_catalog"
String uid = context.config.otherConfigs.get("dlf_uid")
String region = context.config.otherConfigs.get("dlf_region")
String catalog_id = context.config.otherConfigs.get("dlf_catalog_id")
String access_key = context.config.otherConfigs.get("dlf_access_key")
String secret_key = context.config.otherConfigs.get("dlf_secret_key")


sql """drop catalog if exists ${catalog};"""
sql """
create catalog if not exists ${catalog} properties (
"type" = "paimon",
"paimon.catalog.type" = "dlf",
"warehouse" = "oss://selectdb-qa-datalake-test/p2_regression_case",
"dlf.proxy.mode" = "DLF_ONLY",
"dlf.uid" = "${uid}",
"dlf.region" = "${region}",
"dlf.catalog.id" = "${catalog_id}",
"dlf.access_key" = "${access_key}",
"dlf.secret_key" = "${secret_key}"
);
"""

sql """ use ${catalog}.regression_paimon """

sql """set force_jni_scanner=false"""
qt_c1 """ select * from tb_simple order by id """
sql """set force_jni_scanner=true"""
qt_c2 """ select * from tb_simple order by id """

} finally {
sql """set force_jni_scanner=false"""
}
}

Loading