From bffc45eb7827af7ac9f92076e50956c37d5cd8e3 Mon Sep 17 00:00:00 2001 From: aschroed Date: Wed, 24 Sep 2025 19:12:01 -0400 Subject: [PATCH 1/4] add helper es_util call for expand_es_metadata --- .../checks/helpers/es_utils.py | 8 +++ .../checks/helpers/wfr_utils.py | 55 +++++++++++-------- chalicelib_fourfront/checks/wfr_checks.py | 2 +- .../checks/wfr_encode_checks.py | 30 +++++----- pyproject.toml | 4 +- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/chalicelib_fourfront/checks/helpers/es_utils.py b/chalicelib_fourfront/checks/helpers/es_utils.py index 8b825cd6..9b978012 100644 --- a/chalicelib_fourfront/checks/helpers/es_utils.py +++ b/chalicelib_fourfront/checks/helpers/es_utils.py @@ -3,6 +3,7 @@ from dcicutils.es_utils import create_es_client from dcicutils import ff_utils + def get_es_metadata(*args, **kwargs): if (kwargs.get("es_client", None) is None) and ((es_host_local := _get_es_host_local()) is not None): es_client = create_es_client(es_host_local, use_aws_auth=True) @@ -10,5 +11,12 @@ def get_es_metadata(*args, **kwargs): return ff_utils.get_es_metadata(*args, **kwargs) +def expand_es_metadata(*args, **kwargs): + if (kwargs.get("es_client", None) is None) and ((es_host_local := _get_es_host_local()) is not None): + es_client = create_es_client(es_host_local, use_aws_auth=True) + return ff_utils.expand_es_metadata(*args, **kwargs, es_client=es_client) + return ff_utils.expand_es_metadata(*args, **kwargs) + + def _get_es_host_local() -> Optional[str]: return os.environ.get("ES_HOST_LOCAL", None) diff --git a/chalicelib_fourfront/checks/helpers/wfr_utils.py b/chalicelib_fourfront/checks/helpers/wfr_utils.py index 59aef2e9..2c7dd9fa 100644 --- a/chalicelib_fourfront/checks/helpers/wfr_utils.py +++ b/chalicelib_fourfront/checks/helpers/wfr_utils.py @@ -10,6 +10,7 @@ from operator import itemgetter from tibanna_4dn.core import API from . import wfrset_utils +from chalicelib_fourfront.checks.helpers.es_utils import expand_es_metadata lambda_limit = wfrset_utils.lambda_limit random_wait = wfrset_utils.random_wait @@ -822,15 +823,17 @@ def check_runs_without_output(res, check, run_name, my_auth, start, **kwargs): def check_hic(res, my_auth, exp_type, check, start, lambda_limit, nore=False, nonorm=False, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" + # es_client = kwargs.get('es_client') for a_set in res: # get all related items - all_items, _ = ff_utils.expand_es_metadata([a_set['uuid']], my_auth, + all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, store_frame='embedded', add_pc_wfr=True, ignore_field=['experiment_relation', 'biosample_relation', 'references', - 'reference_pubs']) + 'reference_pubs'],) + #es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds) @@ -1039,15 +1042,17 @@ def check_hic(res, my_auth, exp_type, check, start, lambda_limit, nore=False, no def check_margi(res, my_auth, exp_type, check, start, lambda_limit, nore=False, nonorm=False, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" + # es_client = kwargs.get('es_client') for a_set in res: # get all related items - all_items, all_uuids = ff_utils.expand_es_metadata([a_set['uuid']], my_auth, - store_frame='embedded', - add_pc_wfr=True, - ignore_field=['experiment_relation', - 'biosample_relation', - 'references', - 'reference_pubs']) + all_items, all_uuids = expand_es_metadata([a_set['uuid']], my_auth, + store_frame='embedded', + add_pc_wfr=True, + ignore_field=['experiment_relation', + 'biosample_relation', + 'references', + 'reference_pubs'],) + # es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds, len(all_uuids)) @@ -1513,15 +1518,17 @@ def start_tasks(missing_runs, patch_meta, action, my_auth, my_env, fs_env, start def check_repli(res, my_auth, exp_type, check, start, lambda_limit, winsize=None, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" + # es_client = kwargs.get('es_client') for a_set in res: # get all related items - all_items, _ = ff_utils.expand_es_metadata([a_set['uuid']], my_auth, - store_frame='embedded', - add_pc_wfr=True, - ignore_field=['experiment_relation', - 'biosample_relation', - 'references', - 'reference_pubs']) + all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, + store_frame='embedded', + add_pc_wfr=True, + ignore_field=['experiment_relation', + 'biosample_relation', + 'references', + 'reference_pubs'],) + # es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds) @@ -1666,15 +1673,17 @@ def check_repli(res, my_auth, exp_type, check, start, lambda_limit, winsize=None def check_rna(res, my_auth, exp_type, check, start, lambda_limit, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" + # es_client = kwargs.get('es_client') for a_set in res: # get all related items - all_items, all_uuids = ff_utils.expand_es_metadata([a_set['uuid']], my_auth, - store_frame='embedded', - add_pc_wfr=True, - ignore_field=['experiment_relation', - 'biosample_relation', - 'references', - 'reference_pubs']) + all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, + store_frame='embedded', + add_pc_wfr=True, + ignore_field=['experiment_relation', + 'biosample_relation', + 'references', + 'reference_pubs'],) + # es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() # print(a_set['accession'], (now-start).seconds) diff --git a/chalicelib_fourfront/checks/wfr_checks.py b/chalicelib_fourfront/checks/wfr_checks.py index ba0a0580..0960cfbf 100644 --- a/chalicelib_fourfront/checks/wfr_checks.py +++ b/chalicelib_fourfront/checks/wfr_checks.py @@ -786,7 +786,7 @@ def in_situ_hic_status(connection, **kwargs): if not res: check.summary = 'All Good!' return check - check = wfr_utils.check_hic(res, my_auth, exp_type, check, start, lambda_limit, kwargs) + check = wfr_utils.check_hic(res, my_auth, exp_type, check, start, lambda_limit, **kwargs) return check diff --git a/chalicelib_fourfront/checks/wfr_encode_checks.py b/chalicelib_fourfront/checks/wfr_encode_checks.py index 3581c9de..1343269c 100644 --- a/chalicelib_fourfront/checks/wfr_encode_checks.py +++ b/chalicelib_fourfront/checks/wfr_encode_checks.py @@ -1,7 +1,7 @@ from datetime import datetime from dcicutils import ff_utils from .helpers import wfr_utils - +from chalicelib_fourfront.checks.helpers.es_utils import expand_es_metadata # Use confchecks to import decorators object and its methods for each check module # rather than importing check_function, action_function, CheckResult, ActionResult # individually - they're now part of class Decorators in foursight-core::decorators @@ -52,13 +52,13 @@ def chipseq_status(connection, **kwargs): for a_set in res: set_acc = a_set['accession'] - all_items, all_uuids = ff_utils.expand_es_metadata([a_set['uuid']], my_auth, - store_frame='embedded', - add_pc_wfr=True, - ignore_field=[ # 'experiment_relation', - 'biosample_relation', - 'references', - 'reference_pubs']) + all_items, all_uuids = expand_es_metadata([a_set['uuid']], my_auth, + store_frame='embedded', + add_pc_wfr=True, + ignore_field=[ # 'experiment_relation', + 'biosample_relation', + 'references', + 'reference_pubs']) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds, len(all_uuids)) if (now-start).seconds > lambda_limit: @@ -568,13 +568,13 @@ def atacseq_status(connection, **kwargs): for a_set in res: set_acc = a_set['accession'] - all_items, all_uuids = ff_utils.expand_es_metadata([a_set['uuid']], my_auth, - store_frame='embedded', - add_pc_wfr=True, - ignore_field=['experiment_relation', - 'biosample_relation', - 'references', - 'reference_pubs']) + all_items, all_uuids = expand_es_metadata([a_set['uuid']], my_auth, + store_frame='embedded', + add_pc_wfr=True, + ignore_field=['experiment_relation', + 'biosample_relation', + 'references', + 'reference_pubs']) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds, len(all_uuids)) if (now-start).seconds > lambda_limit: diff --git a/pyproject.toml b/pyproject.toml index 4531af3b..d2461848 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "foursight" -version = "4.9.29" +version = "4.9.30" description = "Serverless Chalice Application for Monitoring" authors = ["4DN-DCIC Team "] license = "MIT" @@ -30,7 +30,7 @@ tibanna-ff = "^3.5.0" # use below for deployment foursight-core = "^5.7.0" # use below for tests but not for deployment -# foursight-core = { git = "https://github.com/4dn-dcic/foursight-core.git", branch="master" } +# foursight-core = { git = "https://github.com/4dn-dcic/foursight-core.git", branch="ajs_fiz_lce_script_250919" } # Need pytest-redis 3.0.2 or higher for pytest 7.4.2 (or higher). pytest = "^7.4.2" pytest-redis = "^3.0.2" From abe8375c6f01da082645279700c696ad614af51f Mon Sep 17 00:00:00 2001 From: aschroed Date: Thu, 25 Sep 2025 12:35:29 -0400 Subject: [PATCH 2/4] remove commented out lines --- chalicelib_fourfront/checks/helpers/wfr_utils.py | 12 ++---------- chalicelib_fourfront/checks/wfr_checks.py | 2 +- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/chalicelib_fourfront/checks/helpers/wfr_utils.py b/chalicelib_fourfront/checks/helpers/wfr_utils.py index 2c7dd9fa..8e496aae 100644 --- a/chalicelib_fourfront/checks/helpers/wfr_utils.py +++ b/chalicelib_fourfront/checks/helpers/wfr_utils.py @@ -10,7 +10,7 @@ from operator import itemgetter from tibanna_4dn.core import API from . import wfrset_utils -from chalicelib_fourfront.checks.helpers.es_utils import expand_es_metadata + lambda_limit = wfrset_utils.lambda_limit random_wait = wfrset_utils.random_wait @@ -823,7 +823,6 @@ def check_runs_without_output(res, check, run_name, my_auth, start, **kwargs): def check_hic(res, my_auth, exp_type, check, start, lambda_limit, nore=False, nonorm=False, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" - # es_client = kwargs.get('es_client') for a_set in res: # get all related items all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, @@ -833,7 +832,6 @@ def check_hic(res, my_auth, exp_type, check, start, lambda_limit, nore=False, no 'biosample_relation', 'references', 'reference_pubs'],) - #es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds) @@ -1042,7 +1040,6 @@ def check_hic(res, my_auth, exp_type, check, start, lambda_limit, nore=False, no def check_margi(res, my_auth, exp_type, check, start, lambda_limit, nore=False, nonorm=False, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" - # es_client = kwargs.get('es_client') for a_set in res: # get all related items all_items, all_uuids = expand_es_metadata([a_set['uuid']], my_auth, @@ -1052,7 +1049,6 @@ def check_margi(res, my_auth, exp_type, check, start, lambda_limit, nore=False, 'biosample_relation', 'references', 'reference_pubs'],) - # es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds, len(all_uuids)) @@ -1518,7 +1514,6 @@ def start_tasks(missing_runs, patch_meta, action, my_auth, my_env, fs_env, start def check_repli(res, my_auth, exp_type, check, start, lambda_limit, winsize=None, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" - # es_client = kwargs.get('es_client') for a_set in res: # get all related items all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, @@ -1528,7 +1523,6 @@ def check_repli(res, my_auth, exp_type, check, start, lambda_limit, winsize=None 'biosample_relation', 'references', 'reference_pubs'],) - # es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() print(a_set['accession'], (now-start).seconds) @@ -1673,8 +1667,7 @@ def check_repli(res, my_auth, exp_type, check, start, lambda_limit, winsize=None def check_rna(res, my_auth, exp_type, check, start, lambda_limit, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" - # es_client = kwargs.get('es_client') - for a_set in res: + for a_set in res: # get all related items all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, store_frame='embedded', @@ -1683,7 +1676,6 @@ def check_rna(res, my_auth, exp_type, check, start, lambda_limit, **kwargs): 'biosample_relation', 'references', 'reference_pubs'],) - # es_client=es_client) all_wfrs = all_items.get('workflow_run_awsem', []) + all_items.get('workflow_run_sbg', []) now = datetime.utcnow() # print(a_set['accession'], (now-start).seconds) diff --git a/chalicelib_fourfront/checks/wfr_checks.py b/chalicelib_fourfront/checks/wfr_checks.py index 0960cfbf..ba0a0580 100644 --- a/chalicelib_fourfront/checks/wfr_checks.py +++ b/chalicelib_fourfront/checks/wfr_checks.py @@ -786,7 +786,7 @@ def in_situ_hic_status(connection, **kwargs): if not res: check.summary = 'All Good!' return check - check = wfr_utils.check_hic(res, my_auth, exp_type, check, start, lambda_limit, **kwargs) + check = wfr_utils.check_hic(res, my_auth, exp_type, check, start, lambda_limit, kwargs) return check From 74b0724c96492f756bf527e55133951c1c62e8a0 Mon Sep 17 00:00:00 2001 From: aschroed Date: Thu, 25 Sep 2025 12:53:18 -0400 Subject: [PATCH 3/4] use the helper function --- chalicelib_fourfront/checks/helpers/wfr_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/chalicelib_fourfront/checks/helpers/wfr_utils.py b/chalicelib_fourfront/checks/helpers/wfr_utils.py index 8e496aae..59f8aabf 100644 --- a/chalicelib_fourfront/checks/helpers/wfr_utils.py +++ b/chalicelib_fourfront/checks/helpers/wfr_utils.py @@ -10,6 +10,7 @@ from operator import itemgetter from tibanna_4dn.core import API from . import wfrset_utils +from chalicelib_fourfront.checks.helpers.es_utils import expand_es_metadata lambda_limit = wfrset_utils.lambda_limit From 680693baa9229a81ddd57a7239b58e3ae7b790f9 Mon Sep 17 00:00:00 2001 From: aschroed Date: Thu, 25 Sep 2025 12:58:36 -0400 Subject: [PATCH 4/4] fix indent typo --- chalicelib_fourfront/checks/helpers/wfr_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chalicelib_fourfront/checks/helpers/wfr_utils.py b/chalicelib_fourfront/checks/helpers/wfr_utils.py index 59f8aabf..7d670617 100644 --- a/chalicelib_fourfront/checks/helpers/wfr_utils.py +++ b/chalicelib_fourfront/checks/helpers/wfr_utils.py @@ -1668,7 +1668,7 @@ def check_repli(res, my_auth, exp_type, check, start, lambda_limit, winsize=None def check_rna(res, my_auth, exp_type, check, start, lambda_limit, **kwargs): """Check run status for each set in res, and report missing runs and completed process""" - for a_set in res: + for a_set in res: # get all related items all_items, _ = expand_es_metadata([a_set['uuid']], my_auth, store_frame='embedded',