
Commit e7fa9b8

siddiquebagwan-gslab authored and hsheth2 committed
feat(transformers): Add semantics & transform_aspect support in transformers (datahub-project#5514)
Co-authored-by: MohdSiddique Bagwan <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
1 parent: bcfbae9

22 files changed (+2528, -1040 lines)

docs-website/genJsonSchema/gen_json_schema.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -159,7 +159,7 @@ def get_base() -> Any:
                 "type": "array",
                 "items": {
                     "type": "object",
-                    "description": "Transformer configs see at https://datahubproject.io/docs/metadata-ingestion/transformers",
+                    "description": "Transformer configs see at https://datahubproject.io/docs/metadata-ingestion/docs/transformer",
                     "properties": {
                         "type": {"type": "string", "description": "Transformer type"},
                         "config": {
```

docs-website/sidebars.js

Lines changed: 6 additions & 1 deletion

```diff
@@ -137,7 +137,12 @@ module.exports = {
     {
       Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"),
     },
-    "metadata-ingestion/transformers",
+    {
+      Transformers: [
+        "metadata-ingestion/docs/transformer/intro",
+        "metadata-ingestion/docs/transformer/dataset_transformer",
+      ],
+    },
     {
       "Advanced Guides": [
         {
```

metadata-ingestion/README.md

Lines changed: 2 additions & 2 deletions

````diff
@@ -183,7 +183,7 @@ transformers: # an array of transformers applied sequentially
   # default sink, no config needed
 ```
 
-Check out the [transformers guide](./transformers.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!
+Check out the [transformers guide](./docs/transformer/intro.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!
 
 ## Using as a library (SDK)
 
@@ -195,5 +195,5 @@ In some cases, you might want to configure and run a pipeline entirely from with
 
 ## Developing
 
-See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md).
+See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./docs/transformer/intro.md).
````

metadata-ingestion/build.gradle

Lines changed: 1 addition & 1 deletion

```diff
@@ -8,7 +8,7 @@ ext {
 }
 
 task checkPythonVersion(type: Exec) {
-  commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 7)'
+  commandLine python_executable, '-c', 'import sys; assert sys.version_info >= (3, 6)'
 }
 
 task environmentSetup(type: Exec, dependsOn: checkPythonVersion) {
```

metadata-ingestion/docs/transformer/dataset_transformer.md

Lines changed: 1203 additions & 0 deletions
Large diffs are not rendered by default.

metadata-ingestion/docs/transformer/intro.md
Lines changed: 33 additions & 0 deletions

```diff
@@ -0,0 +1,33 @@
+---
+title: "Introduction"
+---
+
+# Transformers
+
+## What’s a transformer?
+
+Oftentimes we want to modify metadata before it reaches the ingestion sink – for instance, we might want to add custom tags, ownership, properties, or patch some fields. A transformer allows us to do exactly these things.
+
+Moreover, a transformer allows one to have fine-grained control over the metadata that’s ingested without having to modify the ingestion framework's code yourself. Instead, you can write your own module that can transform metadata events however you like. To include a transformer in a recipe, all that's needed is the name of the transformer as well as any configuration that the transformer needs.
+
+## Provided transformers
+
+Aside from the option of writing your own transformer (see below), we provide some simple transformers for the use cases of adding tags, glossary terms, properties, and ownership information.
+
+The transformers DataHub provides for datasets are:
+- [Simple Add Dataset ownership](./dataset_transformer.md#simple-add-dataset-ownership)
+- [Pattern Add Dataset ownership](./dataset_transformer.md#pattern-add-dataset-ownership)
+- [Simple Remove Dataset ownership](./dataset_transformer.md#simple-remove-dataset-ownership)
+- [Mark Dataset Status](./dataset_transformer.md#mark-dataset-status)
+- [Simple Add Dataset globalTags](./dataset_transformer.md#simple-add-dataset-globaltags)
+- [Pattern Add Dataset globalTags](./dataset_transformer.md#pattern-add-dataset-globaltags)
+- [Add Dataset globalTags](./dataset_transformer.md#add-dataset-globaltags)
+- [Set Dataset browsePath](./dataset_transformer.md#set-dataset-browsepath)
+- [Simple Add Dataset glossaryTerms](./dataset_transformer.md#simple-add-dataset-glossaryterms)
+- [Pattern Add Dataset glossaryTerms](./dataset_transformer.md#pattern-add-dataset-glossaryterms)
+- [Pattern Add Dataset Schema Field glossaryTerms](./dataset_transformer.md#pattern-add-dataset-schema-field-glossaryterms)
+- [Pattern Add Dataset Schema Field globalTags](./dataset_transformer.md#pattern-add-dataset-schema-field-globaltags)
+- [Simple Add Dataset datasetProperties](./dataset_transformer.md#simple-add-dataset-datasetproperties)
+- [Add Dataset datasetProperties](./dataset_transformer.md#add-dataset-datasetproperties)
+- [Simple Add Dataset domains](./dataset_transformer.md#simple-add-dataset-domains)
+- [Pattern Add Dataset domains](./dataset_transformer.md#pattern-add-dataset-domains)
```
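The OVERWRITE-vs-PATCH semantics this commit introduces can be sketched in isolation. The classes below are illustrative stand-ins, not datahub's real `BaseTransformer` or `TransformerSemantics` (which live in `datahub.configuration.common` and `datahub.ingestion.transformer`); the toy `AddTags` transformer and its `server_tags` argument are hypothetical:

```python
# Illustrative sketch of transformer semantics: standalone, no datahub imports.
from enum import Enum
from typing import List, Optional


class TransformerSemantics(Enum):
    OVERWRITE = "OVERWRITE"  # replace the aspect emitted by the source
    PATCH = "PATCH"          # merge with the aspect already on the server


class AddTags:
    """Toy transformer: adds tags to a dataset's tag aspect."""

    def __init__(self, tags: List[str], semantics: TransformerSemantics,
                 server_tags: Optional[List[str]] = None):
        self.tags = tags
        self.semantics = semantics
        # stand-in for a DataHubGraph lookup of the server-side aspect
        self.server_tags = server_tags or []

    def transform_aspect(self, aspect: Optional[List[str]]) -> List[str]:
        # start from the aspect produced by the source, then add our tags
        result = list(aspect or [])
        result.extend(t for t in self.tags if t not in result)
        if self.semantics is TransformerSemantics.PATCH:
            # PATCH: keep whatever the server already has, then add the rest
            merged = list(self.server_tags)
            merged.extend(t for t in result if t not in merged)
            return merged
        return result


overwrite = AddTags(["pii"], TransformerSemantics.OVERWRITE, server_tags=["legacy"])
patch = AddTags(["pii"], TransformerSemantics.PATCH, server_tags=["legacy"])
print(overwrite.transform_aspect(["raw"]))  # server tags ignored
print(patch.transform_aspect(["raw"]))      # server tags preserved first
```

Under OVERWRITE the server-side `legacy` tag is dropped; under PATCH it survives and the new tags are appended after it.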

metadata-ingestion/examples/transforms/custom_transform_example.py

Lines changed: 2 additions & 3 deletions

```diff
@@ -2,9 +2,8 @@
 import json
 from typing import List, Optional
 
-from datahub.configuration.common import ConfigModel
+from datahub.configuration.common import ConfigModel, TransformerSemantics
 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.transformer.add_dataset_ownership import Semantics
 from datahub.ingestion.transformer.base_transformer import (
     BaseTransformer,
     SingleAspectTransformer,
@@ -18,7 +17,7 @@
 
 class AddCustomOwnershipConfig(ConfigModel):
     owners_json: str
-    semantics: Semantics = Semantics.OVERWRITE
+    semantics: TransformerSemantics = TransformerSemantics.OVERWRITE
 
 
 class AddCustomOwnership(BaseTransformer, SingleAspectTransformer):
```

metadata-ingestion/src/datahub/configuration/common.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -26,6 +26,7 @@ class TransformerSemantics(Enum):
 
 class TransformerSemanticsConfigModel(ConfigModel):
     semantics: TransformerSemantics = TransformerSemantics.OVERWRITE
+    replace_existing: bool = False
 
     @validator("semantics", pre=True)
     def ensure_semantics_is_upper_case(cls, v: str) -> str:
```
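The `ensure_semantics_is_upper_case` validator shown in this diff is what lets recipes write `semantics: patch` in lowercase. A standalone sketch of that parsing behavior, using plain Python instead of pydantic (`parse_semantics_config` is a hypothetical helper, not a datahub API):

```python
# Sketch of case-insensitive semantics parsing plus the new
# replace_existing flag on TransformerSemanticsConfigModel.
from enum import Enum
from typing import Tuple


class TransformerSemantics(Enum):
    OVERWRITE = "OVERWRITE"
    PATCH = "PATCH"


def parse_semantics_config(config: dict) -> Tuple[TransformerSemantics, bool]:
    # mirror the pre-validator: upper-case the raw value before the Enum lookup
    raw = str(config.get("semantics", "OVERWRITE")).upper()
    semantics = TransformerSemantics(raw)
    # replace_existing defaults to False, matching the diff above
    replace_existing = bool(config.get("replace_existing", False))
    return semantics, replace_existing


print(parse_semantics_config({"semantics": "patch"}))
print(parse_semantics_config({}))
```

An unrecognized value (e.g. `"merge"`) raises `ValueError` from the `Enum` lookup, which roughly corresponds to pydantic rejecting the field.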

metadata-ingestion/src/datahub/ingestion/graph/client.py

Lines changed: 26 additions & 0 deletions

```diff
@@ -14,12 +14,15 @@
 from datahub.emitter.rest_emitter import DatahubRestEmitter
 from datahub.emitter.serialization_helper import post_json_transform
 from datahub.metadata.schema_classes import (
+    BrowsePathsClass,
+    DatasetPropertiesClass,
     DatasetUsageStatisticsClass,
     DomainPropertiesClass,
     DomainsClass,
     GlobalTagsClass,
     GlossaryTermsClass,
     OwnershipClass,
+    SchemaMetadataClass,
     TelemetryClientIdClass,
 )
 from datahub.utilities.urns.urn import Urn
@@ -185,13 +188,29 @@ def get_ownership(self, entity_urn: str) -> Optional[OwnershipClass]:
             aspect_type=OwnershipClass,
         )
 
+    def get_schema_metadata(self, entity_urn: str) -> Optional[SchemaMetadataClass]:
+        return self.get_aspect_v2(
+            entity_urn=entity_urn,
+            aspect="schemaMetadata",
+            aspect_type=SchemaMetadataClass,
+        )
+
     def get_domain_properties(self, entity_urn: str) -> Optional[DomainPropertiesClass]:
         return self.get_aspect_v2(
             entity_urn=entity_urn,
             aspect="domainProperties",
             aspect_type=DomainPropertiesClass,
         )
 
+    def get_dataset_properties(
+        self, entity_urn: str
+    ) -> Optional[DatasetPropertiesClass]:
+        return self.get_aspect_v2(
+            entity_urn=entity_urn,
+            aspect="datasetProperties",
+            aspect_type=DatasetPropertiesClass,
+        )
+
     def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
         return self.get_aspect_v2(
             entity_urn=entity_urn,
@@ -213,6 +232,13 @@ def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
             aspect_type=DomainsClass,
         )
 
+    def get_browse_path(self, entity_urn: str) -> Optional[BrowsePathsClass]:
+        return self.get_aspect_v2(
+            entity_urn=entity_urn,
+            aspect="browsePaths",
+            aspect_type=BrowsePathsClass,
+        )
+
     def get_usage_aspects_from_urn(
         self, entity_urn: str, start_timestamp: int, end_timestamp: int
     ) -> Optional[List[DatasetUsageStatisticsClass]]:
```
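Each new getter here (`get_schema_metadata`, `get_dataset_properties`, `get_browse_path`) is a thin typed wrapper around `get_aspect_v2`, pairing an aspect name with its schema class. A minimal sketch of that pattern, with a hypothetical `FakeGraph` and `BrowsePaths` standing in for the REST-backed `DataHubGraph` and `BrowsePathsClass`:

```python
# Sketch of the typed-getter-over-generic-fetch pattern used by DataHubGraph.
from dataclasses import dataclass
from typing import Dict, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass
class BrowsePaths:
    paths: list


class FakeGraph:
    def __init__(self, store: Dict[Tuple[str, str], dict]):
        # store maps (urn, aspect_name) -> raw dict, mimicking server state
        self.store = store

    def get_aspect_v2(self, entity_urn: str, aspect: str,
                      aspect_type: Type[T]) -> Optional[T]:
        # generic fetch: deserialize the raw payload into the requested class
        raw = self.store.get((entity_urn, aspect))
        return aspect_type(**raw) if raw is not None else None

    def get_browse_path(self, entity_urn: str) -> Optional[BrowsePaths]:
        # each typed helper is exactly this one-line delegation
        return self.get_aspect_v2(entity_urn, "browsePaths", BrowsePaths)


graph = FakeGraph({("urn:li:dataset:x", "browsePaths"): {"paths": ["/prod/x"]}})
print(graph.get_browse_path("urn:li:dataset:x"))
print(graph.get_browse_path("urn:li:dataset:missing"))
```

The payoff is that callers (such as transformers running with PATCH semantics) get back a typed aspect or `None`, never a raw dict.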
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_browse_path.py

Lines changed: 51 additions & 21 deletions

```diff
@@ -1,17 +1,20 @@
-from typing import List
+from typing import List, Optional, cast
 
-import datahub.emitter.mce_builder as builder
-from datahub.configuration.common import ConfigModel
+from datahub.configuration.common import (
+    TransformerSemantics,
+    TransformerSemanticsConfigModel,
+)
+from datahub.emitter.mce_builder import Aspect
 from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.graph.client import DataHubGraph
 from datahub.ingestion.transformer.dataset_transformer import (
     DatasetBrowsePathsTransformer,
 )
-from datahub.metadata.schema_classes import BrowsePathsClass, MetadataChangeEventClass
+from datahub.metadata.schema_classes import BrowsePathsClass
 
 
-class AddDatasetBrowsePathConfig(ConfigModel):
+class AddDatasetBrowsePathConfig(TransformerSemanticsConfigModel):
     path_templates: List[str]
-    replace_existing: bool = False
 
 
 class AddDatasetBrowsePathTransformer(DatasetBrowsePathsTransformer):
@@ -32,32 +35,59 @@ def create(
         config = AddDatasetBrowsePathConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
-    def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
+    @staticmethod
+    def get_browse_paths_to_set(
+        graph: DataHubGraph, urn: str, mce_browse_paths: Optional[BrowsePathsClass]
+    ) -> Optional[BrowsePathsClass]:
+        if not mce_browse_paths or not mce_browse_paths.paths:
+            # nothing to add, no need to consult server
+            return None
+
+        server_browse_paths = graph.get_browse_path(entity_urn=urn)
+        if server_browse_paths:
+            # compute patch
+            # we only include paths that are not already in the server path list
+            paths_to_add: List[str] = []
+            for path in mce_browse_paths.paths:
+                if path not in server_browse_paths.paths:
+                    paths_to_add.append(path)
+            # apply the patch
+            mce_browse_paths.paths = []
+            mce_browse_paths.paths.extend(server_browse_paths.paths)
+            mce_browse_paths.paths.extend(paths_to_add)
+
+        return mce_browse_paths
+
+    def transform_aspect(
+        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
+    ) -> Optional[Aspect]:
         platform_part, dataset_fqdn, env = (
-            mce.proposedSnapshot.urn.replace("urn:li:dataset:(", "")
-            .replace(")", "")
-            .split(",")
+            entity_urn.replace("urn:li:dataset:(", "").replace(")", "").split(",")
         )
+
         platform = platform_part.replace("urn:li:dataPlatform:", "")
         dataset = dataset_fqdn.replace(".", "/")
 
-        browse_paths = builder.get_or_add_aspect(
-            mce,
-            BrowsePathsClass(
-                paths=[],
-            ),
-        )
-
-        if self.config.replace_existing:
-            browse_paths.paths = []
+        browse_paths = BrowsePathsClass(paths=[])
+        if aspect is not None and self.config.replace_existing is False:
+            browse_paths.paths.extend(aspect.paths)  # type: ignore[attr-defined]
 
         for template in self.config.path_templates:
             browse_path = (
                 template.replace("PLATFORM", platform)
                 .replace("DATASET_PARTS", dataset)
                 .replace("ENV", env.lower())
             )
-
             browse_paths.paths.append(browse_path)
 
-        return mce
+        if self.config.semantics == TransformerSemantics.PATCH:
+            assert self.ctx.graph
+            patch_browse_paths: Optional[
+                BrowsePathsClass
+            ] = AddDatasetBrowsePathTransformer.get_browse_paths_to_set(
+                self.ctx.graph, entity_urn, browse_paths
+            )
+            if patch_browse_paths is not None:
+                browse_paths = patch_browse_paths
+
+        return cast(Optional[Aspect], browse_paths)
```
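The rewritten browse-path transformer does two things: it expands `PLATFORM`/`DATASET_PARTS`/`ENV` templates from the dataset URN, and, under PATCH semantics, merges with the server-side paths so existing ones are kept and only new ones are appended. A self-contained sketch of both pieces (the helper names `expand_templates` and `patch_paths` are illustrative, not datahub APIs):

```python
# Standalone sketch of template expansion and PATCH merging for browse paths.
from typing import List


def expand_templates(templates: List[str], entity_urn: str) -> List[str]:
    # parse "urn:li:dataset:(urn:li:dataPlatform:<platform>,<fqdn>,<env>)"
    platform_part, dataset_fqdn, env = (
        entity_urn.replace("urn:li:dataset:(", "").replace(")", "").split(",")
    )
    platform = platform_part.replace("urn:li:dataPlatform:", "")
    dataset = dataset_fqdn.replace(".", "/")
    return [
        t.replace("PLATFORM", platform)
         .replace("DATASET_PARTS", dataset)
         .replace("ENV", env.lower())
        for t in templates
    ]


def patch_paths(server_paths: List[str], new_paths: List[str]) -> List[str]:
    # mirror get_browse_paths_to_set: server paths first, then only new ones
    return server_paths + [p for p in new_paths if p not in server_paths]


urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)"
paths = expand_templates(["/ENV/PLATFORM/DATASET_PARTS"], urn)
print(paths)
print(patch_paths(["/legacy/db/table"], paths))
```

Note the merge is order-preserving and idempotent: patching the same paths twice leaves the list unchanged, which is what makes repeated ingestion runs safe.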
