From cce477c4cd5f33ac707de73196ec70e1b6ff2dad Mon Sep 17 00:00:00 2001
From: Nguyen Tri Hieu
Date: Thu, 25 Aug 2022 16:04:02 +0700
Subject: [PATCH 1/6] feat(parser): add ability to infer schema from S3
 newline JSON file

---
 .../src/datahub/ingestion/source/s3/config.py |  5 +++++
 .../src/datahub/ingestion/source/s3/source.py |  4 +++-
 .../ingestion/source/schema_inference/json.py | 15 +++++++++++----
 3 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
index 12bfde146cf3c3..30dff478907867 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
@@ -64,6 +64,11 @@ class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
         description="Maximum number of rows to use when inferring schemas for TSV and CSV files.",
     )
 
+    is_newline_json: bool = Field(
+        default=False,
+        description="Whether to infer schemas for newline JSON files."
+    )
+
     @pydantic.root_validator(pre=False)
     def validate_platform(cls, values: Dict) -> Dict:
         value = values.get("platform")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index bcc611ef773293..d5e75914b8bc26 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -411,7 +411,9 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
                 max_rows=self.source_config.max_rows
             ).infer_schema(file)
         elif extension == ".json":
-            fields = json.JsonInferrer().infer_schema(file)
+            fields = json.JsonInferrer(
+                is_newline_json=True
+            ).infer_schema(file)
         elif extension == ".avro":
             fields = avro.AvroInferrer().infer_schema(file)
         else:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
index 9d690d8304b770..8f6f4b3889f87b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
@@ -1,6 +1,7 @@
 from typing import IO, Dict, List, Type, Union
 
 import ujson
+import newlinejson as nlj
 
 from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
 from datahub.ingestion.source.schema_inference.object import construct_schema
@@ -29,12 +30,18 @@
 
 
 class JsonInferrer(SchemaInferenceBase):
-    def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
+    def __init__(self, is_newline_json: bool) -> None:
+        super().__init__()
+        self.is_newline_json = is_newline_json
 
-        datastore = ujson.load(file)
+    def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
+        if self.is_newline_json:
+            datastore = nlj.open(file, json_lib=ujson)
+        else:
+            datastore = ujson.load(file)
 
-        if not isinstance(datastore, list):
-            datastore = [datastore]
+            if not isinstance(datastore, list):
+                datastore = [datastore]
 
         schema = construct_schema(datastore, delimiter=".")
         fields: List[SchemaField] = []

From 14f58dd7ba9a44641faa4d1daa07d4692d5484ca Mon Sep 17 00:00:00 2001
From: Nguyen Tri Hieu
Date: Thu, 25 Aug 2022 17:20:37 +0700
Subject: [PATCH 2/6] feat(parser): add ability to infer schema from S3
 newline JSON file - update description

---
 metadata-ingestion/src/datahub/ingestion/source/s3/config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
index 30dff478907867..c51384d6979b11 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
@@ -64,7 +64,7 @@ class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
         description="Maximum number of rows to use when inferring schemas for TSV and CSV files.",
     )
 
-    is_newline_json: bool = Field(
+    is_newline_json: Optional[bool] = Field(
         default=False,
         description="Whether to infer schemas for newline JSON files."
     )

From 39a3146f207698b21eb9c830e7cfb4303f2e6636 Mon Sep 17 00:00:00 2001
From: Nguyen Tri Hieu
Date: Sun, 28 Aug 2022 15:26:22 +0700
Subject: [PATCH 3/6] feat(parser): add pip dependency and change the initial
 value of is_newline_json

---
 docker/datahub-ingestion/base-requirements.txt               | 1 +
 metadata-ingestion/src/datahub/ingestion/source/s3/source.py | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/docker/datahub-ingestion/base-requirements.txt b/docker/datahub-ingestion/base-requirements.txt
index f986e8af4df1dc..a054141bffd2df 100644
--- a/docker/datahub-ingestion/base-requirements.txt
+++ b/docker/datahub-ingestion/base-requirements.txt
@@ -168,6 +168,7 @@ nbconvert==6.5.1
 nbformat==5.4.0
 nest-asyncio==1.5.5
 networkx==2.8.4
+NewlineJSON==1.0
 notebook==6.4.12
 numpy==1.22.4
 oauthlib==3.2.0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index d5e75914b8bc26..f938e1b4c1860a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -412,7 +412,7 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
             ).infer_schema(file)
         elif extension == ".json":
             fields = json.JsonInferrer(
-                is_newline_json=True
+                is_newline_json=self.source_config.is_newline_json
             ).infer_schema(file)
         elif extension == ".avro":
             fields = avro.AvroInferrer().infer_schema(file)

From 4d5c6fb09e9343cf14e391d643c66a63a5ee6308 Mon Sep 17 00:00:00 2001
From: Nguyen Tri Hieu
Date: Mon, 5 Sep 2022 01:48:19 +0700
Subject: [PATCH 4/6] feat(parser): improve json processing

---
 .../datahub-ingestion/base-requirements.txt   |  1 -
 .../src/datahub/ingestion/source/s3/config.py |  5 -----
 .../src/datahub/ingestion/source/s3/source.py |  4 +---
 .../ingestion/source/schema_inference/json.py | 22 +++++++++----------
 4 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/docker/datahub-ingestion/base-requirements.txt b/docker/datahub-ingestion/base-requirements.txt
index a054141bffd2df..f986e8af4df1dc 100644
--- a/docker/datahub-ingestion/base-requirements.txt
+++ b/docker/datahub-ingestion/base-requirements.txt
@@ -168,7 +168,6 @@ nbconvert==6.5.1
 nbformat==5.4.0
 nest-asyncio==1.5.5
 networkx==2.8.4
-NewlineJSON==1.0
 notebook==6.4.12
 numpy==1.22.4
 oauthlib==3.2.0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
index c51384d6979b11..12bfde146cf3c3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py
@@ -64,11 +64,6 @@ class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
         description="Maximum number of rows to use when inferring schemas for TSV and CSV files.",
     )
 
-    is_newline_json: Optional[bool] = Field(
-        default=False,
-        description="Whether to infer schemas for newline JSON files."
-    )
-
     @pydantic.root_validator(pre=False)
     def validate_platform(cls, values: Dict) -> Dict:
         value = values.get("platform")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index f938e1b4c1860a..bcc611ef773293 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -411,9 +411,7 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List:
                 max_rows=self.source_config.max_rows
             ).infer_schema(file)
         elif extension == ".json":
-            fields = json.JsonInferrer(
-                is_newline_json=self.source_config.is_newline_json
-            ).infer_schema(file)
+            fields = json.JsonInferrer().infer_schema(file)
         elif extension == ".avro":
             fields = avro.AvroInferrer().infer_schema(file)
         else:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
index 8f6f4b3889f87b..0ba1726e4b4f21 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
@@ -1,7 +1,8 @@
+from asyncio.log import logger
 from typing import IO, Dict, List, Type, Union
 
 import ujson
-import newlinejson as nlj
+import jsonlines as jsl
 
 from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
 from datahub.ingestion.source.schema_inference.object import construct_schema
@@ -30,18 +31,17 @@
 
 
 class JsonInferrer(SchemaInferenceBase):
-    def __init__(self, is_newline_json: bool) -> None:
-        super().__init__()
-        self.is_newline_json = is_newline_json
-
-    def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
-        if self.is_newline_json:
-            datastore = nlj.open(file, json_lib=ujson)
-        else:
+    def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
+        try:
             datastore = ujson.load(file)
-
-            if not isinstance(datastore, list):
-                datastore = [datastore]
+        except ujson.JSONDecodeError as e:
+            logger.info(f"Got ValueError: {e}. Retry with jsonlines")
+            file.seek(0)
+            reader = jsl.Reader(file)
+            datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]
+
+        if not isinstance(datastore, list):
+            datastore = [datastore]
 
         schema = construct_schema(datastore, delimiter=".")
         fields: List[SchemaField] = []

From d86d6c97655ac6a38de77621ea675d4f62a748a7 Mon Sep 17 00:00:00 2001
From: Nguyen Tri Hieu
Date: Fri, 9 Sep 2022 10:20:28 +0700
Subject: [PATCH 5/6] feat(ingestion): minor change on logger

---
 .../src/datahub/ingestion/source/schema_inference/json.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
index 0ba1726e4b4f21..9d0613f4785e2c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
@@ -1,4 +1,4 @@
-from asyncio.log import logger
+import logging
 from typing import IO, Dict, List, Type, Union
 
 import ujson
@@ -29,6 +29,7 @@
     "mixed": UnionTypeClass,
 }
 
+logger = logging.getLogger(__name__)
 
 class JsonInferrer(SchemaInferenceBase):
     def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:

From 66f71685cee7cf31d054a23c9672042edbe47ef0 Mon Sep 17 00:00:00 2001
From: Nguyen Tri Hieu
Date: Mon, 12 Sep 2022 00:35:24 +0700
Subject: [PATCH 6/6] feat(ingestion): minor update for linting check

---
 .../src/datahub/ingestion/source/schema_inference/json.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
index 9d0613f4785e2c..c53c64be4cba80 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/schema_inference/json.py
@@ -1,8 +1,8 @@
 import logging
 from typing import IO, Dict, List, Type, Union
 
-import ujson
 import jsonlines as jsl
+import ujson
 
 from datahub.ingestion.source.schema_inference.base import SchemaInferenceBase
 from datahub.ingestion.source.schema_inference.object import construct_schema
@@ -31,6 +31,7 @@
 
 logger = logging.getLogger(__name__)
 
+
 class JsonInferrer(SchemaInferenceBase):
     def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
         try:
@@ -39,8 +40,8 @@ def infer_schema(self, file: IO[bytes]) -> List[SchemaField]:
             logger.info(f"Got ValueError: {e}. Retry with jsonlines")
             file.seek(0)
             reader = jsl.Reader(file)
-            datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]
-
+            datastore = [obj for obj in reader.iter(type=dict, skip_invalid=True)]
+
         if not isinstance(datastore, list):
             datastore = [datastore]