Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from semantic_kernel.agents import Agent, ChatCompletionAgent, SequentialOrchestration
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.contents import StreamingChatMessageContent

"""
The following sample demonstrates how to create a sequential orchestration for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import AuthorRole, ChatMessageContent
from semantic_kernel.contents import AuthorRole, ChatMessageContent, FunctionCallContent, FunctionResultContent
from semantic_kernel.functions import kernel_function

"""
Expand Down Expand Up @@ -120,8 +120,18 @@ def get_agents() -> tuple[list[Agent], OrchestrationHandoffs]:


def agent_response_callback(message: ChatMessageContent) -> None:
"""Observer function to print the messages from the agents."""
"""Observer function to print the messages from the agents.

Please note that this function is called whenever the agent generates a response,
including the internal processing messages (such as tool calls) that are not visible
to other agents in the orchestration.
"""
print(f"{message.name}: {message.content}")
for item in message.items:
if isinstance(item, FunctionCallContent):
print(f"Calling '{item.name}' with arguments '{item.arguments}'")
if isinstance(item, FunctionResultContent):
print(f"Result from '{item.name}' is '{item.result}'")


def human_response_function() -> ChatMessageContent:
Expand All @@ -147,7 +157,7 @@ async def main():

# 3. Invoke the orchestration with a task and the runtime
orchestration_result = await handoff_orchestration.invoke(
task="A customer is on the line.",
task="Greet the customer who is reaching out for support.",
runtime=runtime,
)

Expand All @@ -160,24 +170,48 @@ async def main():

"""
Sample output:
TriageAgent: Hello! Thank you for reaching out. How can I assist you today?
TriageAgent: Hello! Thank you for reaching out for support. How can I assist you today?
User: I'd like to track the status of my order
OrderStatusAgent: Sure, I can help you with that. Could you please provide me with your order ID?
TriageAgent:
Calling 'Handoff-transfer_to_OrderStatusAgent' with arguments '{}'
TriageAgent:
Result from 'Handoff-transfer_to_OrderStatusAgent' is 'None'
OrderStatusAgent: Could you please provide me with your order ID so I can check the status for you?
User: My order ID is 123
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. Is there anything
else I can assist you with?
OrderStatusAgent:
Calling 'OrderStatusPlugin-check_order_status' with arguments '{"order_id":"123"}'
OrderStatusAgent:
Result from 'OrderStatusPlugin-check_order_status' is 'Order 123 is shipped and will arrive in 2-3 days.'
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. If you have any
more questions, feel free to ask!
User: I want to return another order of mine
OrderReturnAgent: I can help you with returning your order. Could you please provide the order ID for the return
and the reason you'd like to return it?
OrderStatusAgent: I can help you with that. Could you please provide me with the order ID of the order you want
to return?
User: Order ID 321
OrderReturnAgent: Please provide the reason for returning the order with ID 321.
OrderStatusAgent:
Calling 'Handoff-transfer_to_TriageAgent' with arguments '{}'
OrderStatusAgent:
Result from 'Handoff-transfer_to_TriageAgent' is 'None'
TriageAgent:
Calling 'Handoff-transfer_to_OrderReturnAgent' with arguments '{}'
TriageAgent:
Result from 'Handoff-transfer_to_OrderReturnAgent' is 'None'
OrderReturnAgent: Could you please provide me with the reason for the return for order ID 321?
User: Broken item
Processing return for order 321 due to: Broken item
OrderReturnAgent: The return for your order with ID 321 has been successfully processed due to the broken item.
Is there anything else I can assist you with?
OrderReturnAgent:
Calling 'OrderReturnPlugin-process_return' with arguments '{"order_id":"321","reason":"Broken item"}'
OrderReturnAgent:
Result from 'OrderReturnPlugin-process_return' is 'Return for order 321 has been processed successfully.'
OrderReturnAgent: The return for order ID 321 has been processed successfully due to a broken item. If you need
further assistance or have any other questions, feel free to let me know!
User: No, bye
Task is completed with summary: Handled order return for order ID 321 due to a broken item, and successfully
processed the return.
Task is completed with summary: Processed the return request for order ID 321 due to a broken item.
OrderReturnAgent:
Calling 'Handoff-complete_task' with arguments '{"task_summary":"Processed the return request for order ID 321
due to a broken item."}'
OrderReturnAgent:
Result from 'Handoff-complete_task' is 'None'
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio

from semantic_kernel.agents import Agent, ChatCompletionAgent, HandoffOrchestration, OrchestrationHandoffs
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import (
AuthorRole,
ChatMessageContent,
FunctionCallContent,
FunctionResultContent,
StreamingChatMessageContent,
)
from semantic_kernel.functions import kernel_function

"""
The following sample demonstrates how to create a handoff orchestration that represents
a customer support triage system. The orchestration consists of 4 agents, each specialized
in a different area of customer support: triage, refunds, order status, and order returns.

The orchestration is configured with a streaming agent response callback that prints the
messages from the agents as they are generated.

Depending on the customer's request, agents can hand off the conversation to the appropriate
agent.

Human in the loop is achieved via a callback function similar to the one used in group chat
orchestration. Except that in the handoff orchestration, all agents have access to the
human response function, whereas in the group chat orchestration, only the manager has access
to the human response function.

This sample demonstrates the basic steps of creating and starting a runtime, creating
a handoff orchestration, invoking the orchestration, and finally waiting for the results.
"""


class OrderStatusPlugin:
@kernel_function
def check_order_status(self, order_id: str) -> str:
"""Check the status of an order."""
# Simulate checking the order status
return f"Order {order_id} is shipped and will arrive in 2-3 days."


class OrderRefundPlugin:
@kernel_function
def process_refund(self, order_id: str, reason: str) -> str:
"""Process a refund for an order."""
# Simulate processing a refund
print(f"Processing refund for order {order_id} due to: {reason}")
return f"Refund for order {order_id} has been processed successfully."


class OrderReturnPlugin:
@kernel_function
def process_return(self, order_id: str, reason: str) -> str:
"""Process a return for an order."""
# Simulate processing a return
print(f"Processing return for order {order_id} due to: {reason}")
return f"Return for order {order_id} has been processed successfully."


def get_agents() -> tuple[list[Agent], OrchestrationHandoffs]:
"""Return a list of agents that will participate in the Handoff orchestration and the handoff relationships.

Feel free to add or remove agents and handoff connections.
"""
support_agent = ChatCompletionAgent(
name="TriageAgent",
description="A customer support agent that triages issues.",
instructions="Handle customer requests.",
service=AzureChatCompletion(),
)

refund_agent = ChatCompletionAgent(
name="RefundAgent",
description="A customer support agent that handles refunds.",
instructions="Handle refund requests.",
service=AzureChatCompletion(),
plugins=[OrderRefundPlugin()],
)

order_status_agent = ChatCompletionAgent(
name="OrderStatusAgent",
description="A customer support agent that checks order status.",
instructions="Handle order status requests.",
service=AzureChatCompletion(),
plugins=[OrderStatusPlugin()],
)

order_return_agent = ChatCompletionAgent(
name="OrderReturnAgent",
description="A customer support agent that handles order returns.",
instructions="Handle order return requests.",
service=AzureChatCompletion(),
plugins=[OrderReturnPlugin()],
)

# Define the handoff relationships between agents
handoffs = (
OrchestrationHandoffs()
.add_many(
source_agent=support_agent.name,
target_agents={
refund_agent.name: "Transfer to this agent if the issue is refund related",
order_status_agent.name: "Transfer to this agent if the issue is order status related",
order_return_agent.name: "Transfer to this agent if the issue is order return related",
},
)
.add(
source_agent=refund_agent.name,
target_agent=support_agent.name,
description="Transfer to this agent if the issue is not refund related",
)
.add(
source_agent=order_status_agent.name,
target_agent=support_agent.name,
description="Transfer to this agent if the issue is not order status related",
)
.add(
source_agent=order_return_agent.name,
target_agent=support_agent.name,
description="Transfer to this agent if the issue is not order return related",
)
)

return [support_agent, refund_agent, order_status_agent, order_return_agent], handoffs


# Flag to indicate if a new message is being received
is_new_message = True


def streaming_agent_response_callback(message: StreamingChatMessageContent, is_final: bool) -> None:
"""Observer function to print the messages from the agents.

Please note that this function is called whenever the agent generates a response,
including the internal processing messages (such as tool calls) that are not visible
to other agents in the orchestration.

In streaming mode, the FunctionCallContent and FunctionResultContent are provided as a
complete message.

Args:
message (StreamingChatMessageContent): The streaming message content from the agent.
is_final (bool): Indicates if this is the final part of the message.
"""
global is_new_message
if is_new_message:
print(f"{message.name}: ", end="", flush=True)
is_new_message = False
print(message.content, end="", flush=True)

for item in message.items:
if isinstance(item, FunctionCallContent):
print(f"Calling '{item.name}' with arguments '{item.arguments}'", end="", flush=True)
if isinstance(item, FunctionResultContent):
print(f"Result from '{item.name}' is '{item.result}'", end="", flush=True)

if is_final:
print()
is_new_message = True


def human_response_function() -> ChatMessageContent:
"""Observer function to print the messages from the agents."""
user_input = input("User: ")
return ChatMessageContent(role=AuthorRole.USER, content=user_input)


async def main():
"""Main function to run the agents."""
# 1. Create a handoff orchestration with multiple agents
agents, handoffs = get_agents()
handoff_orchestration = HandoffOrchestration(
members=agents,
handoffs=handoffs,
streaming_agent_response_callback=streaming_agent_response_callback,
human_response_function=human_response_function,
)

# 2. Create a runtime and start it
runtime = InProcessRuntime()
runtime.start()

# 3. Invoke the orchestration with a task and the runtime
orchestration_result = await handoff_orchestration.invoke(
task="Greet the customer who is reaching out for support.",
runtime=runtime,
)

# 4. Wait for the results
value = await orchestration_result.get()
print(value)

# 5. Stop the runtime after the invocation is complete
await runtime.stop_when_idle()

"""
Sample output:
TriageAgent: Hello! Thank you for reaching out for support. How can I assist you today?
User: I'd like to track the status of my order
TriageAgent: Calling 'Handoff-transfer_to_OrderStatusAgent' with arguments '{}'
TriageAgent: Result from 'Handoff-transfer_to_OrderStatusAgent' is 'None'
OrderStatusAgent: Could you please provide me with your order ID? This will help me check the status of your order.
User: My order ID is 123
OrderStatusAgent: Calling 'OrderStatusPlugin-check_order_status' with arguments '{"order_id":"123"}'
OrderStatusAgent: Result from 'OrderStatusPlugin-check_order_status' is 'Order 123 is shipped and will arrive in
2-3 days.'
OrderStatusAgent: Your order with ID 123 has been shipped and is expected to arrive in 2-3 days. If you have any
more questions, feel free to ask!
User: I want to return another order of mine
OrderStatusAgent: Calling 'Handoff-transfer_to_TriageAgent' with arguments '{}'
OrderStatusAgent: Result from 'Handoff-transfer_to_TriageAgent' is 'None'
TriageAgent: Calling 'Handoff-transfer_to_OrderReturnAgent' with arguments '{}'
TriageAgent: Result from 'Handoff-transfer_to_OrderReturnAgent' is 'None'
OrderReturnAgent: Could you please provide me with the order ID for the order you would like to return, as well
as the reason for the return?
User: Order ID 321
OrderReturnAgent: What is the reason for returning order ID 321?
User: Broken item
Processing return for order 321 due to: Broken item
OrderReturnAgent: Calling 'OrderReturnPlugin-process_return' with arguments '{"order_id":"321","reason":"Broken
item"}'
OrderReturnAgent: Result from 'OrderReturnPlugin-process_return' is 'Return for order 321 has been processed
successfully.'
OrderReturnAgent: Task is completed with summary: Processed return for order ID 321 due to a broken item.
Calling 'Handoff-complete_task' with arguments '{"task_summary":"Processed return for order ID 321 due to a
broken item."}'
OrderReturnAgent: Result from 'Handoff-complete_task' is 'None'
"""


if __name__ == "__main__":
asyncio.run(main())
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ async def _drain_mutated_messages(
drained: list[ChatMessageContent] = []
for i in range(start, len(history)):
msg: ChatMessageContent = history[i] # type: ignore
msg.name = self.name
await thread.on_new_message(msg)
drained.append(msg)
return drained
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ async def _invoke_agent(self, additional_messages: DefaultTypeAlias | None = Non
streaming_message_buffer: list[StreamingChatMessageContent] = []
messages = self._create_messages(additional_messages)

async for response_item in self._agent.invoke_stream(messages=messages, thread=self._agent_thread, **kwargs): # type: ignore[arg-type]
async for response_item in self._agent.invoke_stream(
messages, # type: ignore[arg-type]
thread=self._agent_thread,
on_intermediate_message=self._handle_intermediate_message,
**kwargs,
):
# Buffer message chunks and stream them with correct is_final flag.
streaming_message_buffer.append(response_item.message)
if len(streaming_message_buffer) > 1:
Expand Down Expand Up @@ -149,3 +154,24 @@ def _create_messages(self, additional_messages: DefaultTypeAlias | None = None)
if isinstance(additional_messages, list):
return base_messages + additional_messages
return [*base_messages, additional_messages]

async def _handle_intermediate_message(self, message: ChatMessageContent) -> None:
"""Handle intermediate messages from the agent.

This method is called with messages produced during streaming agent responses.
Although the parameter is typed as `ChatMessageContent` (to match the `invoke_stream` callback signature),
the actual object will always be a `StreamingChatMessageContent` (a subclass of `ChatMessageContent`).

The agent response callback expects a `ChatMessageContent`, so we can pass the message directly.
However, the streaming agent response callback specifically requires a `StreamingChatMessageContent`.
To avoid type errors from the static type checker due to down casting (from `ChatMessageContent` to
`StreamingChatMessageContent`), we check that the message is of the correct type before calling the callbacks.
Since it will always be a `StreamingChatMessageContent`, this check is safe.
"""
if not isinstance(message, StreamingChatMessageContent):
raise TypeError(
f"Expected message to be of type 'StreamingChatMessageContent', "
f"but got '{type(message).__name__}' instead."
)
await self._call_agent_response_callback(message)
await self._call_streaming_agent_response_callback(message, is_final=True)
Loading
Loading