Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion iib/workers/tasks/oras_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from iib.common.tracing import instrument_tracing
from iib.exceptions import IIBError
from iib.workers.tasks.utils import run_cmd, set_registry_auths
from iib.workers.tasks.utils import run_cmd, set_registry_auths, get_image_digest

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,3 +102,75 @@ def push_oras_artifact(
log.info('Successfully pushed OCI artifact to %s', artifact_ref)
except Exception as e:
raise IIBError(f'Failed to push OCI artifact to {artifact_ref}: {e}')


def get_image_stream_digest(
tag: str,
):
"""
Retrieve the image digest from the OpenShift ImageStream.

This function queries the `index-db-cache` ImageStream to get the
SHA256 digest for a specific tag.

:param tag: The image tag to check.
:return: The image digest (e.g., "sha256:...").
:rtype: str
"""
jsonpath = f'\'{{.status.tags[?(@.tag=="{tag}")].items[0].image}}\''
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: The jsonpath string uses single quotes which may cause issues with oc CLI.

Consider removing the single quotes from the jsonpath expression unless they are specifically required, as they may cause parsing issues with run_cmd.

Suggested change
jsonpath = f'\'{{.status.tags[?(@.tag=="{tag}")].items[0].image}}\''
jsonpath = f'{{.status.tags[?(@.tag=="{tag}")].items[0].image}}'

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth adding a comment here for posterity

return run_cmd(
['oc', 'get', 'imagestream', 'index-db-cache', '-o', f'jsonpath={jsonpath}'],
exc_msg=f'Failed to get digest for ImageStream tag {tag}.',
)


def verify_indexdb_cache_sync(tag: str) -> bool:
"""
Compare the digest of the ImageStream with the digest of the image in repository.

This function verifies if the local ImageStream cache is up to date with
the latest image in the remote registry.

:param tag: The image tag to verify.
:return: True if the digests match (cache is synced), False otherwise.
:rtype: bool
"""
# TODO - This can be loaded from config variable
repository = "quay.io/my-org/index-db"

quay_digest = get_image_digest(f"{repository}:{tag}")
is_digest = get_image_stream_digest(tag)

return quay_digest == is_digest


def refresh_indexdb_cache(
tag: str,
registry_auths: Optional[Dict[str, Any]] = None,
) -> None:
"""
Force a synchronization of the ImageStream with the remote registry.

This function imports the specified image from Quay.io into the `index-db-cache`
ImageStream, ensuring the local cache is up-to-date.

:param tag: The container image tag to refresh.
:param registry_auths: Optional authentication data for the registry.
"""
log.info('Refreshing OCI artifact cache: %s', tag)

# TODO - This can be loaded from config variable
repository = "quay.io/my-org/index-db"

# Use namespace-specific registry authentication if provided
with set_registry_auths(registry_auths, use_empty_config=True):
run_cmd(
[
'oc',
'import-image',
f'index-db-cache:{tag}',
f'--from={repository}:{tag}',
'--confirm',
],
exc_msg=f'Failed to refresh OCI artifact {tag}.',
)
33 changes: 22 additions & 11 deletions iib/workers/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,27 @@ def _get_container_image_name(pull_spec: str) -> str:
return pull_spec.rsplit(':', 1)[0]


def get_image_digest(pull_spec: str) -> str:
"""
Get the digest of the image defined by pull_spec.

:param str pull_spec: the pull specification of the container image
:return: the digest of the image
:rtype: str
"""
skopeo_output = skopeo_inspect(f'docker://{pull_spec}', '--raw', return_json=False)
if json.loads(skopeo_output).get('schemaVersion') == 2:
raw_digest = hashlib.sha256(skopeo_output.encode('utf-8')).hexdigest()
return f'sha256:{raw_digest}'

# Schema 1 is not a stable format. The contents of the manifest may change slightly
# between requests causing a different digest to be computed. Instead, let's leverage
# skopeo's own logic for determining the digest in this case. In the future, we
# may want to use skopeo in all cases, but this will have significant performance
# issues until https://github.com/containers/skopeo/issues/785
return skopeo_inspect(f'docker://{pull_spec}')['Digest']


def get_resolved_image(pull_spec: str) -> str:
"""
Get the pull specification of the container image using its digest.
Expand All @@ -513,17 +534,7 @@ def get_resolved_image(pull_spec: str) -> str:
"""
log.debug('Resolving %s', pull_spec)
name = _get_container_image_name(pull_spec)
skopeo_output = skopeo_inspect(f'docker://{pull_spec}', '--raw', return_json=False)
if json.loads(skopeo_output).get('schemaVersion') == 2:
raw_digest = hashlib.sha256(skopeo_output.encode('utf-8')).hexdigest()
digest = f'sha256:{raw_digest}'
else:
# Schema 1 is not a stable format. The contents of the manifest may change slightly
# between requests causing a different digest to be computed. Instead, let's leverage
# skopeo's own logic for determining the digest in this case. In the future, we
# may want to use skopeo in all cases, but this will have significant performance
# issues until https://github.com/containers/skopeo/issues/785
digest = skopeo_inspect(f'docker://{pull_spec}')['Digest']
digest = get_image_digest(pull_spec)
pull_spec_resolved = f'{name}@{digest}'
log.debug('%s resolved to %s', pull_spec, pull_spec_resolved)
return pull_spec_resolved
Expand Down
92 changes: 92 additions & 0 deletions tests/test_workers/test_tasks/test_oras_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from iib.workers.tasks.oras_utils import (
get_oras_artifact,
push_oras_artifact,
verify_indexdb_cache_sync,
get_image_stream_digest,
refresh_indexdb_cache,
)


Expand Down Expand Up @@ -326,3 +329,92 @@ def test_get_oras_artifact_with_base_dir_wont_leak_credentials(
all_messages = ' '.join(caplog.messages)
assert 'dXNlcjpwYXNz' not in all_messages # base64 encoded credentials
assert 'user:pass' not in all_messages # decoded credentials


@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_get_image_stream_digest(mock_run_cmd):
Comment on lines +334 to +335
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider testing get_image_stream_digest with unexpected output formats.

Please add tests for cases where run_cmd returns values like an empty string, None, or a non-digest value to verify error handling in get_image_stream_digest.

Suggested implementation:

@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_get_image_stream_digest(mock_run_cmd):
    """Test successful retrieval of image digest from ImageStream."""
    mock_run_cmd.return_value = 'sha256:12345'
    tag = 'test-tag'

    digest = get_image_stream_digest(tag)

    assert digest == 'sha256:12345'
    mock_run_cmd.assert_called_once_with(
        [
            'oc',
            'get',
            'imagestream',
        ]
    )

@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_get_image_stream_digest_empty_string(mock_run_cmd):
    """Test get_image_stream_digest with empty string output."""
    mock_run_cmd.return_value = ''
    tag = 'test-tag'
    try:
        digest = get_image_stream_digest(tag)
    except Exception as exc:
        assert isinstance(exc, Exception)
    else:
        assert digest is None or digest == '', "Expected None or empty digest for empty string output"

@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_get_image_stream_digest_none(mock_run_cmd):
    """Test get_image_stream_digest with None output."""
    mock_run_cmd.return_value = None
    tag = 'test-tag'
    try:
        digest = get_image_stream_digest(tag)
    except Exception as exc:
        assert isinstance(exc, Exception)
    else:
        assert digest is None, "Expected None digest for None output"

@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_get_image_stream_digest_invalid_format(mock_run_cmd):
    """Test get_image_stream_digest with non-digest output."""
    mock_run_cmd.return_value = 'not-a-digest'
    tag = 'test-tag'
    try:
        digest = get_image_stream_digest(tag)
    except Exception as exc:
        assert isinstance(exc, Exception)
    else:
        assert digest is None or digest == 'not-a-digest', "Expected None or raw output for invalid digest format"

You may need to adjust the assertions in the new tests depending on how get_image_stream_digest is implemented (e.g., whether it raises exceptions or returns None for invalid output). If it raises a specific exception, replace Exception with the specific exception type.

"""Test successful retrieval of image digest from ImageStream."""
mock_run_cmd.return_value = 'sha256:12345'
tag = 'test-tag'

digest = get_image_stream_digest(tag)

assert digest == 'sha256:12345'
mock_run_cmd.assert_called_once_with(
[
'oc',
'get',
'imagestream',
'index-db-cache',
'-o',
'jsonpath=\'{.status.tags[?(@.tag=="test-tag")].items[0].image}\'',
],
exc_msg='Failed to get digest for ImageStream tag test-tag.',
)


@mock.patch('iib.workers.tasks.oras_utils.run_cmd', side_effect=IIBError('cmd failed'))
def test_get_image_stream_digest_failure(mock_run_cmd):
"""Test failure during retrieval of image digest from ImageStream."""
with pytest.raises(IIBError, match='cmd failed'):
get_image_stream_digest('test-tag')


@mock.patch('iib.workers.tasks.oras_utils.get_image_stream_digest')
@mock.patch('iib.workers.tasks.oras_utils.get_image_digest')
def test_verify_indexdb_cache_sync_match(mock_get_image_digest, mock_get_is_digest):
Comment on lines +363 to +365
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding a test for verify_indexdb_cache_sync when get_image_digest or get_image_stream_digest raises an exception.

Please add a test to verify that verify_indexdb_cache_sync properly handles exceptions from get_image_digest or get_image_stream_digest.

"""Test successful verification when digests match."""
mock_get_image_digest.return_value = 'sha256:abc'
mock_get_is_digest.return_value = 'sha256:abc'
tag = 'test-tag'

result = verify_indexdb_cache_sync(tag)

assert result is True
mock_get_image_digest.assert_called_once_with('quay.io/my-org/index-db:test-tag')
mock_get_is_digest.assert_called_once_with(tag)


@mock.patch('iib.workers.tasks.oras_utils.get_image_stream_digest')
@mock.patch('iib.workers.tasks.oras_utils.get_image_digest')
def test_verify_indexdb_cache_sync_no_match(mock_get_image_digest, mock_get_is_digest):
"""Test successful verification when digests don't match."""
mock_get_image_digest.return_value = 'sha256:abc'
mock_get_is_digest.return_value = 'sha256:xyz'
tag = 'test-tag'

result = verify_indexdb_cache_sync(tag)

assert result is False
mock_get_image_digest.assert_called_once_with('quay.io/my-org/index-db:test-tag')
mock_get_is_digest.assert_called_once_with(tag)


@mock.patch('iib.workers.tasks.oras_utils.set_registry_auths')
@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_refresh_indexdb_cache_success(mock_run_cmd, mock_auth, registry_auths):
"""Test successful cache refresh."""
tag = 'test-tag'
Comment on lines +393 to +397
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider testing refresh_indexdb_cache with missing or invalid registry_auths.

Please add tests for cases where registry_auths is None or contains invalid data to verify correct handling by refresh_indexdb_cache.

Suggested change
@mock.patch('iib.workers.tasks.oras_utils.set_registry_auths')
@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_refresh_indexdb_cache_success(mock_run_cmd, mock_auth, registry_auths):
"""Test successful cache refresh."""
tag = 'test-tag'
@mock.patch('iib.workers.tasks.oras_utils.set_registry_auths')
@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_refresh_indexdb_cache_success(mock_run_cmd, mock_auth, registry_auths):
"""Test successful cache refresh."""
tag = 'test-tag'
@mock.patch('iib.workers.tasks.oras_utils.set_registry_auths')
@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_refresh_indexdb_cache_none_registry_auths(mock_run_cmd, mock_auth):
"""Test refresh_indexdb_cache with registry_auths=None."""
tag = 'test-tag'
registry_auths = None
# Assuming refresh_indexdb_cache raises ValueError or handles None gracefully
from iib.workers.tasks.oras_utils import refresh_indexdb_cache
try:
refresh_indexdb_cache(tag, registry_auths)
except Exception as exc:
assert isinstance(exc, (TypeError, ValueError))
else:
# If no exception, ensure set_registry_auths is not called
mock_auth.assert_not_called()
@mock.patch('iib.workers.tasks.oras_utils.set_registry_auths')
@mock.patch('iib.workers.tasks.oras_utils.run_cmd')
def test_refresh_indexdb_cache_invalid_registry_auths(mock_run_cmd, mock_auth):
"""Test refresh_indexdb_cache with invalid registry_auths."""
tag = 'test-tag'
# Example of invalid registry_auths: not a dict, missing keys, etc.
invalid_registry_auths = "not-a-dict"
from iib.workers.tasks.oras_utils import refresh_indexdb_cache
try:
refresh_indexdb_cache(tag, invalid_registry_auths)
except Exception as exc:
assert isinstance(exc, (TypeError, ValueError))
else:
mock_auth.assert_not_called()


refresh_indexdb_cache(tag, registry_auths)

mock_auth.assert_called_once_with(registry_auths, use_empty_config=True)
mock_run_cmd.assert_called_once_with(
[
'oc',
'import-image',
'index-db-cache:test-tag',
'--from=quay.io/my-org/index-db:test-tag',
'--confirm',
],
exc_msg='Failed to refresh OCI artifact test-tag.',
)


@mock.patch('iib.workers.tasks.oras_utils.run_cmd', side_effect=IIBError('refresh failed'))
def test_refresh_indexdb_cache_failure(mock_run_cmd):
"""Test cache refresh failure."""
tag = 'test-tag'

with pytest.raises(IIBError, match='refresh failed'):
refresh_indexdb_cache(tag)
99 changes: 99 additions & 0 deletions tests/test_workers/test_tasks/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import hashlib
import json
import logging
import os
import stat
Expand Down Expand Up @@ -528,6 +530,103 @@ def mock_handler(spam, eggs, request_id, bacon):
assert not logs_dir.listdir()


@mock.patch('iib.workers.tasks.utils.skopeo_inspect')
def test_get_image_digest_schema_2(mock_skopeo_inspect):
"""
Tests the get_image_digest function for an image with schemaVersion 2.

Verifies that the function correctly calculates the SHA256 digest from the raw JSON output.
"""
# Simulate raw JSON output from skopeo inspect for schema v2
raw_output = textwrap.dedent(
"""
{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 5545,
"digest": "sha256:720713e1a4410985aacd7008719efd13d8a32e76d08d34fca202a60ff43e516d"
}
}
"""
).strip()

# Configure the mock object to return the raw output
mock_skopeo_inspect.return_value = raw_output

# Expected digest (SHA256 hash of the raw_output)
expected_digest = f"sha256:{hashlib.sha256(raw_output.encode('utf-8')).hexdigest()}"

# Run the function under test
actual_digest = utils.get_image_digest("some-image:latest")

# Verify that skopeo_inspect was called with the correct parameters
mock_skopeo_inspect.assert_called_once_with(
"docker://some-image:latest", "--raw", return_json=False
)

# Verify the result
assert actual_digest == expected_digest


@mock.patch('iib.workers.tasks.utils.skopeo_inspect')
def test_get_image_digest_schema_1(mock_skopeo_inspect):
"""
Tests the get_image_digest function for an image with schemaVersion 1.

Verifies that the function correctly returns the digest from the parsed JSON output
(the second skopeo_inspect call).
"""
# Simulate two different skopeo_inspect calls with different return values
mock_skopeo_inspect.side_effect = [
# First call (with --raw) to check the schema
'{"schemaVersion": 1, "name": "repository/name"}',
# Second call (without --raw) to get the digest
{
"Name": "registry.example.com/repository/name",
"Digest": "sha256:expected-digest-from-skopeo-output",
},
]

# Expected digest from the second mock call
expected_digest = "sha256:expected-digest-from-skopeo-output"

# Run the function under test
actual_digest = utils.get_image_digest("registry.example.com/repository/name:1.0.0")

# Verify that the skopeo_inspect function was called twice with the correct parameters
mock_skopeo_inspect.assert_has_calls(
[
mock.call(
"docker://registry.example.com/repository/name:1.0.0", "--raw", return_json=False
),
mock.call("docker://registry.example.com/repository/name:1.0.0"),
]
)

# Verify the result
assert actual_digest == expected_digest


@mock.patch('iib.workers.tasks.utils.skopeo_inspect')
def test_get_image_digest_invalid_json(mock_skopeo_inspect):
"""
Tests get_image_digest for invalid JSON output.

Verifies that the function correctly raises an error if the skopeo output is invalid.
"""
# Simulate invalid JSON output
mock_skopeo_inspect.return_value = "This is not valid JSON"

# We expect the function to raise a ValueError (from json.loads)
with pytest.raises(json.JSONDecodeError):
utils.get_image_digest("invalid-image:latest")

# Verify that skopeo_inspect was called
mock_skopeo_inspect.assert_called_once()


@pytest.mark.parametrize(
'pull_spec, expected',
(
Expand Down