Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2763b43
KAB-1108 fix: add validation of storage necessity, update code and te…
mariankrotil Jun 6, 2025
9f694b8
KAB-1108 chore: update vers
mariankrotil Jun 6, 2025
aefa575
KAB-1108 refactor: update error message when storage cannot be empty
mariankrotil Jun 6, 2025
4a2b243
KAB-1108 fix: update checking the empty storage
mariankrotil Jun 6, 2025
e326353
Merge branch 'main' into KAB-1108-storage-necessity
mariankrotil Jun 6, 2025
8239cac
KAB-1108 refactor: simplify the storage validation, compressed in one…
mariankrotil Jun 9, 2025
08648e3
KAB-1108 refactor: simplify the validation wrt discussion
mariankrotil Jun 9, 2025
4ca7016
Merge main into KAB-1108
mariankrotil Jun 9, 2025
da567db
Merge main into KAB-1108
mariankrotil Jun 10, 2025
6740213
KAB-1108 refactor: rename _validate to validation wrt reviews
mariankrotil Jun 10, 2025
2daa7b1
KAB-1108 fix: rename module imports wrt changes
mariankrotil Jun 10, 2025
0948bea
KAB-1108 refactor: update storage emptiness check for sql transformat…
mariankrotil Jun 11, 2025
84fca96
KAB-1108 feat: add only storage validation wrt changes
mariankrotil Jun 11, 2025
d4e4662
KAB-1108 docs: add comment wrt review
mariankrotil Jun 11, 2025
3fcf111
Merge branch 'main' into KAB-1108-storage-necessity
mariankrotil Jun 11, 2025
562fa02
KAB-1108 chore: update vers
mariankrotil Jun 11, 2025
f110d9d
Merge branch 'main' into KAB-1108-storage-necessity
mariankrotil Jun 11, 2025
989040c
KAB-1108 refactor: move validation to validation file only
mariankrotil Jun 11, 2025
31d4c3d
KAB-1108 chore: update vers
mariankrotil Jun 11, 2025
891965f
KAB-1108 test: move validation tests to one test file
mariankrotil Jun 11, 2025
074f237
Merge branch 'main' into KAB-1108-storage-necessity
mariankrotil Jun 11, 2025
d5fec5e
KAB-1108 chore: release sync tools.md, update uv lock deps, sync deps…
mariankrotil Jun 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integtests/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import pytest

from keboola_mcp_server.client import JsonDict, KeboolaClient
from keboola_mcp_server.tools._validate import KeboolaParametersValidator
from keboola_mcp_server.tools.components.model import Component
from keboola_mcp_server.tools.validation import KeboolaParametersValidator

LOG = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "keboola-mcp-server"
version = "0.33.5"
version = "0.33.6"
description = "MCP server for interacting with Keboola Connection"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
62 changes: 44 additions & 18 deletions src/keboola_mcp_server/tools/components/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,13 @@ async def update_sql_transformation_configuration(
sql_transformation_id = _get_sql_transformation_id_from_sql_dialect(await get_sql_dialect(ctx))
LOG.info(f'SQL transformation ID: {sql_transformation_id}')

validate_storage_configuration(storage=storage, initial_message='The "storage" field is not valid.')
transformation = await _get_component(client=client, component_id=sql_transformation_id)

storage = validate_storage_configuration(
component=transformation,
storage=storage,
initial_message='The "storage" field is not valid.',
)

updated_configuration = {
'parameters': parameters.model_dump(),
Expand All @@ -473,7 +479,6 @@ async def update_sql_transformation_configuration(
is_disabled=is_disabled,
)

transformation = await _get_component(client=client, component_id=sql_transformation_id)
updated_transformation_configuration = ComponentConfigurationResponse.model_validate(
updated_raw_configuration
| {
Expand Down Expand Up @@ -549,11 +554,16 @@ async def create_component_root_configuration(

LOG.info(f'Creating new configuration: {name} for component: {component_id}.')

storage_cfg = validate_storage_configuration(storage=storage, initial_message='The "storage" field is not valid.')
parameters = await validate_root_parameters_configuration(
client=client,
component = await _get_component(client=client, component_id=component_id)

storage_cfg = validate_storage_configuration(
component=component,
storage=storage,
initial_message='The "storage" field is not valid.',
)
parameters = validate_root_parameters_configuration(
component=component,
parameters=parameters,
component_id=component_id,
initial_message='The "parameters" field is not valid.',
)

Expand Down Expand Up @@ -647,11 +657,16 @@ async def create_component_row_configuration(
f'and configuration {configuration_id}.'
)

storage_cfg = validate_storage_configuration(storage=storage, initial_message='The "storage" field is not valid.')
parameters = await validate_row_parameters_configuration(
client=client,
component = await _get_component(client=client, component_id=component_id)

storage_cfg = validate_storage_configuration(
component=component,
storage=storage,
initial_message='The "storage" field is not valid.',
)
parameters = validate_row_parameters_configuration(
component=component,
parameters=parameters,
component_id=component_id,
initial_message='The "parameters" field is not valid.',
)

Expand Down Expand Up @@ -752,11 +767,16 @@ async def update_component_root_configuration(

LOG.info(f'Updating configuration: {name} for component: {component_id} and configuration ID {configuration_id}.')

storage_cfg = validate_storage_configuration(storage=storage, initial_message='The "storage" field is not valid.')
parameters = await validate_root_parameters_configuration(
client=client,
component = await _get_component(client=client, component_id=component_id)

storage_cfg = validate_storage_configuration(
component=component,
storage=storage,
initial_message='The "storage" field is not valid.',
)
parameters = validate_root_parameters_configuration(
component=component,
parameters=parameters,
component_id=component_id,
initial_message='The "parameters" field is not valid.',
)

Expand Down Expand Up @@ -857,11 +877,17 @@ async def update_component_row_configuration(
f'Updating configuration row: {name} for component: {component_id}, configuration id {configuration_id} '
f'and row id {configuration_row_id}.'
)
storage_cfg = validate_storage_configuration(storage=storage, initial_message='The "storage" field is not valid.')
parameters = await validate_row_parameters_configuration(
client=client,

component = await _get_component(client=client, component_id=component_id)

storage_cfg = validate_storage_configuration(
component=component,
storage=storage,
initial_message='The "storage" field is not valid.',
)
parameters = validate_row_parameters_configuration(
component=component,
parameters=parameters,
component_id=component_id,
initial_message='Field "parameters" is not valid.\n',
)

Expand Down
82 changes: 54 additions & 28 deletions src/keboola_mcp_server/tools/components/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from pydantic import BaseModel, Field

from keboola_mcp_server.client import JsonDict, KeboolaClient
from keboola_mcp_server.tools._validate import validate_parameters, validate_storage
from keboola_mcp_server.tools.components.model import (
AllComponentTypes,
Component,
Expand All @@ -17,10 +16,18 @@
ComponentWithConfigurations,
ReducedComponent,
)
from keboola_mcp_server.tools.validation import (
validate_parameters_configuration_against_schema,
validate_storage_configuration_against_schema,
)

LOG = logging.getLogger(__name__)


SNOWFLAKE_TRANSFORMATION_ID = 'keboola.snowflake-transformation'
BIGQUERY_TRANSFORMATION_ID = 'keboola.google-bigquery-transformation'


def _handle_component_types(
types: Optional[Union[ComponentType, Sequence[ComponentType]]],
) -> Sequence[ComponentType]:
Expand Down Expand Up @@ -195,9 +202,9 @@ def _get_sql_transformation_id_from_sql_dialect(
:raises ValueError: If the SQL dialect is not supported
"""
if sql_dialect.lower() == 'snowflake':
return 'keboola.snowflake-transformation'
return SNOWFLAKE_TRANSFORMATION_ID
elif sql_dialect.lower() == 'bigquery':
return 'keboola.google-bigquery-transformation'
return BIGQUERY_TRANSFORMATION_ID
else:
raise ValueError(f'Unsupported SQL dialect: {sql_dialect}')

Expand Down Expand Up @@ -325,63 +332,79 @@ def _get_transformation_configuration(

def validate_storage_configuration(
storage: Optional[JsonDict],
component: Component,
initial_message: Optional[str] = None,
) -> JsonDict:
"""
Validates the storage configuration and extracts the storage key contents.
Validates the storage configuration and checks if it is necessary for the component.
:param storage: The storage configuration to validate received from the agent.
:param component: The component for which the storage is provided
:param initial_message: The initial message to include in the error message.
:return: The contents of the 'storage' key from the validated configuration,
or an empty dict if no storage is provided.
"""
if not storage or storage is None or storage.get('storage', {}) is None:
LOG.warning('No storage configuration provided, skipping validation.')
return {}
# As expected by the storage schema, we normalize storage to {'storage': storage | {} | None}
# since the agent bot can input storage as {'storage': storage} or just storage
storage_cfg = cast(Optional[JsonDict], storage.get('storage', storage) if storage else {})

# If storage is None, we set it to an empty dict
if storage_cfg is None:
LOG.warning(
f'No storage configuration provided for component {component.component_id} of type '
f'{component.component_type}.'
)
storage_cfg = {}
# Only for SQL transformations
if component.component_id in [SNOWFLAKE_TRANSFORMATION_ID, BIGQUERY_TRANSFORMATION_ID]:
# For SQL transformations, we want to have at least one of the input or output config in the storage
if not storage_cfg or ('output' not in storage_cfg and 'input' not in storage_cfg):
raise ValueError(
f'Storage configuration of {component.component_id} SQL transformation cannot be empty and must '
'contain either input or output configuration.'
)

initial_message = (initial_message or '') + '\n'
initial_message += STORAGE_VALIDATION_INITIAL_MESSAGE
normalized_storage = validate_storage(storage, initial_message)
normalized_storage = cast(JsonDict, {'storage': storage_cfg})
normalized_storage = validate_storage_configuration_against_schema(normalized_storage, initial_message)
return cast(JsonDict, normalized_storage['storage'])


async def validate_root_parameters_configuration(
client: KeboolaClient,
def validate_root_parameters_configuration(
parameters: JsonDict,
component_id: str,
component: Component,
initial_message: Optional[str] = None,
) -> JsonDict:
"""
Utility function to validate the root parameters configuration.
:param client: The Keboola client
:param parameters: The parameters of the configuration to validate
:param component_id: The ID of the component for which the configuration is provided
:param component: The component for which the configuration is provided
:param initial_message: The initial message to include in the error message
:return: The contents of the 'parameters' key from the validated configuration
"""
initial_message = (initial_message or '') + '\n'
initial_message += ROOT_PARAMETERS_VALIDATION_INITIAL_MESSAGE.format(component_id=component_id)
component = await _get_component(client=client, component_id=component_id)
return _validate_parameters_configuration(parameters, component.configuration_schema, component_id, initial_message)
initial_message += ROOT_PARAMETERS_VALIDATION_INITIAL_MESSAGE.format(component_id=component.component_id)
return _validate_parameters_configuration(
parameters, component.configuration_schema, component.component_id, initial_message
)


async def validate_row_parameters_configuration(
client: KeboolaClient,
def validate_row_parameters_configuration(
parameters: JsonDict,
component_id: str,
component: Component,
initial_message: Optional[str] = None,
) -> JsonDict:
"""
Utility function to validate the row parameters configuration.
:param client: The Keboola client
:param parameters: The parameters of the configuration to validate
:param component_id: The ID of the component for which the configuration is provided
:param component: The component for which the configuration is provided
:param initial_message: The initial message to include in the error message
:return: The contents of the 'parameters' key from the validated configuration
"""
initial_message = (initial_message or '') + '\n'
initial_message += ROW_PARAMETERS_VALIDATION_INITIAL_MESSAGE.format(component_id=component_id)
component = await _get_component(client=client, component_id=component_id)
initial_message += ROW_PARAMETERS_VALIDATION_INITIAL_MESSAGE.format(component_id=component.component_id)
return _validate_parameters_configuration(
parameters, component.configuration_row_schema, component_id, initial_message
parameters, component.configuration_row_schema, component.component_id, initial_message
)


Expand All @@ -399,10 +422,13 @@ def _validate_parameters_configuration(
:param initial_message: The initial message to include in the error message
:return: The contents of the 'parameters' key from the validated configuration
"""
# As expected by the component parameter schema, we use only the parameters configurations without the "parameters"
# key since the agent bot can input parameters as {'parameters': parameters} or just parameters
expected_parameters = cast(JsonDict, parameters.get('parameters', parameters))

if not schema:
LOG.warning(f'No schema provided for component {component_id}, skipping validation.')
return parameters
return expected_parameters

# we expect the parameters to be a dictionary of parameter configurations without the "parameters" key
normalized_parameters = validate_parameters(parameters, schema, initial_message)
return cast(JsonDict, normalized_parameters['parameters'])
expected_parameters = validate_parameters_configuration_against_schema(expected_parameters, schema, initial_message)
return expected_parameters
2 changes: 1 addition & 1 deletion src/keboola_mcp_server/tools/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
from keboola_mcp_server.client import JsonDict, KeboolaClient
from keboola_mcp_server.errors import tool_errors
from keboola_mcp_server.mcp import with_session_state
from keboola_mcp_server.tools._validate import validate_flow_configuration_against_schema
from keboola_mcp_server.tools.components.model import (
FlowConfiguration,
FlowConfigurationResponse,
FlowPhase,
FlowTask,
ReducedFlow,
)
from keboola_mcp_server.tools.validation import validate_flow_configuration_against_schema

LOG = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

import jsonschema
import jsonschema.validators
from jsonschema import TypeChecker
from jsonschema.validators import extend

from keboola_mcp_server.client import JsonDict, JsonPrimitive, JsonStruct

Expand Down Expand Up @@ -86,6 +84,9 @@ def __str__(self) -> str:

class KeboolaParametersValidator:
"""
We use this validator to load parameters' schema that has been fetched from AI service for a given component ID and
to validate the parameters configuration (json data) received from the Agent against the loaded schema.

A custom JSON Schema validator that handles UI elements and schema normalization:
1. Ignores 'button' type (UI-only construct)
2. Normalizes schema by:
Expand All @@ -96,17 +97,19 @@ class KeboolaParametersValidator:
@classmethod
def validate(cls, instance: JsonDict, schema: JsonDict) -> None:
"""
Validate the instance against the schema.
Validate the json data instance against the schema.
:param instance: The json data to validate
:param schema: The schema to validate against
"""
sanitized_schema = cls.sanitize_schema(schema)
base_validator = jsonschema.validators.validator_for(sanitized_schema)
keboola_validator = extend(
keboola_validator = jsonschema.validators.extend(
base_validator, type_checker=base_validator.TYPE_CHECKER.redefine('button', cls.check_button_type)
)
return keboola_validator(sanitized_schema).validate(instance)

@staticmethod
def check_button_type(checker: TypeChecker, instance: object) -> bool:
def check_button_type(checker: jsonschema.TypeChecker, instance: object) -> bool:
"""
Dummy button type checker.
We accept button as a type since it is a UI construct and not a data type.
Expand Down Expand Up @@ -164,34 +167,40 @@ def _sanitize_required_and_properties(
return sanitized_schema


def validate_storage(storage: JsonDict, initial_message: Optional[str] = None) -> JsonDict:
def validate_storage_configuration_against_schema(storage: JsonDict, initial_message: Optional[str] = None) -> JsonDict:
"""Validate the storage configuration using jsonschema.
:param storage: The storage configuration to validate
:param initial_message: The initial message to include in the error message
:returns: The validated storage configuration normalized to {"storage" : {...}}
:returns: The validated storage configuration (json data as the input) if the validation succeeds
"""
schema = _load_schema(ConfigurationSchemaResources.STORAGE)
# we expect the storage to be a dictionary of storage configurations with the "storage" key
normalized_storage_data = {'storage': storage.get('storage', storage)}
_validate_json_against_schema(
json_data=normalized_storage_data,
json_data=storage,
schema=schema,
initial_message=initial_message,
)
return normalized_storage_data
return storage


def validate_parameters(parameters: JsonDict, schema: JsonDict, initial_message: Optional[str] = None) -> JsonDict:
def validate_parameters_configuration_against_schema(
parameters: JsonDict,
schema: JsonDict,
initial_message: Optional[str] = None,
) -> JsonDict:
"""
Validate the parameters configuration using jsonschema.
:parameters: json data to validate
:schema: json schema to validate against (root or row parameter configuration schema)
:initial_message: initial message to include in the error message
:returns: The validated parameters configuration normalized to {"parameters" : {...}}
:returns: The validated parameters configuration (json data as the input) if the validation succeeds
"""
expected_input = cast(JsonDict, parameters.get('parameters', parameters))
_validate_json_against_schema(expected_input, schema, initial_message, KeboolaParametersValidator.validate)
return {'parameters': expected_input} # normalized to {"parameters" : {...}}
_validate_json_against_schema(
json_data=parameters,
schema=schema,
initial_message=initial_message,
validate_fn=KeboolaParametersValidator.validate,
)
return parameters


def validate_flow_configuration_against_schema(flow: JsonDict, initial_message: Optional[str] = None) -> JsonDict:
Expand Down
Loading