15 changes: 13 additions & 2 deletions metadata-ingestion/docs/sources/dbt/dbt.md
@@ -35,6 +35,11 @@ meta_mapping:
operation: "add_term"
config:
term: "Finance_test"
terms_list:
match: ".*"
operation: "add_terms"
config:
separator: ","
```
</TabItem>
<TabItem value="json" label="JSON">
@@ -66,6 +71,11 @@ meta_mapping:
"operation": "add_term",
"config": {"term": "Finance_test"},
},
"terms_list": {
"match": ".*",
"operation": "add_terms",
"config": {"separator": ","},
},
}
```
</TabItem>
@@ -74,10 +84,11 @@ meta_mapping:
We support the following operations:
1. add_tag - Requires ```tag``` property in config.
2. add_term - Requires ```term``` property in config.
3. add_owner - Requires ```owner_type``` property in config which can be either user or group. Optionally accepts the ```owner_category``` config property, which you can set to one of ```['TECHNICAL_OWNER', 'BUSINESS_OWNER', 'DATA_STEWARD', 'DATAOWNER']``` (defaults to `DATAOWNER`).
3. add_terms - Accepts an optional ```separator``` property in config; the matched value is split on the separator and each non-empty entry is added as a glossary term (see the sketch after this list).
4. add_owner - Requires ```owner_type``` property in config which can be either user or group. Optionally accepts the ```owner_category``` config property, which you can set to one of ```['TECHNICAL_OWNER', 'BUSINESS_OWNER', 'DATA_STEWARD', 'DATAOWNER']``` (defaults to `DATAOWNER`).
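
As a worked illustration (the meta value here is hypothetical, not from this PR): `add_terms` splits the matched string on the configured separator and turns each non-empty piece into a glossary term.

```python
# Sketch of the add_terms split-and-strip behavior added in this PR's
# mapping.py; meta_value and separator are illustrative.
meta_value = "finance, pii"  # hypothetical dbt meta property value
separator = ","              # from the operation config (defaults to ",")

terms = [t.strip() for t in meta_value.split(separator) if t.strip()]
assert terms == ["finance", "pii"]  # each entry becomes a glossary term urn
```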

Note:
1. Currently, dbt meta mapping is only supported for meta elements defined at the model level (not supported for columns).
1. The dbt `meta_mapping` config works at the model level, while the `column_meta_mapping` config works at the column level. The `add_owner` operation is not supported at the column level; a minimal recipe sketch follows these notes.
2. For string meta properties we support regex matching.
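
A minimal recipe fragment for column-level mapping might look like the sketch below. It assumes `column_meta_mapping` takes the same rule syntax as `meta_mapping` (which is what the implementation in this PR suggests); the rule name and regex are illustrative.

```python
# Hedged sketch of a dbt source config fragment; values are illustrative.
config_fragment = {
    "column_meta_mapping": {
        "terms_list": {
            "match": ".*",
            "operation": "add_terms",
            "config": {"separator": ","},
        },
    },
    "enable_meta_mapping": True,
}
```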

With regex matching, you can also use the matched value to customize how you populate the tag, term or owner fields. Here are a few advanced examples:
187 changes: 109 additions & 78 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -123,6 +123,7 @@
ViewPropertiesClass,
)
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis

logger = logging.getLogger(__name__)
DBT_PLATFORM = "dbt"
@@ -273,6 +274,10 @@ class DBTConfig(StatefulIngestionConfigBase):
default={},
description="mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings.",
)
column_meta_mapping: Dict = Field(
default={},
description="mapping rules that will be executed against dbt column meta properties. Refer to the section below on dbt meta automated mappings.",
)
enable_meta_mapping = Field(
default=True,
description="When enabled, applies the mappings that are defined through the meta_mapping directives.",
@@ -394,6 +399,8 @@ class DBTColumn:
description: str
index: int
data_type: str

meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)


@@ -437,7 +444,7 @@ class DBTNode:
compiled_sql: Optional[str] = None

def get_db_fqn(self) -> str:
if self.database is not None:
if self.database:
fqn = f"{self.database}.{self.schema}.{self.name}"
else:
fqn = f"{self.schema}.{self.name}"
@@ -463,22 +470,24 @@ def get_columns(
) -> List[DBTColumn]:
columns = []

catalog_columns = catalog_node["columns"]
manifest_columns = manifest_node.get("columns", {})

raw_columns = catalog_node["columns"]
for key, catalog_column in catalog_columns.items():
manifest_column = manifest_columns.get(key.lower(), {})

for key in raw_columns:
raw_column = raw_columns[key]
meta = manifest_column.get("meta", {})

tags = manifest_columns.get(key.lower(), {}).get("tags", [])
tags = manifest_column.get("tags", [])
tags = [tag_prefix + tag for tag in tags]

dbtCol = DBTColumn(
name=raw_column["name"].lower(),
comment=raw_column.get("comment", ""),
description=manifest_columns.get(key.lower(), {}).get("description", ""),
data_type=raw_column["type"],
index=raw_column["index"],
name=catalog_column["name"].lower(),
comment=catalog_column.get("comment", ""),
description=manifest_column.get("description", ""),
data_type=catalog_column["type"],
index=catalog_column["index"],
meta=meta,
tags=tags,
)
columns.append(dbtCol)
@@ -550,9 +559,8 @@ def extract_dbt_entities(

tags = manifest_node.get("tags", [])
tags = [tag_prefix + tag for tag in tags]
meta_props = manifest_node.get("meta", {})
if not meta:
meta_props = manifest_node.get("config", {}).get("meta", {})
meta = manifest_node.get("config", {}).get("meta", {})

max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at")
max_loaded_at = None
@@ -575,20 +583,20 @@
upstream_nodes=upstream_nodes,
materialization=materialization,
catalog_type=catalog_type,
meta=meta_props,
meta=meta,
query_tag=query_tag_props,
tags=tags,
owner=owner,
compiled_sql=manifest_node.get("compiled_sql"),
manifest_raw=manifest_node,
)

# overwrite columns from catalog
# Load columns from catalog, and override some properties from manifest.
if dbtNode.materialization not in [
"ephemeral",
"test",
]: # we don't want columns if platform isn't 'dbt'
logger.debug("Loading schema info")
]:
logger.debug(f"Loading schema info for {dbtNode.dbt_name}")
if catalog_node is not None:
# We already have done the reporting for catalog_node being None above.
dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix)
@@ -724,66 +732,6 @@ def get_column_type(
return SchemaFieldDataType(type=TypeClass())


def get_schema_metadata(
report: DBTSourceReport, node: DBTNode, platform: str
) -> SchemaMetadata:
canonical_schema: List[SchemaField] = []
for column in node.columns:
description = None

if (
column.comment
and column.description
and column.comment != column.description
):
description = f"{platform} comment: {column.comment}\n\ndbt model description: {column.description}"
elif column.comment:
description = column.comment
elif column.description:
description = column.description

globalTags = None
if column.tags:
globalTags = GlobalTagsClass(
tags=[
TagAssociationClass(mce_builder.make_tag_urn(tag))
for tag in column.tags
]
)

field = SchemaField(
fieldPath=column.name,
nativeDataType=column.data_type,
type=get_column_type(
report, node.dbt_name, column.data_type, node.dbt_adapter
),
description=description,
nullable=False, # TODO: actually autodetect this
recursive=False,
globalTags=globalTags,
)

canonical_schema.append(field)

last_modified = None
if node.max_loaded_at is not None:
actor = mce_builder.make_user_urn("dbt_executor")
last_modified = AuditStamp(
time=int(node.max_loaded_at.timestamp() * 1000),
actor=actor,
)

return SchemaMetadata(
schemaName=node.dbt_name,
platform=mce_builder.make_data_platform_urn(platform),
version=0,
hash="",
platformSchema=MySqlDDL(tableSchema=""),
lastModified=last_modified,
fields=canonical_schema,
)


@dataclass
class AssertionParams:
scope: Union[DatasetAssertionScopeClass, str]
@@ -1664,10 +1612,93 @@ def _generate_base_aspects(
aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION))

# add schema metadata aspect
schema_metadata = get_schema_metadata(self.report, node, mce_platform)
schema_metadata = self.get_schema_metadata(self.report, node, mce_platform)
aspects.append(schema_metadata)
return aspects

def get_schema_metadata(
self, report: DBTSourceReport, node: DBTNode, platform: str
) -> SchemaMetadata:
action_processor = OperationProcessor(
self.config.column_meta_mapping,
self.config.tag_prefix,
"SOURCE_CONTROL",
self.config.strip_user_ids_from_email,
)

canonical_schema: List[SchemaField] = []
for column in node.columns:
description = None

if (
column.comment
and column.description
and column.comment != column.description
):
description = f"{platform} comment: {column.comment}\n\ndbt model description: {column.description}"
elif column.comment:
description = column.comment
elif column.description:
description = column.description

meta_aspects: Dict[str, Any] = {}
if self.config.enable_meta_mapping and column.meta:
meta_aspects = action_processor.process(column.meta)

if meta_aspects.get(Constants.ADD_OWNER_OPERATION):
logger.warning("The add_owner operation is not supported for columns.")

meta_tags: Optional[GlobalTagsClass] = meta_aspects.get(
Constants.ADD_TAG_OPERATION
)
globalTags = None
if meta_tags or column.tags:
# Merge tags from meta mapping and column tags.
globalTags = GlobalTagsClass(
tags=(meta_tags.tags if meta_tags else [])
+ [
TagAssociationClass(mce_builder.make_tag_urn(tag))
for tag in column.tags
]
)

glossaryTerms = None
if meta_aspects.get(Constants.ADD_TERM_OPERATION):
glossaryTerms = meta_aspects.get(Constants.ADD_TERM_OPERATION)

field = SchemaField(
fieldPath=column.name,
nativeDataType=column.data_type,
type=get_column_type(
report, node.dbt_name, column.data_type, node.dbt_adapter
),
description=description,
nullable=False, # TODO: actually autodetect this
recursive=False,
globalTags=globalTags,
glossaryTerms=glossaryTerms,
)

canonical_schema.append(field)

last_modified = None
if node.max_loaded_at is not None:
actor = mce_builder.make_user_urn("dbt_executor")
last_modified = AuditStamp(
time=datetime_to_ts_millis(node.max_loaded_at),
actor=actor,
)

return SchemaMetadata(
schemaName=node.dbt_name,
platform=mce_builder.make_data_platform_urn(platform),
version=0,
hash="",
platformSchema=MySqlDDL(tableSchema=""),
lastModified=last_modified,
fields=canonical_schema,
)

def _aggregate_owners(
self, node: DBTNode, meta_owner_aspects: Any
) -> List[OwnerClass]:
@@ -1732,7 +1763,7 @@ def _create_subType_wu(
aspect=SubTypesClass(typeNames=subtypes),
)
subtype_wu = MetadataWorkUnit(
id=f"{self.platform}-{subtype_mcp.entityUrn}-{subtype_mcp.aspectName}",
id=f"{subtype_mcp.entityUrn}-{subtype_mcp.aspectName}",
mcp=subtype_mcp,
)
return subtype_wu
26 changes: 22 additions & 4 deletions metadata-ingestion/src/datahub/utilities/mapping.py
@@ -1,6 +1,6 @@
import logging
import re
from typing import Any, Dict, Match, Optional, Union
from typing import Any, Dict, List, Match, Optional, Union

from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import OwnerType
@@ -15,6 +15,7 @@
class Constants:
ADD_TAG_OPERATION = "add_tag"
ADD_TERM_OPERATION = "add_term"
ADD_TERMS_OPERATION = "add_terms"
ADD_OWNER_OPERATION = "add_owner"
OPERATION = "operation"
OPERATION_CONFIG = "config"
@@ -27,6 +28,7 @@ class Constants:
GROUP_OWNER = "group"
OPERAND_DATATYPE_SUPPORTED = [int, bool, str, float]
TAG_PARTITION_KEY = "PARTITION_KEY"
SEPARATOR = "separator"


class OperationProcessor:
@@ -102,12 +104,20 @@ def process(self, raw_props: Dict[str, Any]) -> Dict[str, Any]:
operation = self.get_operation_value(
operation_key, operation_type, operation_config, maybe_match
)
if operation_type == Constants.ADD_TERMS_OPERATION:
# add_terms operation is a special case where the operation value is a list of terms.
# We want to aggregate these values with the add_term operation.
operation_type = Constants.ADD_TERM_OPERATION

if operation:
if isinstance(operation, str):
if isinstance(operation, (str, list)):
operations_value_set = operations_map.get(
operation_type, set()
)
operations_value_set.add(operation) # type: ignore
if isinstance(operation, list):
operations_value_set.update(operation) # type: ignore
else:
operations_value_set.add(operation) # type: ignore
operations_map[operation_type] = operations_value_set
else:
operations_value_list = operations_map.get(
@@ -160,7 +170,7 @@ def get_operation_value(
operation_type: str,
operation_config: Dict,
match: Match,
) -> Optional[Union[str, Dict]]:
) -> Optional[Union[str, Dict, List[str]]]:
def _get_best_match(the_match: Match, group_name: str) -> str:
result = the_match.group(0)
try:
@@ -220,6 +230,14 @@ def _get_best_match(the_match: Match, group_name: str) -> str:
if isinstance(captured_term_id, str):
term = re.sub(match_regexp, captured_term_id, term, 0, re.MULTILINE)
return mce_builder.make_term_urn(term)
elif operation_type == Constants.ADD_TERMS_OPERATION:
separator = operation_config.get(Constants.SEPARATOR, ",")
captured_terms = match.group(0)
return [
mce_builder.make_term_urn(term.strip())
for term in captured_terms.split(separator)
if term.strip()
]
return None

def sanitize_owner_ids(self, owner_id: str) -> str:
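
Putting the mapping.py pieces together, a usage sketch of the processor with the new operation might look like this. It assumes the constructor's trailing arguments (tag prefix, owner source type, etc.) have defaults, and the rule and input values are hypothetical; the aspect key follows the aggregation visible in this diff.

```python
from datahub.utilities.mapping import Constants, OperationProcessor

# Hypothetical rule: split a comma-separated meta value into glossary terms.
processor = OperationProcessor(
    {
        "terms_list": {
            "match": ".*",
            "operation": "add_terms",
            "config": {"separator": ","},
        },
    },
)
aspects = processor.process({"terms_list": "finance,pii"})

# Per this diff, add_terms results are aggregated under the add_term key,
# so the glossary-terms aspect should be found at:
terms_aspect = aspects.get(Constants.ADD_TERM_OPERATION)
```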
4 changes: 1 addition & 3 deletions metadata-ingestion/src/datahub/utilities/time.py
@@ -11,6 +11,4 @@ def get_datetime_from_ts_millis_in_utc(ts_millis: int) -> datetime:


def datetime_to_ts_millis(dt: datetime) -> int:
return int(
round(dt.timestamp() * 1000),
)
return int(round(dt.timestamp() * 1000))
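
As a quick sanity check of the simplified helper (the expected value is plain epoch arithmetic, not taken from this PR):

```python
from datetime import datetime, timezone

from datahub.utilities.time import datetime_to_ts_millis

# 2022-01-01T00:00:00Z is 1,640,995,200 seconds after the Unix epoch.
dt = datetime(2022, 1, 1, tzinfo=timezone.utc)
assert datetime_to_ts_millis(dt) == 1_640_995_200_000
```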