25 changes: 13 additions & 12 deletions docs/source/invalidation.rst
@@ -7,7 +7,7 @@ Quick reference of important files (all under src/snovault/)
* indexing_views.py - contains the function that builds the object model and other information that is actually indexed. This view is in charge of generating invalidation information
* resource_views.py - contains functions for all commonly-used views, including those used in indexing_views
* embed.py - key functions used in indexing_views.py and resource_views.py
* utils.py - contains functions used in the selective embedding process (used for @@embedded view)
* utils.py - contains functions used in the selective embedding process (used for the ``@@embedded`` view)
* resources.py - contains class definitions for the basic items in Snovault and controls adding linked and rev_linked items
* elasticsearch/indexer.py - coordinates the whole indexing/invalidation process
* elasticsearch/indexer_utils.py - holds functions used to run invalidation
@@ -16,7 +16,7 @@ Quick reference of important files (all under src/snovault/)
Keeping elasticsearch in sync
-----------------------------

The /_indexer wsgi app (es_index_listener.py) drives the incremental indexing process. Previously in the ENCODE setup, the listener was driven by database transactions. We have moved this to a queue-based setup that does not operate on DB snapshot. At a fixed, short time interval (at time of writing: 3 seconds), the index listener calls the /index view (indexer.py) which works out what needs to be reindexed. The actual reindexing happens in parallel in multiprocessing subprocesses (mpindexer.py.)
The ``/_indexer`` wsgi app (``es_index_listener.py``) drives the incremental indexing process. Previously in the ENCODE setup, the listener was driven by database transactions. We have moved this to a queue-based setup that does not operate on a DB snapshot. At a fixed, short time interval (at time of writing: 3 seconds), the index listener calls the ``/index`` view (``indexer.py``), which works out what needs to be reindexed. The actual reindexing happens in parallel in multiprocessing subprocesses (``mpindexer.py``).

Keeping indexing on track requires a couple components:
1. keeping track of what needs to be invalidated
@@ -25,40 +25,41 @@ Keeping indexing on track requires a couple components:
Keeping track of what needs to be invalidated
---------------------------------------------

When rendering the view of an item to be indexed (@@index-data, see src/snovault/indexing_views.py), we record the set of uuids traversed when building the item. This is the _linked_uuids, which is stored as an reified attribute on the request used to build the item. These are the main source of information of what needs to be invalidated when items are updated. Whenever an item is changed, a search is performed to find all items that contain the changed item in their linked_uuids; these items are also reindexed. The function responsible for this is `find_uuids_for_indexing` in src/snovault/elasticsearch/indexer_utils.py.
When rendering the view of an item to be indexed (``@@index-data``, see ``src/snovault/indexing_views.py``), we record the set of uuids traversed when building the item. This is the ``_linked_uuids``, which is stored as a reified attribute on the request used to build the item. These are the main source of information about what needs to be invalidated when items are updated. Whenever an item is changed, a search is performed to find all items that contain the changed item in their ``linked_uuids``; these items are also reindexed. The function responsible for this is ``find_uuids_for_indexing`` in ``src/snovault/elasticsearch/indexer_utils.py``.
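
As a rough illustration only (this is not the actual ``find_uuids_for_indexing`` code, and the field layout in Elasticsearch may differ from the flat fields assumed here), the invalidation search conceptually looks like this::

    from elasticsearch import Elasticsearch

    def find_uuids_to_invalidate(es: Elasticsearch, changed_uuids):
        """Return uuids of ES documents that link (or rev-link) to any changed uuid."""
        query = {
            "query": {
                "bool": {
                    "should": [
                        {"terms": {"linked_uuids": list(changed_uuids)}},
                        {"terms": {"uuids_that_rev_link_to_me": list(changed_uuids)}},
                    ],
                    "minimum_should_match": 1,
                }
            },
            "_source": False,  # only the document ids (the uuids) are needed
        }
        hits = es.search(index="_all", body=query, size=10000)["hits"]["hits"]
        return {hit["_id"] for hit in hits}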

Items are added to the set of request._linked_uuids in the `item_with_links` function in src/snovault/resources.py. This is the function used to control the _linked_uuids because it is closely tied with the @@object view of an item (defined in resource_views.py). The embedding process traverses the `embedded_list` of an object and uses the @@object view to build the total embedded object by iteratively visiting all its component objects. See the `embedding-and-indexing.rst` document for more information.
Items are added to the set of ``request._linked_uuids`` in the ``item_with_links`` function in ``src/snovault/resources.py``. This is the function used to control the ``_linked_uuids`` because it is closely tied to the ``@@object`` view of an item (defined in ``resource_views.py``). The embedding process traverses the ``embedded_list`` of an object and uses the ``@@object`` view to build the total embedded object by iteratively visiting all its component objects. See the ``embedding-and-indexing.rst`` document for more information.

Reverse links (rev_links) must also be kept track of in the invalidation process. In our system, we represent rev_links as linkTos; the ENCODE concept of a linkFrom has been removed. rev_links are added to a request much the same as items are added to _linked_uuids. See the get_rev_links function in src/snovault/resources.py. This function keeps track of information of where the rev_link originates from and what item it targets, which is important information because many rev links could be visited in the process of building an embedded item.
Reverse links (``rev_links``) must also be tracked during the invalidation process. In our system, we represent ``rev_links`` as ``linkTos``; the ENCODE concept of a ``linkFrom`` has been removed. ``rev_links`` are added to a request in much the same way as items are added to ``_linked_uuids``. See the ``get_rev_links`` function in ``src/snovault/resources.py``. This function keeps track of where each ``rev_link`` originates and which item it targets, which is important because many rev links may be visited in the process of building an embedded item.
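
Conceptually (the real bookkeeping in ``resources.py`` differs in detail, and the names below are purely illustrative), each reverse link visited while building an item can be recorded as a small (source, link name, target) record::

    from collections import defaultdict

    # Illustrative only: remember, for each target uuid, which items rev-link to it and
    # through which rev link, so the indexer knows what to revisit when the target changes.
    rev_links_by_target = defaultdict(set)

    def record_rev_link(source_uuid, rev_name, target_uuid):
        rev_links_by_target[target_uuid].add((source_uuid, rev_name))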

Both _linked_uuids and rev_links are only kept track of if we are indexing. This is done by setting request._indexing_view to True in indexing_views.py. The information about the linked uuids and uuids that reverse link to an item are stored in the Elasticsearch document for the item in the `linked_uuids` and `uuids_that_rev_link_to_me` fields, respectively.
Both ``_linked_uuids`` and ``rev_links`` are only tracked when we are indexing. This is done by setting ``request._indexing_view`` to ``True`` in ``indexing_views.py``. The information about the linked uuids and the uuids that reverse-link to an item is stored in the Elasticsearch document for the item in the ``linked_uuids`` and ``uuids_that_rev_link_to_me`` fields, respectively.
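
For orientation, the bookkeeping stored with each indexed item has roughly the following shape (illustrative only; the real ``@@index-data`` document carries more fields)::

    index_data_doc = {
        "uuid": "11111111-2222-3333-4444-555555555555",
        "embedded": {"...": "the full embedded view of the item"},
        "linked_uuids": ["<uuid traversed while embedding>"],
        "uuids_that_rev_link_to_me": ["<uuid of an item that rev-links here>"],
    }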


Finding items to invalidate
---------------------------

This has already been somewhat covered, but it's worth reiterating. Whenever an item is indexed, the `find_uuids_for_indexing` function is run to find all items in Elasticsearch that contain the indexed item in their linked_uuids. In addition to this, any items added from the `uuids_rev_linked_to_me` list generated from the @@index-data view are also invalidated, since new reverse links may have been created and those items need to be updated as well. All of these items are added to the secondary queue after a primary item has been indexed.
This has already been somewhat covered, but it's worth reiterating. Whenever an item is indexed, the ``find_uuids_for_indexing`` function is run to find all items in Elasticsearch that contain the indexed item in their ``linked_uuids``. In addition to this, any items added from the ``uuids_rev_linked_to_me`` list generated from the ``@@index-data`` view are also invalidated, since new reverse links may have been created and those items need to be updated as well. All of these items are added to the secondary queue after a primary item has been indexed.


Total Reindexing
----------------

Cases can arise where a total reindexing needs to be triggered. This should be done by using `bin/create-mapping`, which executes code in create_mapping.py. The point of this code is primarily to build the mappings needed to make the indices in Elasticsearch. Secondarily, create-mapping also takes care of queueing objects for indexing. Check out the code in that file for more information. A total re-creation of Elasticsearch indices followed by reindexing can be triggered using:
Cases can arise where a total reindexing needs to be triggered. This should be done by using ``bin/create-mapping``, which executes code in ``create_mapping.py``. The point of this code is primarily to build the mappings needed to make the indices in Elasticsearch. Secondarily, ``create-mapping`` also takes care of queueing objects for indexing. Check out the code in that file for more information. A total re-creation of Elasticsearch indices followed by reindexing can be triggered using::

`bin/create-mapping production.ini --app-name app`
NOTE: use `development.ini` locally
bin/create-mapping production.ini --app-name app

NOTE: For local debugging, such as when you're running ``make deploy1`` or ``make deploy2``, use ``development.ini`` instead.


Purging items
-------------

There is another spot `find_uuids_for_indexing` is used, and that is to find all linked items when attempting to "purge" an item (fully remove from postgresql and Elasticsearch). Before removing an item, it is crucial to ensure that all links to that item have been removed, which is why this function is used.
``find_uuids_for_indexing`` is also used in one other spot: to find all linked items when attempting to "purge" an item (fully remove it from PostgreSQL and Elasticsearch). Before removing an item, it is crucial to ensure that all links to that item have been removed, which is why this function is used.
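
A hedged sketch of that pre-purge safety check (the names here are illustrative and re-use the ``find_uuids_to_invalidate`` sketch above; this is not the actual purge code)::

    def check_can_purge(es, uuid_to_purge):
        # Anything still linking to this uuid blocks the purge.
        still_linked = find_uuids_to_invalidate(es, {uuid_to_purge})
        still_linked.discard(uuid_to_purge)  # ignore a self-reference
        if still_linked:
            raise ValueError("Cannot purge %s; still linked from %s"
                             % (uuid_to_purge, sorted(still_linked)))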


Invalidation Scope
-------------------------

Previously, `find_uuids_for_indexing` would take the uuids from _linked_uuids as is. Now, if given a diff (passed from SQS on edit) the uuids returned will be pruned to determine whether or not they actually need to be invalidated. The indexer does this by examining the diff received from SQS and the embedded list of all invalidated item types. If it detects the diff modified something that is embedded in the invalidated item type, all uuids of this type are invalidated. If not, those uuids are not queued for reindexing since the edit does not change the embedded view of the item. The followind diagram serves as a visual aid.
Previously, ``find_uuids_for_indexing`` would take the uuids from ``_linked_uuids`` as is. Now, if given a diff (passed from SQS on edit), the returned uuids are pruned to determine whether or not they actually need to be invalidated. The indexer does this by examining the diff received from SQS and the embedded list of all invalidated item types. If it detects that the diff modified something that is embedded in the invalidated item type, all uuids of that type are invalidated. If not, those uuids are not queued for reindexing, since the edit does not change the embedded view of the item. The following diagram serves as a visual aid.

Note that the above behavior is ONLY activated upon receiving a diff, which is computed only on item edits. Upon item creation/deletion the process remains the same, since there is no diff. It is also very important to note that any additional fields used in calculated properties must be embedded as well. If not, a field could be modified in a way that affects an embedded field, but the edit would be invisible because we did not know the field was used.
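
As a simplified sketch of that pruning decision (the real invalidation-scope check also considers the item type behind each embedded path; the diff format is assumed here to be ``"ItemType.field"`` strings)::

    def edit_is_invisible(diff, embedded_list):
        """True if none of the edited fields can show up in the embedded view of the invalidated type.

        diff          -- changed fields from SQS, e.g. {"Award.title"}
        embedded_list -- embedded paths declared by the invalidated item type, e.g. ["award.title"]
        """
        changed_fields = {entry.split(".", 1)[-1] for entry in diff}            # "Award.title" -> "title"
        embedded_fields = {path.rsplit(".", 1)[-1] for path in embedded_list}   # "award.title" -> "title"
        return not (changed_fields & embedded_fields)

    # Items of a type for which edit_is_invisible(...) returns True are dropped from the
    # reindexing queue, because the edit cannot change their embedded view.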

243 changes: 122 additions & 121 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dcicsnovault"
version = "8.0.1.3b7" # to become 8.1.0
version = "8.0.1.5b17" # to become 8.1.0
description = "Storage support for 4DN Data Portals."
authors = ["4DN-DCIC Team <[email protected]>"]
license = "MIT"
@@ -41,7 +41,7 @@ boto3 = "^1.26.124" # no particular version required, but this speeds up search
botocore = "^1.19.124" # no particular version required, but this speeds up search
elasticsearch = "7.13.4" # versions >= 7.14.0 lock out AWS ES
elasticsearch_dsl = "^7.4.0"
dcicutils = "^7.0.0"
dcicutils = "^7.4.4.5b25" # "^7.0.0"
future = ">=0.15.2,<1"
html5lib = ">=1.1" # experimental, should be OK now that we're not using moto server
humanfriendly = "^1.44.9"
1 change: 1 addition & 0 deletions pytest.ini
@@ -15,6 +15,7 @@ markers =
integrated: an integration test
integratedx: an excludable integration test, redundantly testing functionality also covered by a unit test
performance: mark a test as a performance test (deselect with '-m "not performance"')
setone: (deprecated) instead of '-m "setone"', please use '-m "not indexing"', TODO: refs and scripts to be rewritten
slow: mark a test as slow (deselect with '-m "not slow"')
static: mark as a test that is testing the static form of code, not its runtime functionality
storage: mark a test as about storage (deselect with '-m "not storage"')
11 changes: 9 additions & 2 deletions snovault/app.py
@@ -4,6 +4,7 @@
import json
import os
import psycopg2
import psycopg2.extensions
import subprocess
import zope.sqlalchemy

@@ -13,7 +14,7 @@
from pyramid.session import SignedCookieSessionFactory
from pyramid.settings import asbool
from pyramid_localroles import LocalRolesAuthorizationPolicy
from sqlalchemy import engine_from_config, event, orm
from sqlalchemy import engine_from_config, event, orm # , text as psql_text
from webob.cookies import JSONSerializer

from .interfaces import DBSESSION
@@ -82,7 +83,13 @@ def connect(dbapi_connection, connection_record):
timeout_ms = int(timeout_ms)
cursor = dbapi_connection.cursor()
try:
cursor.execute("SET statement_timeout TO %d" % timeout_ms)
# cursor: psycopg2.extensions.cursor
# This call to psycopg2.extensions.cursor.execute expects a real string. Giving it an sqlalchemy.text
# object will fail because something will try to do a boolean test, probably "if thing_to_execute:..."
# and __bool__ is not defined on sqlalchemy.text.
# Bottom line: Cannot wrap this string with psql_text(...) like we do elsewhere. It's not ready.
# It might be that we could do such wrapping if we called execute on some other object.
cursor.execute("SET statement_timeout = %d;" % timeout_ms)
except psycopg2.Error:
dbapi_connection.rollback()
finally:
5 changes: 3 additions & 2 deletions snovault/commands/clear_db_es_contents.py
@@ -7,6 +7,7 @@
from dcicutils.lang_utils import disjoined_list
from pyramid.paster import get_app
from snovault import DBSESSION
from sqlalchemy import text as psql_text
from ..storage import Base
from ..elasticsearch.create_mapping import run as run_create_mapping
from sqlalchemy import MetaData
@@ -40,8 +41,8 @@ def clear_db_tables(app):
try:
# truncate tables by only deleting contents (sqlalchemy 1.4+ compliant)
table_names = ','.join(table.name for table in reversed(Base.metadata.sorted_tables))
connection.execute('SET statement_timeout = 300000;') # give 5 mins for DB clear
connection.execute(f'TRUNCATE {table_names} RESTART IDENTITY;')
connection.execute(psql_text('SET statement_timeout = 300000;')) # give 5 mins for DB clear
connection.execute(psql_text(f'TRUNCATE {table_names} RESTART IDENTITY;'))
except Exception as e:
log.error(f"clear_db_es_contents: Error on DB drop_all/create_all. {type(e)}: {e}")
transaction.abort()
4 changes: 2 additions & 2 deletions snovault/commands/list_db_tables.py
@@ -14,8 +14,8 @@
from typing import Optional, List
# from zope.sqlalchemy import mark_changed
from .. import configure_dbsession
from ..project_defs import app_project
from ..sqlalchemy_tools import PyramidAppManager
from ..project import app_project


logger = structlog.getLogger(__name__)
@@ -115,7 +115,7 @@ def main(simulated_args=None):
log = args.log

logging.basicConfig()
project = app_project(initialize=True)
project = app_project()
# Loading app will have configured from config file. Reconfigure here:
if log:
logging.getLogger(project.NAME).setLevel(logging.DEBUG)
6 changes: 3 additions & 3 deletions snovault/dev_servers.py
@@ -19,7 +19,7 @@
from pyramid.paster import get_app, get_appsettings
from pyramid.path import DottedNameResolver
from .elasticsearch import create_mapping
from .project import app_project, project_filename
from .project_defs import app_project
from .tests import elasticsearch_fixture, postgresql_fixture


@@ -31,7 +31,7 @@
def nginx_server_process(prefix='', echo=False):
args = [
os.path.join(prefix, 'nginx'),
'-c', project_filename('nginx-dev.conf'),
'-c', app_project().project_filename('nginx-dev.conf'),
'-g', 'daemon off;'
]
process = subprocess.Popen(
@@ -114,7 +114,7 @@ def main():

def run(app_name, config_uri, datadir, clear=False, init=False, load=False, ingest=True):

project = app_project(initialize=True)
project = app_project()

logging.basicConfig(format='')
# Loading app will have configured from config file. Reconfigure here:
14 changes: 7 additions & 7 deletions snovault/loadxl.py
@@ -20,7 +20,7 @@
from pyramid.view import view_config
from snovault.util import debug_log

from .project import project_filename
from .project_defs import app_project
from .server_defaults import add_last_modified


@@ -117,7 +117,7 @@ def load_data_view(context, request):
inserts = None
from_json = False
if fdn_dir:
inserts = project_filename('tests/data/' + fdn_dir + '/')
inserts = app_project().project_filename('tests/data/' + fdn_dir + '/')
elif local_path:
inserts = local_path
elif store:
@@ -517,7 +517,7 @@ def load_data(app, indir='inserts', docsdir=None, overwrite=False,
testapp = webtest.TestApp(app, environ)
# load master-inserts by default
if indir != 'master-inserts' and use_master_inserts:
master_inserts = project_filename('tests/data/master-inserts/')
master_inserts = app_project().project_filename('tests/data/master-inserts/')
master_res = load_all(testapp, master_inserts, [], skip_types=skip_types)
if master_res: # None if successful
print(LOAD_ERROR_MESSAGE)
@@ -526,13 +526,13 @@

if not indir.endswith('/'):
indir += '/'
inserts = project_filename('tests/data/' + indir)
inserts = app_project().project_filename('tests/data/' + indir)
if docsdir is None:
docsdir = []
else:
if not docsdir.endswith('/'):
docsdir += '/'
docsdir = [project_filename('tests/data/' + docsdir)]
docsdir = [app_project().project_filename('tests/data/' + docsdir)]
res = load_all(testapp, inserts, docsdir, overwrite=overwrite)
if res: # None if successful
print(LOAD_ERROR_MESSAGE)
@@ -568,7 +568,7 @@ def load_local_data(app, overwrite=False):
]

for test_insert_dir in test_insert_dirs:
chk_dir = project_filename("tests/data/" + test_insert_dir)
chk_dir = app_project().project_filename("tests/data/" + test_insert_dir)
for (dirpath, dirnames, filenames) in os.walk(chk_dir):
if any([fn for fn in filenames if fn.endswith('.json') or fn.endswith('.json.gz')]):
logger.info('Loading inserts from "{}" directory.'.format(test_insert_dir))
@@ -711,7 +711,7 @@ def load_data_by_type(app, indir='master-inserts', overwrite=True, itype=None):

if not indir.endswith('/'):
indir += '/'
inserts = project_filename('tests/data/' + indir)
inserts = app_project().project_filename('tests/data/' + indir)

res = load_all(testapp, inserts, docsdir=[], overwrite=overwrite, itype=itype)
if res: # None if successful