Skip to content

Commit 386719f

Browse files
authored
feat(platform): timeseries - Server & Client side changes to support timeseries aspect deletion & rollback. (#4756)
1 parent e556bcb commit 386719f

File tree

26 files changed

+1101
-86
lines changed

26 files changed

+1101
-86
lines changed

docs/how/delete-metadata.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,28 @@ This physically deletes all rows for all aspects of the entity. This action cann
3333
datahub delete --urn "<my urn>" --hard
3434
```
3535

36-
As of datahub v.0.8.35 doing a hard delete by urn will also provide you with a way to remove references to the urn being deleted across the metadata graph. This is important to use if you don't want to have ghost references in your metadata model and want to save space in the graph database.
36+
As of datahub v0.8.35 doing a hard delete by urn will also provide you with a way to remove references to the urn being deleted across the metadata graph. This is important to use if you don't want to have ghost references in your metadata model and want to save space in the graph database.
3737
For now, this behaviour must be opted into by a prompt that will appear for you to manually accept or deny.
3838

39+
Starting with v0.8.44.2, this also supports deletion of a specific `timeseries` aspect associated with the entity, optionally for a specific time range.
40+
41+
_Note: Deletion by a specific aspect and time range is currently supported only for timeseries aspects._
42+
43+
```bash
44+
# Delete all of the aspect values for a given entity and a timeseries aspect.
45+
datahub delete --urn "<entity urn>" -a "<timeseries aspect>" --hard
46+
# Example: datahub delete --urn "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_dataset,TEST)" -a "datasetProfile" --hard
47+
48+
# Delete all of the aspect values for a given platform and a timeseries aspect.
49+
datahub delete -p "<platform>" -a "<timeseries aspect>" --hard
50+
# Example: datahub delete -p "snowflake" -a "datasetProfile" --hard
51+
52+
# Delete the aspect values for a given platform and a timeseries aspect corresponding to a specific time range.
53+
datahub delete -p "<platform>" -a "<timeseries aspect>" --start-time '<start_time>' --end-time '<end_time>' --hard
54+
# Example: datahub delete -p "snowflake" -a "datasetProfile" --start-time '2022-05-29 00:00:00' --end-time '2022-05-31 00:00:00' --hard
55+
```
56+
57+
3958
You can optionally add `-n` or `--dry-run` to execute a dry run before issuing the final delete command.
4059
You can optionally add `-f` or `--force` to skip confirmations.
4160
You can optionally add the `--only-soft-deleted` flag to remove soft-deleted items only.
@@ -119,6 +138,7 @@ datahub ingest rollback --run-id <run-id>
119138
```
120139

121140
to rollback all aspects added with this run and all entities created by this run.
141+
This deletes both the versioned and the timeseries aspects associated with these entities.
122142

123143
### Unsafe Entities and Rollback
124144

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.linkedin.metadata.models;
2+
3+
import com.linkedin.metadata.models.registry.EntityRegistry;
4+
import java.util.List;
5+
import java.util.stream.Collectors;
6+
import javax.annotation.Nonnull;
7+
8+
9+
public class EntitySpecUtils {
10+
private EntitySpecUtils() {
11+
}
12+
13+
public static List<String> getEntityTimeseriesAspectNames(@Nonnull EntityRegistry entityRegistry,
14+
@Nonnull String entityName) {
15+
final EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
16+
final List<String> timeseriesAspectNames = entitySpec.getAspectSpecs()
17+
.stream()
18+
.filter(x -> x.isTimeseries())
19+
.map(x -> x.getName())
20+
.collect(Collectors.toList());
21+
return timeseriesAspectNames;
22+
}
23+
}

metadata-ingestion/src/datahub/cli/cli_utils.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def post_delete_endpoint(
303303
payload_obj: dict,
304304
path: str,
305305
cached_session_host: Optional[Tuple[Session, str]] = None,
306-
) -> typing.Tuple[str, int]:
306+
) -> typing.Tuple[str, int, int]:
307307
session, gms_host = cached_session_host or get_session_and_host()
308308
url = gms_host + path
309309

@@ -314,16 +314,17 @@ def post_delete_endpoint_with_session_and_url(
314314
session: Session,
315315
url: str,
316316
payload_obj: dict,
317-
) -> typing.Tuple[str, int]:
317+
) -> typing.Tuple[str, int, int]:
318318
payload = json.dumps(payload_obj)
319319

320320
response = session.post(url, payload)
321321

322322
summary = parse_run_restli_response(response)
323-
urn = summary.get("urn", "")
324-
rows_affected = summary.get("rows", 0)
323+
urn: str = summary.get("urn", "")
324+
rows_affected: int = summary.get("rows", 0)
325+
timeseries_rows_affected: int = summary.get("timeseriesRows", 0)
325326

326-
return urn, rows_affected
327+
return urn, rows_affected, timeseries_rows_affected
327328

328329

329330
def get_urns_by_filter(
@@ -624,7 +625,7 @@ def get_aspects_for_entity(
624625
# Process timeseries aspects & append to aspect_list
625626
timeseries_aspects: List[str] = [a for a in aspects if a in TIMESERIES_ASPECT_MAP]
626627
for timeseries_aspect in timeseries_aspects:
627-
timeseries_response = get_latest_timeseries_aspect_values(
628+
timeseries_response: Dict = get_latest_timeseries_aspect_values(
628629
entity_urn, timeseries_aspect, cached_session_host
629630
)
630631
values: List[Dict] = timeseries_response.get("value", {}).get("values", [])
@@ -633,18 +634,13 @@ def get_aspects_for_entity(
633634
timeseries_aspect
634635
)
635636
if aspect_cls is not None:
636-
aspect_value = values[0]
637+
ts_aspect = values[0]["aspect"]
637638
# Decode the json-encoded generic aspect value.
638-
aspect_value["aspect"]["value"] = json.loads(
639-
aspect_value["aspect"]["value"]
640-
)
641-
aspect_list[
642-
aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "")
643-
] = aspect_value
639+
ts_aspect["value"] = json.loads(ts_aspect["value"])
640+
aspect_list[timeseries_aspect] = ts_aspect
644641

645642
aspect_map: Dict[str, Union[dict, _Aspect]] = {}
646-
for a in aspect_list.values():
647-
aspect_name = a["name"]
643+
for aspect_name, a in aspect_list.items():
648644
aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(
649645
aspect_name
650646
)

metadata-ingestion/src/datahub/cli/delete_cli.py

Lines changed: 84 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import logging
22
import time
33
from dataclasses import dataclass
4+
from datetime import datetime
45
from random import choices
5-
from typing import Dict, List, Optional, Tuple
6+
from typing import Any, Dict, List, Optional, Tuple
67

78
import click
89
import progressbar
@@ -30,25 +31,27 @@
3031

3132
@dataclass
3233
class DeletionResult:
33-
start_time_millis: int = int(time.time() * 1000.0)
34-
end_time_millis: int = 0
34+
start_time: int = int(time.time() * 1000.0)
35+
end_time: int = 0
3536
num_records: int = 0
37+
num_timeseries_records: int = 0
3638
num_entities: int = 0
3739
sample_records: Optional[List[List[str]]] = None
3840

3941
def start(self) -> None:
40-
self.start_time_millis = int(time.time() * 1000.0)
42+
self.start_time = int(time.time() * 1000.0)
4143

4244
def end(self) -> None:
43-
self.end_time_millis = int(time.time() * 1000.0)
45+
self.end_time = int(time.time() * 1000.0)
4446

4547
def merge(self, another_result: "DeletionResult") -> None:
46-
self.end_time_millis = another_result.end_time_millis
48+
self.end_time = another_result.end_time
4749
self.num_records = (
4850
self.num_records + another_result.num_records
4951
if another_result.num_records != UNKNOWN_NUM_RECORDS
5052
else UNKNOWN_NUM_RECORDS
5153
)
54+
self.num_timeseries_records += another_result.num_timeseries_records
5255
self.num_entities += another_result.num_entities
5356
if another_result.sample_records:
5457
if not self.sample_records:
@@ -82,26 +85,66 @@ def delete_for_registry(
8285

8386

8487
@click.command()
85-
@click.option("--urn", required=False, type=str)
86-
@click.option("-f", "--force", required=False, is_flag=True)
87-
@click.option("--soft/--hard", required=False, is_flag=True, default=True)
88-
@click.option("-e", "--env", required=False, type=str)
89-
@click.option("-p", "--platform", required=False, type=str)
90-
@click.option("--entity_type", required=False, type=str, default="dataset")
88+
@click.option("--urn", required=False, type=str, help="the urn of the entity")
89+
@click.option(
90+
"-a",
91+
"--aspect_name",
92+
required=False,
93+
type=str,
94+
help="the aspect name associated with the entity(only for timeseries aspects)",
95+
)
96+
@click.option(
97+
"-f", "--force", required=False, is_flag=True, help="force the delete if set"
98+
)
99+
@click.option(
100+
"--soft/--hard",
101+
required=False,
102+
is_flag=True,
103+
default=True,
104+
help="specifies soft/hard deletion",
105+
)
106+
@click.option(
107+
"-e", "--env", required=False, type=str, help="the environment of the entity"
108+
)
109+
@click.option(
110+
"-p", "--platform", required=False, type=str, help="the platform of the entity"
111+
)
112+
@click.option(
113+
"--entity_type",
114+
required=False,
115+
type=str,
116+
default="dataset",
117+
help="the entity_type of the entity",
118+
)
91119
@click.option("--query", required=False, type=str)
120+
@click.option(
121+
"--start-time",
122+
required=False,
123+
type=click.DateTime(),
124+
help="the start time(only for timeseries aspects)",
125+
)
126+
@click.option(
127+
"--end-time",
128+
required=False,
129+
type=click.DateTime(),
130+
help="the end time(only for timeseries aspects)",
131+
)
92132
@click.option("--registry-id", required=False, type=str)
93133
@click.option("-n", "--dry-run", required=False, is_flag=True)
94134
@click.option("--only-soft-deleted", required=False, is_flag=True, default=False)
95135
@upgrade.check_upgrade
96136
@telemetry.with_telemetry
97137
def delete(
98138
urn: str,
139+
aspect_name: Optional[str],
99140
force: bool,
100141
soft: bool,
101142
env: str,
102143
platform: str,
103144
entity_type: str,
104145
query: str,
146+
start_time: Optional[datetime],
147+
end_time: Optional[datetime],
105148
registry_id: str,
106149
dry_run: bool,
107150
only_soft_deleted: bool,
@@ -161,9 +204,12 @@ def delete(
161204

162205
deletion_result: DeletionResult = delete_one_urn_cmd(
163206
urn,
207+
aspect_name=aspect_name,
164208
soft=soft,
165209
dry_run=dry_run,
166210
entity_type=entity_type,
211+
start_time=start_time,
212+
end_time=end_time,
167213
cached_session_host=(session, host),
168214
)
169215

@@ -201,11 +247,14 @@ def delete(
201247
if not dry_run:
202248
message = "soft delete" if soft else "hard delete"
203249
click.echo(
204-
f"Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to {message} {deletion_result.num_records} rows for {deletion_result.num_entities} entities"
250+
f"Took {(deletion_result.end_time-deletion_result.start_time)/1000.0} seconds to {message}"
251+
f" {deletion_result.num_records} versioned rows"
252+
f" and {deletion_result.num_timeseries_records} timeseries aspect rows"
253+
f" for {deletion_result.num_entities} entities."
205254
)
206255
else:
207256
click.echo(
208-
f"{deletion_result.num_entities} entities with {deletion_result.num_records if deletion_result.num_records != UNKNOWN_NUM_RECORDS else 'unknown'} rows will be affected. Took {(deletion_result.end_time_millis-deletion_result.start_time_millis)/1000.0} seconds to evaluate."
257+
f"{deletion_result.num_entities} entities with {deletion_result.num_records if deletion_result.num_records != UNKNOWN_NUM_RECORDS else 'unknown'} rows will be affected. Took {(deletion_result.end_time-deletion_result.start_time)/1000.0} seconds to evaluate."
209258
)
210259
if deletion_result.sample_records:
211260
click.echo(
@@ -276,7 +325,7 @@ def delete_with_filters(
276325
click.echo(
277326
f"No urns to delete. Maybe you want to change entity_type={entity_type} or platform={platform} to be something different?"
278327
)
279-
return DeletionResult(end_time_millis=int(time.time() * 1000.0))
328+
return DeletionResult(end_time=int(time.time() * 1000.0))
280329

281330
if not force and not dry_run:
282331
type_delete = "soft" if soft else "permanently"
@@ -320,6 +369,9 @@ def _delete_one_urn(
320369
soft: bool = False,
321370
dry_run: bool = False,
322371
entity_type: str = "dataset",
372+
aspect_name: Optional[str] = None,
373+
start_time: Optional[datetime] = None,
374+
end_time: Optional[datetime] = None,
323375
cached_session_host: Optional[Tuple[sessions.Session, str]] = None,
324376
cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None,
325377
run_id: str = "delete-run-id",
@@ -359,13 +411,22 @@ def _delete_one_urn(
359411
else:
360412
logger.info(f"[Dry-run] Would soft-delete {urn}")
361413
elif not dry_run:
362-
payload_obj = {"urn": urn}
363-
urn, rows_affected = cli_utils.post_delete_endpoint(
414+
payload_obj: Dict[str, Any] = {"urn": urn}
415+
if aspect_name:
416+
payload_obj["aspectName"] = aspect_name
417+
if start_time:
418+
payload_obj["startTimeMillis"] = int(round(start_time.timestamp() * 1000))
419+
if end_time:
420+
payload_obj["endTimeMillis"] = int(round(end_time.timestamp() * 1000))
421+
rows_affected: int
422+
ts_rows_affected: int
423+
urn, rows_affected, ts_rows_affected = cli_utils.post_delete_endpoint(
364424
payload_obj,
365425
"/entities?action=delete",
366426
cached_session_host=cached_session_host,
367427
)
368428
deletion_result.num_records = rows_affected
429+
deletion_result.num_timeseries_records = ts_rows_affected
369430
else:
370431
logger.info(f"[Dry-run] Would hard-delete {urn} {soft_delete_msg}")
371432
deletion_result.num_records = (
@@ -379,9 +440,12 @@ def _delete_one_urn(
379440
@telemetry.with_telemetry
380441
def delete_one_urn_cmd(
381442
urn: str,
443+
aspect_name: Optional[str] = None,
382444
soft: bool = False,
383445
dry_run: bool = False,
384446
entity_type: str = "dataset",
447+
start_time: Optional[datetime] = None,
448+
end_time: Optional[datetime] = None,
385449
cached_session_host: Optional[Tuple[sessions.Session, str]] = None,
386450
cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None,
387451
) -> DeletionResult:
@@ -396,6 +460,9 @@ def delete_one_urn_cmd(
396460
soft,
397461
dry_run,
398462
entity_type,
463+
aspect_name,
464+
start_time,
465+
end_time,
399466
cached_session_host,
400467
cached_emitter,
401468
)

metadata-io/src/main/java/com/linkedin/metadata/timeseries/TimeseriesAspectService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.linkedin.metadata.aspect.EnvelopedAspect;
66
import com.linkedin.metadata.query.filter.Filter;
77
import com.linkedin.timeseries.AggregationSpec;
8+
import com.linkedin.timeseries.DeleteAspectValuesResult;
89
import com.linkedin.timeseries.GenericTable;
910
import com.linkedin.timeseries.GroupingBucket;
1011
import java.util.List;
@@ -29,4 +30,23 @@ List<EnvelopedAspect> getAspectValues(@Nonnull final Urn urn, @Nonnull String en
2930
@Nonnull
3031
GenericTable getAggregatedStats(@Nonnull String entityName, @Nonnull String aspectName,
3132
@Nonnull AggregationSpec[] aggregationSpecs, @Nullable Filter filter, @Nullable GroupingBucket[] groupingBuckets);
33+
34+
/**
35+
* Generic filter based deletion for timeseries aspects.
36+
* @param entityName - The name of the entity.
37+
* @param aspectName - The name of the aspect.
38+
* @param filter - The filter to be used for deletion of the documents on the index.
39+
* @return - number of documents deleted.
40+
*/
41+
@Nonnull
42+
DeleteAspectValuesResult deleteAspectValues(@Nonnull String entityName, @Nonnull String aspectName,
43+
@Nonnull Filter filter);
44+
45+
/**
46+
* Rollback the timeseries aspects associated with a runId.
47+
* @param runId The runId that needs to be rolled back.
48+
* @return the result of deleting the timeseries aspect values associated with the runId.
49+
*/
50+
@Nonnull
51+
DeleteAspectValuesResult rollbackTimeseriesAspects(@Nonnull String runId);
3252
}

0 commit comments

Comments
 (0)