Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions cookbook/tools/custom_tool_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""This example demonstrate how to yield custom events from a custom tool."""

import asyncio
from dataclasses import dataclass
from typing import Optional

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.run.agent import CustomEvent
from agno.tools import tool


# Our custom event, extending the CustomEvent class
@dataclass
class CustomerProfileEvent(CustomEvent):
"""CustomEvent for customer profile."""

customer_name: Optional[str] = None
customer_email: Optional[str] = None
customer_phone: Optional[str] = None


# Our custom tool
@tool()
async def get_customer_profile():
"""Example custom tool that simply yields a custom event."""

yield CustomerProfileEvent(
customer_name="John Doe",
customer_email="[email protected]",
customer_phone="1234567890",
)


# Setup an Agent with our custom tool.
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[get_customer_profile],
instructions="Your task is to retrieve customer profiles for the user.",
)


async def run_agent():
# Running the Agent: it should call our custom tool and yield the custom event
async for event in agent.arun(
"Hello, can you get me the customer profile for customer with ID 123?",
stream=True,
):
if isinstance(event, CustomEvent):
print(f"✅ Custom event emitted: {event}")


asyncio.run(run_agent())
10 changes: 9 additions & 1 deletion libs/agno/agno/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from agno.models.message import Citations, Message
from agno.models.metrics import Metrics
from agno.models.response import ModelResponse, ModelResponseEvent, ToolExecution
from agno.run.agent import RunContentEvent, RunOutput, RunOutputEvent
from agno.run.agent import CustomEvent, RunContentEvent, RunOutput, RunOutputEvent
from agno.run.team import RunContentEvent as TeamRunContentEvent
from agno.run.team import TeamRunOutputEvent
from agno.tools.function import Function, FunctionCall, FunctionExecutionResult, UserInputField
Expand Down Expand Up @@ -1223,6 +1223,9 @@ def run_function_call(
if function_call.function.show_result:
yield ModelResponse(content=item.content)

if isinstance(item, CustomEvent):
function_call_output += str(item)

# Yield the event itself to bubble it up
yield item

Expand Down Expand Up @@ -1623,8 +1626,13 @@ async def arun_function_calls(
yield ModelResponse(content=item.content)
continue

if isinstance(item, CustomEvent):
function_call_output += str(item)

# Yield the event itself to bubble it up
yield item

# Yield custom events emitted by the tool
else:
function_call_output += str(item)
if function_call.function.show_result:
Expand Down
9 changes: 9 additions & 0 deletions libs/agno/agno/run/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class RunEvent(str, Enum):
output_model_response_started = "OutputModelResponseStarted"
output_model_response_completed = "OutputModelResponseCompleted"

custom_event = "CustomEvent"


@dataclass
class BaseAgentRunEvent(BaseRunOutputEvent):
Expand Down Expand Up @@ -226,6 +228,11 @@ class OutputModelResponseCompletedEvent(BaseAgentRunEvent):
event: str = RunEvent.output_model_response_completed.value


@dataclass
class CustomEvent(BaseAgentRunEvent):
event: str = RunEvent.custom_event.value


RunOutputEvent = Union[
RunStartedEvent,
RunContentEvent,
Expand All @@ -246,6 +253,7 @@ class OutputModelResponseCompletedEvent(BaseAgentRunEvent):
ParserModelResponseCompletedEvent,
OutputModelResponseStartedEvent,
OutputModelResponseCompletedEvent,
CustomEvent,
]


Expand All @@ -270,6 +278,7 @@ class OutputModelResponseCompletedEvent(BaseAgentRunEvent):
RunEvent.parser_model_response_completed.value: ParserModelResponseCompletedEvent,
RunEvent.output_model_response_started.value: OutputModelResponseStartedEvent,
RunEvent.output_model_response_completed.value: OutputModelResponseCompletedEvent,
RunEvent.custom_event.value: CustomEvent,
}


Expand Down
9 changes: 9 additions & 0 deletions libs/agno/agno/run/team.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class TeamRunEvent(str, Enum):
output_model_response_started = "TeamOutputModelResponseStarted"
output_model_response_completed = "TeamOutputModelResponseCompleted"

custom_event = "CustomEvent"


@dataclass
class BaseTeamRunEvent(BaseRunOutputEvent):
Expand Down Expand Up @@ -213,6 +215,11 @@ class OutputModelResponseCompletedEvent(BaseTeamRunEvent):
event: str = TeamRunEvent.output_model_response_completed.value


@dataclass
class CustomEvent(BaseTeamRunEvent):
event: str = TeamRunEvent.custom_event.value


TeamRunOutputEvent = Union[
RunStartedEvent,
RunContentEvent,
Expand All @@ -231,6 +238,7 @@ class OutputModelResponseCompletedEvent(BaseTeamRunEvent):
ParserModelResponseCompletedEvent,
OutputModelResponseStartedEvent,
OutputModelResponseCompletedEvent,
CustomEvent,
]

# Map event string to dataclass for team events
Expand All @@ -252,6 +260,7 @@ class OutputModelResponseCompletedEvent(BaseTeamRunEvent):
TeamRunEvent.parser_model_response_completed.value: ParserModelResponseCompletedEvent,
TeamRunEvent.output_model_response_started.value: OutputModelResponseStartedEvent,
TeamRunEvent.output_model_response_completed.value: OutputModelResponseCompletedEvent,
TeamRunEvent.custom_event.value: CustomEvent,
}


Expand Down
10 changes: 10 additions & 0 deletions libs/agno/agno/run/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class WorkflowRunEvent(str, Enum):

step_output = "StepOutput"

custom_event = "CustomEvent"


@dataclass
class BaseWorkflowRunOutputEvent:
Expand Down Expand Up @@ -380,6 +382,13 @@ def stop(self) -> bool:
return self.step_output.stop if self.step_output else False


@dataclass
class CustomEvent(BaseWorkflowRunOutputEvent):
"""Event sent when a custom event is produced"""

event: str = WorkflowRunEvent.custom_event.value


# Union type for all workflow run response events
WorkflowRunOutputEvent = Union[
WorkflowStartedEvent,
Expand All @@ -402,6 +411,7 @@ def stop(self) -> bool:
StepsExecutionStartedEvent,
StepsExecutionCompletedEvent,
StepOutputEvent,
CustomEvent,
]


Expand Down
4 changes: 2 additions & 2 deletions libs/agno/agno/team/team.py
Original file line number Diff line number Diff line change
Expand Up @@ -2059,7 +2059,7 @@ async def _ahandle_model_response_stream(
stream_model_response=stream_model_response,
) # type: ignore
async for model_response_event in model_stream:
for chunk in self._handle_model_response_chunk(
for event in self._handle_model_response_chunk(
session=session,
run_response=run_response,
full_model_response=full_model_response,
Expand All @@ -2069,7 +2069,7 @@ async def _ahandle_model_response_stream(
parse_structured_output=self.should_parse_structured_output,
workflow_context=workflow_context,
):
yield chunk
yield event

# Handle structured outputs
if (self.output_schema is not None) and not self.use_json_mode and (full_model_response.parsed is not None):
Expand Down
3 changes: 1 addition & 2 deletions libs/agno/agno/tools/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def __init__(
*,
env: Optional[dict[str, str]] = None,
server_params_list: Optional[
List[Union[SSEClientParams, StdioServerParameters, StreamableHTTPClientParams]]
list[Union[SSEClientParams, StdioServerParameters, StreamableHTTPClientParams]]
] = None,
timeout_seconds: int = 5,
client=None,
Expand Down Expand Up @@ -531,7 +531,6 @@ async def _connect(self) -> None:
session = await self._async_exit_stack.enter_async_context(ClientSession(read, write))
self._active_contexts.append(session)
await self.initialize(session)

# Handle Streamable HTTP connections
elif isinstance(server_params, StreamableHTTPClientParams):
client_connection = await self._async_exit_stack.enter_async_context(
Expand Down