Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0873d76
feat(transformers): Add domain transformer for dataset
siddiquebagwan-gslab Jul 20, 2022
402d8de
resolve merge conflict
siddiquebagwan-gslab Jul 20, 2022
9f7ff27
verify semantics code flow
siddiquebagwan-gslab Jul 21, 2022
3d69c33
unified set_dataset_browse_path
siddiquebagwan-gslab Jul 21, 2022
41c670e
Add unit test cases for schema terms, schema tags, domain and ownership
siddiquebagwan-gslab Jul 26, 2022
bbd19f7
Merge branch 'master' into master+robust-transformers
siddiquebagwan-gslab Jul 27, 2022
fc9bbf8
Add domain registry to validate input domains
siddiquebagwan-gslab Jul 27, 2022
ad239c8
resolve merge conflict
siddiquebagwan-gslab Jul 27, 2022
9e655f8
WIP
siddiquebagwan-gslab Jul 27, 2022
6844fec
lintFix
siddiquebagwan-gslab Jul 27, 2022
593ea0d
resolve merg conflict
siddiquebagwan-gslab Jul 27, 2022
8917698
feat(transformer):Use transform_aspect for all transformer & support …
siddiquebagwan-gslab Jul 27, 2022
2f7278f
udpate transformer readme
siddiquebagwan-gslab Jul 28, 2022
a6c3e6a
Merge branch 'master' into master+robust-transformers
siddiquebagwan-gslab Jul 28, 2022
436081c
Merge branch 'master+robust-transformers' into master+robust-transfor…
siddiquebagwan-gslab Jul 28, 2022
e464965
Merge branch 'master' into master+robust-transformers
siddiquebagwan-gslab Aug 3, 2022
e365b42
rename Semantics to TransformerSemantics
siddiquebagwan-gslab Aug 3, 2022
66fd5c6
lintFix
siddiquebagwan-gslab Aug 3, 2022
28195aa
resolve merge conflict
siddiquebagwan-gslab Aug 3, 2022
a68eb0e
lintFix
siddiquebagwan-gslab Aug 3, 2022
c6c5694
fix mypy lint
siddiquebagwan-gslab Aug 3, 2022
f52f8ff
resolve merge conflict
siddiquebagwan-gslab Aug 3, 2022
5fcd2a1
fix merge conflict
siddiquebagwan-gslab Aug 23, 2022
020b8c5
resolve review comments
siddiquebagwan-gslab Aug 23, 2022
b8c1cc2
lint fix
siddiquebagwan-gslab Aug 23, 2022
5707240
Merge branch 'master' into master+robust-transformers+unified
siddiquebagwan-gslab Aug 23, 2022
da64cf2
[EMPTY]
siddiquebagwan-gslab Aug 23, 2022
69f157e
Merge branch 'master' into master+robust-transformers+unified
hsheth2 Aug 24, 2022
3c7c395
resolve merge
siddiquebagwan-gslab Sep 4, 2022
db12be4
Merge branch 'master+robust-transformers+unified' of github.com:acryl…
siddiquebagwan-gslab Sep 4, 2022
dc858ae
Merge branch 'master+robust-transformers+unified' of github.com:mohds…
siddiquebagwan-gslab Sep 4, 2022
27dd232
lint fix
siddiquebagwan-gslab Sep 5, 2022
e0c7c9f
Merge branch 'master' into master+robust-transformers+unified
siddiquebagwan-gslab Sep 5, 2022
99c24bf
Merge branch 'master+robust-transformers+unified' of github.com:acryl…
siddiquebagwan-gslab Sep 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs-website/genJsonSchema/gen_json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def get_base() -> Any:
"type": "array",
"items": {
"type": "object",
"description": "Transformer configs see at https://datahubproject.io/docs/metadata-ingestion/transformers",
"description": "Transformer configs see at https://datahubproject.io/docs/metadata-ingestion/docs/transformer",
"properties": {
"type": {"type": "string", "description": "Transformer type"},
"config": {
Expand Down
7 changes: 6 additions & 1 deletion docs-website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ module.exports = {
{
Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"),
},
"metadata-ingestion/transformers",
{
Transformers: [
"metadata-ingestion/docs/transformer/intro",
"metadata-ingestion/docs/transformer/dataset_transformer",
],
},
{
"Advanced Guides": [
{
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ transformers: # an array of transformers applied sequentially
# default sink, no config needed
```

Check out the [transformers guide](./transformers.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!
Check out the [transformers guide](./docs/transformer/intro.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!

## Using as a library (SDK)

Expand All @@ -195,5 +195,5 @@ In some cases, you might want to configure and run a pipeline entirely from with

## Developing

See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md).
See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./docs/transformer/intro.md).

2 changes: 1 addition & 1 deletion metadata-ingestion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ ext {
}

task checkPythonVersion(type: Exec) {
commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 7)'
commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 6)'
}

task environmentSetup(type: Exec, dependsOn: checkPythonVersion) {
Expand Down
1,203 changes: 1,203 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions metadata-ingestion/docs/transformer/intro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
title: "Introduction"
---

# Transformers

## What’s a transformer?

Oftentimes we want to modify metadata before it reaches the ingestion sink – for instance, we might want to add custom tags, ownership, properties, or patch some fields. A transformer allows us to do exactly these things.

Moreover, a transformer allows one to have fine-grained control over the metadata that’s ingested without having to modify the ingestion framework's code yourself. Instead, you can write your own module that can transform metadata events however you like. To include a transformer into a recipe, all that's needed is the name of the transformer as well as any configuration that the transformer needs.

## Provided transformers

Aside from the option of writing your own transformer (see below), we provide some simple transformers for the use cases of adding: tags, glossary terms, properties and ownership information.

DataHub provided transformers for dataset are:
- [Simple Add Dataset ownership](./dataset_transformer.md#simple-add-dataset-ownership)
- [Pattern Add Dataset ownership](./dataset_transformer.md#pattern-add-dataset-ownership)
- [Simple Remove Dataset ownership](./dataset_transformer.md#simple-remove-dataset-ownership)
- [Mark Dataset Status](./dataset_transformer.md#mark-dataset-status)
- [Simple Add Dataset globalTags](./dataset_transformer.md#simple-add-dataset-globaltags)
- [Pattern Add Dataset globalTags](./dataset_transformer.md#pattern-add-dataset-globaltags)
- [Add Dataset globalTags](./dataset_transformer.md#add-dataset-globaltags)
- [Set Dataset browsePath](./dataset_transformer.md#set-dataset-browsepath)
- [Simple Add Dataset glossaryTerms](./dataset_transformer.md#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](./dataset_transformer.md#pattern-add-dataset-glossaryterms)
- [Pattern Add Dataset Schema Field glossaryTerms](./dataset_transformer.md#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](./dataset_transformer.md#pattern-add-dataset-schema-field-globaltags)
- [Simple Add Dataset datasetProperties](./dataset_transformer.md#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](./dataset_transformer.md#add-dataset-datasetproperties)
- [Simple Add Dataset domains](./dataset_transformer.md#simple-add-dataset-domains)
- [Pattern Add Dataset domains](./dataset_transformer.md#pattern-add-dataset-domains)
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
import json
from typing import List, Optional

from datahub.configuration.common import ConfigModel
from datahub.configuration.common import ConfigModel, TransformerSemantics
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.add_dataset_ownership import Semantics
from datahub.ingestion.transformer.base_transformer import (
BaseTransformer,
SingleAspectTransformer,
Expand All @@ -18,7 +17,7 @@

class AddCustomOwnershipConfig(ConfigModel):
owners_json: str
semantics: Semantics = Semantics.OVERWRITE
semantics: TransformerSemantics = TransformerSemantics.OVERWRITE


class AddCustomOwnership(BaseTransformer, SingleAspectTransformer):
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TransformerSemantics(Enum):

class TransformerSemanticsConfigModel(ConfigModel):
semantics: TransformerSemantics = TransformerSemantics.OVERWRITE
replace_existing: bool = False

@validator("semantics", pre=True)
def ensure_semantics_is_upper_case(cls, v: str) -> str:
Expand Down
26 changes: 26 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
DatasetUsageStatisticsClass,
DomainPropertiesClass,
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
OwnershipClass,
SchemaMetadataClass,
TelemetryClientIdClass,
)
from datahub.utilities.urns.urn import Urn
Expand Down Expand Up @@ -185,13 +188,29 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
aspect_type=OwnershipClass,
)

def get_schema_metadata(self, entity_urn: str) -> Optional[SchemaMetadataClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="schemaMetadata",
aspect_type=SchemaMetadataClass,
)

def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="domainProperties",
aspect_type=DomainPropertiesClass,
)

def get_dataset_properties(
self, entity_urn: str
) -> Optional[DatasetPropertiesClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="datasetProperties",
aspect_type=DatasetPropertiesClass,
)

def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
Expand All @@ -213,6 +232,13 @@ def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
aspect_type=DomainsClass,
)

def get_browse_path(self, entity_urn: str) -> Optional[BrowsePathsClass]:
return self.get_aspect_v2(
entity_urn=entity_urn,
aspect="browsePaths",
aspect_type=BrowsePathsClass,
)

def get_usage_aspects_from_urn(
self, entity_urn: str, start_timestamp: int, end_timestamp: int
) -> Optional[List[DatasetUsageStatisticsClass]]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from typing import List
from typing import List, Optional, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import (
DatasetBrowsePathsTransformer,
)
from datahub.metadata.schema_classes import BrowsePathsClass, MetadataChangeEventClass
from datahub.metadata.schema_classes import BrowsePathsClass


class AddDatasetBrowsePathConfig(ConfigModel):
class AddDatasetBrowsePathConfig(TransformerSemanticsConfigModel):
path_templates: List[str]
replace_existing: bool = False


class AddDatasetBrowsePathTransformer(DatasetBrowsePathsTransformer):
Expand All @@ -32,32 +35,59 @@ def create(
config = AddDatasetBrowsePathConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
@staticmethod
def get_browse_paths_to_set(
graph: DataHubGraph, urn: str, mce_browse_paths: Optional[BrowsePathsClass]
) -> Optional[BrowsePathsClass]:
if not mce_browse_paths or not mce_browse_paths.paths:
# nothing to add, no need to consult server
return None

server_browse_paths = graph.get_browse_path(entity_urn=urn)
if server_browse_paths:
# compute patch
# we only include domain who are not present in the server domain list
paths_to_add: List[str] = []
for path in mce_browse_paths.paths:
if path not in server_browse_paths.paths:
paths_to_add.append(path)
# Lets patch
mce_browse_paths.paths = []
mce_browse_paths.paths.extend(server_browse_paths.paths)
mce_browse_paths.paths.extend(paths_to_add)

return mce_browse_paths

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
platform_part, dataset_fqdn, env = (
mce.proposedSnapshot.urn.replace("urn:li:dataset:(", "")
.replace(")", "")
.split(",")
entity_urn.replace("urn:li:dataset:(", "").replace(")", "").split(",")
)

platform = platform_part.replace("urn:li:dataPlatform:", "")
dataset = dataset_fqdn.replace(".", "/")

browse_paths = builder.get_or_add_aspect(
mce,
BrowsePathsClass(
paths=[],
),
)

if self.config.replace_existing:
browse_paths.paths = []
browse_paths = BrowsePathsClass(paths=[])
if aspect is not None and self.config.replace_existing is False:
browse_paths.paths.extend(aspect.paths) # type: ignore[attr-defined]

for template in self.config.path_templates:
browse_path = (
template.replace("PLATFORM", platform)
.replace("DATASET_PARTS", dataset)
.replace("ENV", env.lower())
)

browse_paths.paths.append(browse_path)

return mce
if self.config.semantics == TransformerSemantics.PATCH:
assert self.ctx.graph
patch_browse_paths: Optional[
BrowsePathsClass
] = AddDatasetBrowsePathTransformer.get_browse_paths_to_set(
self.ctx.graph, entity_urn, browse_paths
)
if patch_browse_paths is not None:
browse_paths = patch_browse_paths

return cast(Optional[Aspect], browse_paths)
Loading