Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "keboola-mcp-server"
version = "1.14.0"
version = "1.15.0"
description = "MCP server for interacting with Keboola Connection"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
24 changes: 24 additions & 0 deletions src/keboola_mcp_server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,30 @@ async def is_enabled(self, features: ProjectFeature | Iterable[ProjectFeature])
project_features = cast(list[str], project_data.get('features', []))
return all(feature in project_features for feature in features)

async def token_create(
self,
description: str,
component_access: list[str] | None = None,
expires_in: int | None = None,
) -> JsonDict:
"""
Creates a new Storage API token.

:param description: Description of the token
:param component_access: List of component IDs the token should have access to
:param expires_in: Token expiration time in seconds
:return: Token creation response containing the token and its details
"""
token_data: dict[str, Any] = {'description': description}

if component_access:
token_data['componentAccess'] = component_access

if expires_in:
token_data['expiresIn'] = expires_in

return cast(JsonDict, await self.post(endpoint='tokens', data=token_data))


class JobsQueueClient(KeboolaServiceClient):
"""
Expand Down
2 changes: 2 additions & 0 deletions src/keboola_mcp_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from keboola_mcp_server.tools.doc import add_doc_tools
from keboola_mcp_server.tools.flow.tools import add_flow_tools
from keboola_mcp_server.tools.jobs import add_job_tools
from keboola_mcp_server.tools.oauth import add_oauth_tools
from keboola_mcp_server.tools.project import add_project_tools
from keboola_mcp_server.tools.search import add_search_tools
from keboola_mcp_server.tools.sql import add_sql_tools
Expand Down Expand Up @@ -163,6 +164,7 @@ async def oauth_callback_handler(request: Request) -> Response:
add_doc_tools(mcp)
add_flow_tools(mcp)
add_job_tools(mcp)
add_oauth_tools(mcp)
add_project_tools(mcp)
add_search_tools(mcp)
add_sql_tools(mcp)
Expand Down
64 changes: 64 additions & 0 deletions src/keboola_mcp_server/tools/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""OAuth URL generation tools for the MCP server."""

import logging
from typing import Annotated

from fastmcp import Context
from fastmcp.tools import FunctionTool
from pydantic import Field

from keboola_mcp_server.client import KeboolaClient
from keboola_mcp_server.errors import tool_errors
from keboola_mcp_server.mcp import KeboolaMcpServer, with_session_state

LOG = logging.getLogger(__name__)

TOOL_GROUP_NAME = 'OAUTH'


def add_oauth_tools(mcp: KeboolaMcpServer) -> None:
"""Adds OAuth tools to the MCP server."""
mcp.add_tool(FunctionTool.from_function(create_oauth_url))
LOG.info('OAuth tools added to the MCP server.')


@tool_errors()
@with_session_state()
async def create_oauth_url(
component_id: Annotated[
str, Field(description='The component ID to grant access to (e.g., "keboola.ex-google-analytics-v4").')
],
config_id: Annotated[str, Field(description='The configuration ID for the component.')],
ctx: Context,
) -> str:
"""
Generates an OAuth authorization URL for a Keboola component configuration.

When using this tool, be very concise in your response. Just guide the user to click the
authorization link.

Note that this tool should be called specifically for the OAuth-requiring components after their
configuration is created e.g. keboola.ex-google-analytics-v4 and keboola.ex-gmail.
"""
client = KeboolaClient.from_state(ctx.session.state)

# Create the token using the storage client
token_response = await client.storage_client.token_create(
description=f'Short-lived token for OAuth URL - {component_id}/{config_id}',
component_access=[component_id],
expires_in=3600, # 1 hour expiration
)

# Extract the token from response
sapi_token = token_response['token']

# Get the storage API URL from client
storage_api_url = client.storage_client.base_api_url

# Generate OAuth URL
oauth_url = (
f'https://external.keboola.com/oauth/index.html?token={sapi_token}'
f'&sapiUrl={storage_api_url}#/{component_id}/{config_id}'
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the query part parameters should be properly encoded by using urllib.parse.urlencode function and the whole URL should be constructed using urllib.parse.urlunsplit function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into it and fix in follow-up PR. Thanks.


return oauth_url
79 changes: 79 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,82 @@ async def test_trigger_event(
if value
}
)

@pytest.mark.asyncio
@pytest.mark.parametrize(
('description', 'component_access', 'expires_in', 'expected_data'),
[
# Basic token creation with just description
('Test token', None, None, {'description': 'Test token'}),
# Token with component access
(
'OAuth token',
['keboola.ex-google-analytics-v4'],
None,
{'description': 'OAuth token', 'componentAccess': ['keboola.ex-google-analytics-v4']}
),
# Token with expiration
(
'Short-lived token',
None,
3600,
{'description': 'Short-lived token', 'expiresIn': 3600}
),
# Token with all parameters
(
'Full token',
['keboola.ex-gmail', 'keboola.ex-google-analytics-v4'],
7200,
{
'description': 'Full token',
'componentAccess': ['keboola.ex-gmail', 'keboola.ex-google-analytics-v4'],
'expiresIn': 7200
}
),
]
)
async def test_token_create(
self,
description: str,
component_access: list[str] | None,
expires_in: int | None,
expected_data: dict[str, Any],
keboola_client: KeboolaClient
):
"""Test token creation with various parameter combinations."""
with patch('httpx.AsyncClient') as mock_client_class:
mock_client_class.return_value.__aenter__.return_value = (mock_client := AsyncMock())
mock_client.post.return_value = (response := Mock(spec=httpx.Response))
response.status_code = 201
response.json.return_value = {
'id': '12345',
'token': 'KBC_TOKEN_TEST_12345',
'description': description,
'created': '2023-01-01T00:00:00+00:00',
'expiresIn': expires_in,
'componentAccess': component_access or []
}

result = await keboola_client.storage_client.token_create(
description=description,
component_access=component_access,
expires_in=expires_in
)

# Verify the response
assert result['token'] == 'KBC_TOKEN_TEST_12345'
assert result['description'] == description

# Verify the API call was made with correct parameters
version = importlib.metadata.version('keboola-mcp-server')
mock_client.post.assert_called_once_with(
'https://connection.nowhere/v2/storage/tokens',
params=None,
headers={
'Content-Type': 'application/json',
'Accept-encoding': 'gzip',
'X-StorageAPI-Token': 'test-token',
'User-Agent': f'Keboola MCP Server/{version} app_env=local'
},
json=expected_data
)
1 change: 1 addition & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def test_list_tools(self):
'add_config_row',
'create_config',
'create_flow',
'create_oauth_url',
'create_sql_transformation',
'docs_query',
'find_component_id',
Expand Down
121 changes: 121 additions & 0 deletions tests/tools/test_oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Tests for OAuth URL generation tools."""

from typing import Any, Mapping

import pytest
from mcp.server.fastmcp import Context

from keboola_mcp_server.client import KeboolaClient
from keboola_mcp_server.tools.oauth import create_oauth_url


@pytest.fixture
def mock_token_response() -> Mapping[str, Any]:
"""Mock valid response from the token creation endpoint."""
return {
'token': 'KBC_TOKEN_12345',
'description': 'Short-lived token for OAuth URL - keboola.ex-google-analytics-v4/config-123',
'expiresIn': 3600,
}


@pytest.mark.asyncio
async def test_create_oauth_url_success(
mcp_context_client: Context, mock_token_response: Mapping[str, Any]
) -> None:
"""Test successful OAuth URL creation."""
# Mock the storage client's token_create method to return the token response
keboola_client = KeboolaClient.from_state(mcp_context_client.session.state)
keboola_client.storage_client.token_create.return_value = mock_token_response
keboola_client.storage_client.base_api_url = 'https://connection.test.keboola.com'

component_id = 'keboola.ex-google-analytics-v4'
config_id = 'config-123'

result = await create_oauth_url(component_id=component_id, config_id=config_id, ctx=mcp_context_client)

# Verify the storage client was called with correct parameters
keboola_client.storage_client.token_create.assert_called_once_with(
description=f'Short-lived token for OAuth URL - {component_id}/{config_id}',
component_access=[component_id],
expires_in=3600,
)

# Verify the response is the URL string
assert isinstance(result, str)

expected_url = (
f'https://external.keboola.com/oauth/index.html'
f'?token=KBC_TOKEN_12345'
f'&sapiUrl=https://connection.test.keboola.com'
f'#/{component_id}/{config_id}'
)
assert result == expected_url


@pytest.mark.asyncio
@pytest.mark.parametrize(
('component_id', 'config_id'),
[
('keboola.ex-google-analytics-v4', 'my-config-123'),
('keboola.ex-gmail', 'gmail-config-456'),
('other.component', 'test-config'),
],
)
async def test_create_oauth_url_different_components(
mcp_context_client: Context,
mock_token_response: Mapping[str, Any],
component_id: str,
config_id: str,
) -> None:
"""Test OAuth URL creation for different components."""
# Mock the storage client
keboola_client = KeboolaClient.from_state(mcp_context_client.session.state)
keboola_client.storage_client.token_create.return_value = mock_token_response
keboola_client.storage_client.base_api_url = 'https://connection.test.keboola.com'

result = await create_oauth_url(component_id=component_id, config_id=config_id, ctx=mcp_context_client)

# Verify component-specific parameters were used
assert isinstance(result, str)
assert f'#/{component_id}/{config_id}' in result

# Verify the API call included the correct component access
call_args = keboola_client.storage_client.token_create.call_args
assert call_args[1]['component_access'] == [component_id]
assert component_id in call_args[1]['description']
assert config_id in call_args[1]['description']


@pytest.mark.asyncio
async def test_create_oauth_url_token_creation_failure(
mcp_context_client: Context,
) -> None:
"""Test OAuth URL creation when token creation fails."""
# Mock the storage client to raise an exception
keboola_client = KeboolaClient.from_state(mcp_context_client.session.state)
keboola_client.storage_client.token_create.side_effect = Exception(
'Token creation failed'
)

with pytest.raises(Exception, match='Token creation failed'):
await create_oauth_url(
component_id='keboola.ex-google-analytics-v4', config_id='config-123', ctx=mcp_context_client
)


@pytest.mark.asyncio
async def test_create_oauth_url_missing_token_in_response(mcp_context_client: Context) -> None:
"""Test OAuth URL creation when token is missing from response."""
# Mock response without token field
invalid_response = {
'description': 'Short-lived token for OAuth URL',
'expiresIn': 3600,
}
keboola_client = KeboolaClient.from_state(mcp_context_client.session.state)
keboola_client.storage_client.token_create.return_value = invalid_response

with pytest.raises(KeyError):
await create_oauth_url(
component_id='keboola.ex-google-analytics-v4', config_id='config-123', ctx=mcp_context_client
)
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.