Commit 6b3d679

feat(model, ingest): sizeInBytes in datasetProfile, populate size in snowflake

1 parent ee43262 commit 6b3d679

2 files changed: +66 −63 lines

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py

Lines changed: 61 additions & 63 deletions
@@ -1,6 +1,7 @@
+import dataclasses
 import datetime
 import logging
-from typing import Callable, Dict, Iterable, List, Optional
+from typing import Callable, Dict, Iterable, List, Optional, Tuple, cast
 
 from sqlalchemy import create_engine, inspect
 
@@ -19,10 +20,17 @@
 )
 from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
 from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
+from datahub.metadata.schema_classes import DatasetProfileClass
 
 logger = logging.getLogger(__name__)
 
 
+@dataclasses.dataclass
+class SnowflakeProfilerRequest(GEProfilerRequest):
+    table: SnowflakeTable
+    profile_table_level_only: bool = False
+
+
 class SnowflakeProfiler(SnowflakeCommonMixin):
     def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
         self.config = config
@@ -31,12 +39,6 @@ def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None
 
     def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:
 
-        # If only table level profiling is enabled, report table profile and exit
-        if self.config.profiling.profile_table_level_only:
-
-            yield from self.get_table_level_profile_workunits(databases)
-            return
-
         # Extra default SQLAlchemy option for better connection pooling and threading.
         # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
         if self.config.profiling.enabled:
@@ -55,21 +57,22 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:
                 for table in schema.tables:
 
                     # Emit the profile work unit
-                    profile_request = self.get_ge_profile_request(
+                    profile_request = self.get_snowflake_profile_request(
                         table, schema.name, db.name
                     )
                     if profile_request is not None:
                         profile_requests.append(profile_request)
 
             if len(profile_requests) == 0:
                 continue
-            ge_profiler = self.get_profiler_instance(db.name)
-            for request, profile in ge_profiler.generate_profiles(
+            for request, profile in self.generate_profiles(
+                db.name,
                 profile_requests,
                 self.config.profiling.max_workers,
                 platform=self.platform,
                 profiler_args=self.get_profile_args(),
             ):
                 if profile is None:
                     continue
+                profile.sizeInBytes = request.table.size_in_bytes  # type:ignore
                 dataset_name = request.pretty_name
@@ -86,67 +89,26 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:
                     profile,
                 )
 
-    def get_table_level_profile_workunits(
-        self, databases: List[SnowflakeDatabase]
-    ) -> Iterable[WorkUnit]:
-        for db in databases:
-            if not self.config.database_pattern.allowed(db.name):
-                continue
-            for schema in db.schemas:
-                if not self.config.schema_pattern.allowed(schema.name):
-                    continue
-                for table in schema.tables:
-                    dataset_name = self.get_dataset_identifier(
-                        table.name, schema.name, db.name
-                    )
-                    # no need to filter by size_in_bytes and row_count limits
-                    # for table-level profiling, since it's not expensive
-                    if not self.is_dataset_eligible_for_profiling(
-                        dataset_name,
-                        table.last_altered,
-                        0,
-                        0,
-                    ):
-                        skip_profiling = True
-
-                    if skip_profiling:
-                        if self.config.profiling.report_dropped_profiles:
-                            self.report.report_dropped(f"profile of {dataset_name}")
-                        return None
-
-                    self.report.report_entity_profiled(dataset_name)
-
-                    dataset_urn = make_dataset_urn_with_platform_instance(
-                        self.platform,
-                        dataset_name,
-                        self.config.platform_instance,
-                        self.config.env,
-                    )
-                    yield self.wrap_aspect_as_workunit(
-                        "dataset",
-                        dataset_urn,
-                        "datasetProfile",
-                        DatasetProfile(
-                            timestampMillis=round(
-                                datetime.datetime.now().timestamp() * 1000
-                            ),
-                            columnCount=len(table.columns),
-                            rowCount=table.rows_count,
-                        ),
-                    )
-
-    def get_ge_profile_request(
+    def get_snowflake_profile_request(
         self,
         table: SnowflakeTable,
         schema_name: str,
         db_name: str,
-    ) -> Optional[GEProfilerRequest]:
+    ) -> Optional[SnowflakeProfilerRequest]:
         skip_profiling = False
+        profile_table_level_only = self.config.profiling.profile_table_level_only
         dataset_name = self.get_dataset_identifier(table.name, schema_name, db_name)
         if not self.is_dataset_eligible_for_profiling(
             dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
         ):
-            skip_profiling = True
+            # Profile at table level only if the dataset is excluded from full
+            # profiling due to the size/row limits alone
+            if self.is_dataset_eligible_for_profiling(
+                dataset_name, table.last_altered, 0, 0
+            ):
+                profile_table_level_only = True
+            else:
+                skip_profiling = True
 
         if len(table.columns) == 0:
             skip_profiling = True
@@ -158,9 +120,11 @@ def get_ge_profile_request(
 
         self.report.report_entity_profiled(dataset_name)
         logger.debug(f"Preparing profiling request for {dataset_name}")
-        profile_request = GEProfilerRequest(
+        profile_request = SnowflakeProfilerRequest(
             pretty_name=dataset_name,
             batch_kwargs=dict(schema=schema_name, table=table.name),
+            table=table,
+            profile_table_level_only=profile_table_level_only,
         )
         return profile_request

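Note: the eligibility fallback above re-runs is_dataset_eligible_for_profiling with the size and row limits zeroed out. Below is a minimal, self-contained sketch of that decision, with a hypothetical is_eligible callback standing in for the real method (all names here are illustrative, not part of the commit):

from typing import Any, Callable, Tuple

def decide_profiling_mode(
    is_eligible: Callable[..., bool],  # hypothetical stand-in for is_dataset_eligible_for_profiling
    dataset_name: str,
    last_altered: Any,
    size_in_bytes: int,
    rows_count: int,
    table_level_default: bool = False,
) -> Tuple[bool, bool]:
    """Return (profile_table_level_only, skip_profiling)."""
    profile_table_level_only = table_level_default
    skip_profiling = False
    if not is_eligible(dataset_name, last_altered, size_in_bytes, rows_count):
        # Re-check with the size/row limits zeroed: if the table passes now, it
        # was excluded only by those limits, so cheap table-level stats can
        # still be emitted instead of dropping the profile entirely.
        if is_eligible(dataset_name, last_altered, 0, 0):
            profile_table_level_only = True
        else:
            skip_profiling = True
    return profile_table_level_only, skip_profiling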
@@ -236,3 +200,37 @@ def get_db_connection():
             return conn
 
         return get_db_connection
+
+    def generate_profiles(
+        self,
+        db_name: str,
+        requests: List[SnowflakeProfilerRequest],
+        max_workers: int,
+        platform: Optional[str] = None,
+        profiler_args: Optional[Dict] = None,
+    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
+
+        ge_profile_requests: List[GEProfilerRequest] = [
+            cast(GEProfilerRequest, request)
+            for request in requests
+            if not request.profile_table_level_only
+        ]
+        table_level_profile_requests: List[SnowflakeProfilerRequest] = [
+            request for request in requests if request.profile_table_level_only
+        ]
+        for request in table_level_profile_requests:
+            profile = DatasetProfile(
+                timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
+                columnCount=len(request.table.columns),
+                rowCount=request.table.rows_count,
+                sizeInBytes=request.table.size_in_bytes,
+            )
+            yield (request, profile)
+
+        if len(ge_profile_requests) == 0:
+            return
+
+        ge_profiler = self.get_profiler_instance(db_name)
+        yield from ge_profiler.generate_profiles(
+            ge_profile_requests, max_workers, platform, profiler_args
+        )
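The new generate_profiles wrapper splits requests into a cheap path (table-level stats read straight from Snowflake metadata) and an expensive path (full GE profiling). A minimal runnable sketch of that split, using stand-in dataclasses instead of the real GEProfilerRequest/SnowflakeTable types (all names below are illustrative):

import dataclasses
import datetime
from typing import Iterable, List, Optional, Tuple

@dataclasses.dataclass
class StubTable:  # stand-in for SnowflakeTable
    rows_count: int
    size_in_bytes: int
    columns: List[str]

@dataclasses.dataclass
class StubRequest:  # stand-in for SnowflakeProfilerRequest
    pretty_name: str
    table: StubTable
    profile_table_level_only: bool = False

def generate_profiles(
    requests: List[StubRequest],
) -> Iterable[Tuple[StubRequest, Optional[dict]]]:
    # Cheap path: table-level stats come straight from Snowflake's metadata,
    # so no profiling queries are issued for these requests.
    for request in (r for r in requests if r.profile_table_level_only):
        yield request, {
            "timestampMillis": round(datetime.datetime.now().timestamp() * 1000),
            "rowCount": request.table.rows_count,
            "columnCount": len(request.table.columns),
            "sizeInBytes": request.table.size_in_bytes,
        }
    # Expensive path: the remaining requests would be handed off to the GE
    # profiler here (omitted in this sketch).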

metadata-models/src/main/pegasus/com/linkedin/dataset/DatasetProfile.pdl

Lines changed: 5 additions & 0 deletions
@@ -14,5 +14,10 @@ record DatasetProfile includes TimeseriesAspectBase {
 
   columnCount: optional long
 
+  /**
+   * Storage size in bytes
+   */
+  sizeInBytes: optional long
+
   fieldProfiles: optional array[DatasetFieldProfile]
 }
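Since DatasetProfileClass in the Python SDK is code-generated from this PDL record, the new field surfaces there as an optional constructor argument. A short sketch, assuming a datahub build that already includes this commit (the numeric values are made up):

import datetime

from datahub.metadata.schema_classes import DatasetProfileClass

profile = DatasetProfileClass(
    timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
    rowCount=1000,      # illustrative value
    columnCount=12,     # illustrative value
    sizeInBytes=4096,   # illustrative value: storage size in bytes
)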
