+import dataclasses
import datetime
import logging
-from typing import Callable, Dict, Iterable, List, Optional
+from typing import Callable, Dict, Iterable, List, Optional, Tuple, cast

from sqlalchemy import create_engine, inspect

)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
+from datahub.metadata.schema_classes import DatasetProfileClass

logger = logging.getLogger(__name__)


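+# Profiling request for a single Snowflake table: carries the table metadata and
+# a flag for emitting only table-level statistics (row count, size) for it.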
+@dataclasses.dataclass
+class SnowflakeProfilerRequest(GEProfilerRequest):
+    table: SnowflakeTable
+    profile_table_level_only: bool = False
+
+
class SnowflakeProfiler(SnowflakeCommonMixin):
    def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
        self.config = config
@@ -31,12 +39,6 @@ def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None

    def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:

-        # If only table level profiling is enabled, report table profile and exit
-        if self.config.profiling.profile_table_level_only:
-
-            yield from self.get_table_level_profile_workunits(databases)
-            return
-
        # Extra default SQLAlchemy option for better connection pooling and threading.
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
        if self.config.profiling.enabled:
@@ -55,21 +57,22 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
                for table in schema.tables:

                    # Emit the profile work unit
-                    profile_request = self.get_ge_profile_request(
+                    profile_request = self.get_snowflake_profile_request(
                        table, schema.name, db.name
                    )
                    if profile_request is not None:
                        profile_requests.append(profile_request)

            if len(profile_requests) == 0:
                continue
-            ge_profiler = self.get_profiler_instance(db.name)
-            for request, profile in ge_profiler.generate_profiles(
+            for request, profile in self.generate_profiles(
+                db.name,
                profile_requests,
                self.config.profiling.max_workers,
                platform=self.platform,
                profiler_args=self.get_profile_args(),
            ):
                if profile is None:
                    continue
+                profile.sizeInBytes = request.table.size_in_bytes  # type:ignore
                dataset_name = request.pretty_name
@@ -86,68 +89,26 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
                    profile,
                )

-    def get_table_level_profile_workunits(
-        self, databases: List[SnowflakeDatabase]
-    ) -> Iterable[WorkUnit]:
-        for db in databases:
-            if not self.config.database_pattern.allowed(db.name):
-                continue
-            for schema in db.schemas:
-                if not self.config.schema_pattern.allowed(schema.name):
-                    continue
-                for table in schema.tables:
-                    dataset_name = self.get_dataset_identifier(
-                        table.name, schema.name, db.name
-                    )
-                    skip_profiling = False
-                    # no need to filter by size_in_bytes and row_count limits,
-                    # if table level profilin, since its not expensive
-                    if not self.is_dataset_eligible_for_profiling(
-                        dataset_name,
-                        table.last_altered,
-                        0,
-                        0,
-                    ):
-                        skip_profiling = True
-
-                    if skip_profiling:
-                        if self.config.profiling.report_dropped_profiles:
-                            self.report.report_dropped(f"profile of {dataset_name}")
-                        return None
-
-                    self.report.report_entity_profiled(dataset_name)
-
-                    dataset_urn = make_dataset_urn_with_platform_instance(
-                        self.platform,
-                        dataset_name,
-                        self.config.platform_instance,
-                        self.config.env,
-                    )
-                    yield self.wrap_aspect_as_workunit(
-                        "dataset",
-                        dataset_urn,
-                        "datasetProfile",
-                        DatasetProfile(
-                            timestampMillis=round(
-                                datetime.datetime.now().timestamp() * 1000
-                            ),
-                            columnCount=len(table.columns),
-                            rowCount=table.rows_count,
-                        ),
-                    )
-
-    def get_ge_profile_request(
+    def get_snowflake_profile_request(
        self,
        table: SnowflakeTable,
        schema_name: str,
        db_name: str,
-    ) -> Optional[GEProfilerRequest]:
+    ) -> Optional[SnowflakeProfilerRequest]:
        skip_profiling = False
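+        # Honor the global table-level-only setting; it can also be enabled per
+        # table below when size or row limits rule out full profiling.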
+        profile_table_level_only = self.config.profiling.profile_table_level_only
        dataset_name = self.get_dataset_identifier(table.name, schema_name, db_name)
        if not self.is_dataset_eligible_for_profiling(
            dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
        ):
-            skip_profiling = True
+            # Profile only at the table level if the dataset is excluded from
+            # full profiling due to size limits alone
+            if self.is_dataset_eligible_for_profiling(
+                dataset_name, table.last_altered, 0, 0
+            ):
+                profile_table_level_only = True
+            else:
+                skip_profiling = True

        if len(table.columns) == 0:
            skip_profiling = True
@@ -159,9 +120,11 @@ def get_ge_profile_request(

        self.report.report_entity_profiled(dataset_name)
        logger.debug(f"Preparing profiling request for {dataset_name}")
-        profile_request = GEProfilerRequest(
+        profile_request = SnowflakeProfilerRequest(
            pretty_name=dataset_name,
            batch_kwargs=dict(schema=schema_name, table=table.name),
+            table=table,
+            profile_table_level_only=profile_table_level_only,
        )
        return profile_request

@@ -237,3 +200,37 @@ def get_db_connection():
            return conn

        return get_db_connection
+
+    def generate_profiles(
+        self,
+        db_name: str,
+        requests: List[SnowflakeProfilerRequest],
+        max_workers: int,
+        platform: Optional[str] = None,
+        profiler_args: Optional[Dict] = None,
+    ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
+
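+        # Requests that still need column-level statistics go through the GE
+        # profiler; table-level-only requests are answered from table metadata.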
+        ge_profile_requests: List[GEProfilerRequest] = [
+            cast(GEProfilerRequest, request)
+            for request in requests
+            if not request.profile_table_level_only
+        ]
+        table_level_profile_requests: List[SnowflakeProfilerRequest] = [
+            request for request in requests if request.profile_table_level_only
+        ]
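+        # Build table-level profiles directly from the metadata on each request;
+        # no profiling queries are issued for these tables.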
+        for request in table_level_profile_requests:
+            profile = DatasetProfile(
+                timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
+                columnCount=len(request.table.columns),
+                rowCount=request.table.rows_count,
+                sizeInBytes=request.table.size_in_bytes,
+            )
+            yield (request, profile)
+
+        if len(ge_profile_requests) == 0:
+            return
+
+        ge_profiler = self.get_profiler_instance(db_name)
+        yield from ge_profiler.generate_profiles(
+            ge_profile_requests, max_workers, platform, profiler_args
+        )