2 changes: 1 addition & 1 deletion metadata-ingestion/scripts/docgen.py
@@ -712,7 +712,7 @@ def generate(
"Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.\n\n\n"
)
f.write(
"For general pointers on writing and running a recipe, see our [main recipe guide](../../../../metadata-ingestion/README.md#recipes)\n"
"For general pointers on writing and running a recipe, see our [main recipe guide](../../../../metadata-ingestion/README.md#recipes).\n"
)
f.write("```yaml\n")
f.write(plugin_docs["recipe"])
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -155,7 +155,7 @@ class GlueSourceConfig(
default=False,
description="If an S3 Objects Tags should be created for the Tables ingested by Glue.",
)
- profiling: GlueProfilingConfig = Field(
+ profiling: Optional[GlueProfilingConfig] = Field(
default=None,
description="Configs to ingest data profiles from glue table",
)
@@ -751,6 +751,8 @@ def _create_profile_mcp(
column_stats: dict,
partition_spec: Optional[str] = None,
) -> MetadataChangeProposalWrapper:
+ assert self.source_config.profiling

# instantiate profile class
dataset_profile = DatasetProfileClass(timestampMillis=get_sys_time())

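Making `profiling` an `Optional[GlueProfilingConfig]` with `default=None` means the Glue source must prove the config is present before profiling; the added `assert` both documents that invariant and narrows the type for mypy. A minimal standalone sketch of the pattern (stand-in names, not the actual Glue source):

```python
from typing import Optional

from pydantic import BaseModel, Field


class ProfilingConfig(BaseModel):
    row_count: Optional[str] = Field(
        default=None, description="Parameter name that holds the row count."
    )


class SourceConfig(BaseModel):
    profiling: Optional[ProfilingConfig] = Field(
        default=None, description="Profiling settings; None disables profiling."
    )


def build_profile(config: SourceConfig) -> str:
    # Callers should only reach this when profiling is configured; the assert
    # narrows Optional[ProfilingConfig] to ProfilingConfig for type checkers.
    assert config.profiling
    return config.profiling.row_count or "recordCount"


print(build_profile(SourceConfig(profiling=ProfilingConfig(row_count="rowCount"))))
```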
14 changes: 9 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -155,20 +155,24 @@ class Config:
) # needed to allow cached_property to work. See https://github.com/samuelcolvin/pydantic/issues/1241 for more info.

models: EmitDirective = Field(
"Yes", description="Emit metadata for dbt models when set to Yes or Only"
EmitDirective.YES,
description="Emit metadata for dbt models when set to Yes or Only",
)
sources: EmitDirective = Field(
"Yes", description="Emit metadata for dbt sources when set to Yes or Only"
EmitDirective.YES,
description="Emit metadata for dbt sources when set to Yes or Only",
)
seeds: EmitDirective = Field(
"Yes", description="Emit metadata for dbt seeds when set to Yes or Only"
EmitDirective.YES,
description="Emit metadata for dbt seeds when set to Yes or Only",
)
test_definitions: EmitDirective = Field(
"Yes",
EmitDirective.YES,
description="Emit metadata for test definitions when enabled when set to Yes or Only",
)
test_results: EmitDirective = Field(
"Yes", description="Emit metadata for test results when set to Yes or Only"
EmitDirective.YES,
description="Emit metadata for test results when set to Yes or Only",
)

@validator("*", pre=True, always=True)
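The dbt directives now default to the enum member `EmitDirective.YES` rather than the bare string `"Yes"`, so the default already carries the declared type instead of depending on coercion. A rough standalone sketch (hypothetical enum and model, not the real dbt classes):

```python
from enum import Enum

from pydantic import BaseModel


class Emit(Enum):
    YES = "Yes"
    NO = "No"
    ONLY = "Only"


class DbtEntitiesEnabled(BaseModel):
    # Enum member as the default: the value is typed as Emit from the start.
    models: Emit = Emit.YES
    sources: Emit = Emit.YES


config = DbtEntitiesEnabled(sources="Only")  # user-supplied strings are still coerced by value
print(config.models, config.sources)  # Emit.YES Emit.ONLY
```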
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -53,7 +53,8 @@ class FileSourceConfig(ConfigModel):
)
read_mode: FileReadMode = FileReadMode.AUTO
aspect: Optional[str] = Field(
description="Set to an aspect to only read this aspect for ingestion."
default=None,
description="Set to an aspect to only read this aspect for ingestion.",
)

_minsize_for_streaming_mode_in_bytes: int = (
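The `file.py` change above (and several below) adds an explicit `default=None` to `Optional` fields that previously passed only a `description` to `Field(...)`, relying on pydantic's implicit handling of bare `Optional` annotations. Spelling the default out keeps the annotation, the default, and the generated config docs in agreement (and matches pydantic v2, which dropped the implicit `None`). A minimal sketch with a stand-in model (plain `BaseModel` rather than DataHub's `ConfigModel`):

```python
from typing import Optional

import pydantic


class FileSourceLikeConfig(pydantic.BaseModel):
    # Explicit default=None: unambiguous under any pydantic version and easy
    # for docs generators to report.
    aspect: Optional[str] = pydantic.Field(
        default=None,
        description="Set to an aspect to only read this aspect for ingestion.",
    )


print(FileSourceLikeConfig().aspect)  # None
```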
@@ -1,50 +1,52 @@
from typing import Optional

from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel


class GlueProfilingConfig(ConfigModel):
- row_count: str = Field(
+ row_count: Optional[str] = Field(
default=None,
description="The parameter name for row count in glue table.",
)
- column_count: str = Field(
+ column_count: Optional[str] = Field(
default=None,
description="The parameter name for column count in glue table.",
)
- unique_count: str = Field(
+ unique_count: Optional[str] = Field(
default=None,
description="The parameter name for the count of unique value in a column.",
)
- unique_proportion: str = Field(
+ unique_proportion: Optional[str] = Field(
default=None,
description="The parameter name for the proportion of unique values in a column.",
)
- null_count: int = Field(
+ null_count: Optional[str] = Field(
default=None,
description="The parameter name for the count of null values in a column.",
)
- null_proportion: str = Field(
+ null_proportion: Optional[str] = Field(
default=None,
description="The parameter name for the proportion of null values in a column.",
)
- min: str = Field(
+ min: Optional[str] = Field(
default=None,
description="The parameter name for the min value of a column.",
)
- max: str = Field(
+ max: Optional[str] = Field(
default=None,
description="The parameter name for the max value of a column.",
)
- mean: str = Field(
+ mean: Optional[str] = Field(
default=None,
description="The parameter name for the mean value of a column.",
)
- median: str = Field(
+ median: Optional[str] = Field(
default=None,
description="The parameter name for the median value of a column.",
)
- stdev: str = Field(
+ stdev: Optional[str] = Field(
default=None,
description="The parameter name for the standard deviation of a column.",
)
@@ -46,9 +46,11 @@ class IcebergProfilingConfig(ConfigModel):

class IcebergSourceConfig(DatasetSourceConfigBase):
adls: Optional[AdlsSourceConfig] = Field(
+ default=None,
description="[Azure Data Lake Storage](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction) to crawl for Iceberg tables. This is one filesystem type supported by this source and **only one can be configured**.",
)
localfs: Optional[str] = Field(
+ default=None,
description="Local path to crawl for Iceberg tables. This is one filesystem type supported by this source and **only one can be configured**.",
)
max_path_depth: int = Field(
@@ -64,6 +66,7 @@ class IcebergSourceConfig(DatasetSourceConfigBase):
description="Iceberg table property to look for a `CorpUser` owner. Can only hold a single user value. If property has no value, no owner information will be emitted.",
)
group_ownership_property: Optional[str] = Field(
+ default=None,
description="Iceberg table property to look for a `CorpGroup` owner. Can only hold a single group value. If property has no value, no owner information will be emitted.",
)
profiling: IcebergProfilingConfig = IcebergProfilingConfig()
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -67,11 +67,11 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
domain: Dict[str, AllowDenyPattern] = pydantic.Field(
- default_factory=dict,
+ default={},
description="A map of domain names to allow deny patterns. Domains can be urn-based (`urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810`) or bare (`13ae4d85-d955-49fc-8474-9004c663a810`).",
)
topic_subject_map: Dict[str, str] = pydantic.Field(
- default_factory=dict,
+ default={},
description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
)
# Custom Stateful Ingestion settings
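The Kafka config swaps `default_factory=dict` for a literal `default={}`. That is safe because pydantic copies default values for each model instance rather than sharing them, which is exactly what the new `test_default_object_copy` unit test at the bottom of this PR asserts. A quick sketch of the behaviour (stand-in model):

```python
from typing import Dict

import pydantic


class KafkaLikeConfig(pydantic.BaseModel):
    # pydantic copies this dict per instance, so a literal {} is not the
    # shared-mutable-default trap it would be as a plain function default.
    domain: Dict[str, str] = pydantic.Field(default={})


a = KafkaLikeConfig()
b = KafkaLikeConfig()
a.domain["finance"] = "urn:li:domain:finance"
print(b.domain)  # {} – the second instance keeps its own empty dict
```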
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/ldap.py
@@ -97,7 +97,9 @@ class LDAPSourceConfig(ConfigModel):
# Extraction configuration.
base_dn: str = Field(description="LDAP DN.")
filter: str = Field(default="(objectClass=*)", description="LDAP extractor filter.")
- attrs_list: List[str] = Field(default=None, description="Retrieved attributes list")
+ attrs_list: Optional[List[str]] = Field(
+ default=None, description="Retrieved attributes list"
+ )

# If set to true, any users without first and last names will be dropped.
drop_missing_first_last_name: bool = Field(
14 changes: 9 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
@@ -3,9 +3,9 @@
from typing import Dict, Iterable, Optional

import dateutil.parser as dp
+ import pydantic
import requests
- from pydantic import validator
- from pydantic.fields import Field
+ from pydantic import Field, validator
from requests.models import HTTPError
from sqllineage.runner import LineageRunner

@@ -48,8 +48,10 @@ class MetabaseConfig(DatasetLineageProviderConfigBase):
# See the Metabase /api/session endpoint for details
# https://www.metabase.com/docs/latest/api-documentation.html#post-apisession
connect_uri: str = Field(default="localhost:3000", description="Metabase host URL.")
- username: str = Field(default=None, description="Metabase username.")
- password: str = Field(default=None, description="Metabase password.")
+ username: Optional[str] = Field(default=None, description="Metabase username.")
+ password: Optional[pydantic.SecretStr] = Field(
+ default=None, description="Metabase password."
+ )
database_alias_map: Optional[dict] = Field(
default=None,
description="Database name map to use when constructing dataset URN.",
@@ -126,7 +128,9 @@ def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
None,
{
"username": self.config.username,
"password": self.config.password,
"password": self.config.password.get_secret_value()
if self.config.password
else None,
},
)

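Both the Metabase config above and the Mode config below move passwords from plain `str` to `pydantic.SecretStr`, so the value is masked in `repr()` and logs, and every real use has to go through `get_secret_value()` (guarded, since the field is optional). A small standalone sketch of the pattern (hypothetical model and values):

```python
from typing import Optional

import pydantic


class ApiLikeConfig(pydantic.BaseModel):
    username: Optional[str] = pydantic.Field(default=None)
    password: Optional[pydantic.SecretStr] = pydantic.Field(default=None)


config = ApiLikeConfig(username="reader", password="hunter2")
print(config)  # password is shown masked, e.g. SecretStr('**********')

# Unwrap only at the point of use, keeping the None guard for the optional field.
raw = config.password.get_secret_value() if config.password else None
print(raw)  # hunter2
```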
15 changes: 10 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -4,10 +4,10 @@
from typing import Dict, Iterable, Optional, Tuple, Union

import dateutil.parser as dp
+ import pydantic
import requests
import tenacity
- from pydantic import validator
- from pydantic.fields import Field
+ from pydantic import Field, validator
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential
@@ -68,8 +68,10 @@ class ModeConfig(DatasetLineageProviderConfigBase):
connect_uri: str = Field(
default="https://app.mode.com", description="Mode host URL."
)
- token: str = Field(default=None, description="Mode user token.")
- password: str = Field(default=None, description="Mode password for authentication.")
+ token: Optional[str] = Field(default=None, description="Mode user token.")
+ password: Optional[pydantic.SecretStr] = Field(
+ default=None, description="Mode password for authentication."
+ )
workspace: Optional[str] = Field(default=None, description="")
default_schema: str = Field(
default="public",
@@ -168,7 +170,10 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
self.report = SourceReport()

self.session = requests.session()
- self.session.auth = HTTPBasicAuth(self.config.token, self.config.password)
+ self.session.auth = HTTPBasicAuth(
+ self.config.token,
+ self.config.password.get_secret_value() if self.config.password else None,
+ )
self.session.headers.update(
{
"Content-Type": "application/json",
@@ -59,7 +59,7 @@ class SupersetConfig(ConfigModel):
# See the Superset /security/login endpoint for details
# https://superset.apache.org/docs/rest-api
connect_uri: str = Field(default="localhost:8088", description="Superset host URL.")
- display_uri: str = Field(
+ display_uri: Optional[str] = Field(
default=None,
description="optional URL to use in links (if `connect_uri` is only for ingestion)",
)
41 changes: 41 additions & 0 deletions metadata-ingestion/tests/unit/config/test_config_model.py
@@ -0,0 +1,41 @@
from typing import List

import pydantic
import pytest

from datahub.configuration.common import ConfigModel


def test_extras_not_allowed():
class MyConfig(ConfigModel):
required: str
optional: str = "bar"

MyConfig.parse_obj({"required": "foo"})
MyConfig.parse_obj({"required": "foo", "optional": "baz"})

with pytest.raises(pydantic.ValidationError):
MyConfig.parse_obj({"required": "foo", "extra": "extra"})


def test_default_object_copy():
# Doing this with dataclasses would yield a subtle bug: the default list
# objects would be shared between instances. However, pydantic is smart
# enough to copy the object when it's used as a default value.

class MyConfig(ConfigModel):
items: List[str] = []

items_field: List[str] = pydantic.Field(
default=[],
description="A list of items",
)

config_1 = MyConfig()
config_2 = MyConfig()

config_1.items.append("foo")
config_1.items_field.append("foo")

assert config_2.items == []
assert config_2.items_field == []
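For contrast, a minimal sketch (not part of this PR) of the shared-mutable-default pitfall that the comment in `test_default_object_copy` alludes to, shown with a plain class attribute where the sharing really does happen:

```python
from typing import List


class PlainConfig:
    # A class-level mutable default: instances that never assign their own
    # list all mutate this single shared object.
    items: List[str] = []


c1 = PlainConfig()
c2 = PlainConfig()
c1.items.append("foo")
print(c2.items)  # ['foo'] – the default leaked across instances
```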