Merged
61 commits
bd0a601
Refactor streaming interface to use `StreamingFrame` and add tool cal…
dosier Sep 5, 2025
2ec615c
Remove deprecated annotation from legacy streaming functions and docu…
dosier Sep 5, 2025
d67bf89
Add `tools` parameter to `execute` calls in `CalculatorPromptExecutor`.
dosier Sep 5, 2025
2667102
Refactor streaming interface to replace `StreamingFrame` with `Stream…
dosier Sep 5, 2025
1b630e2
Refactor `StreamChunk` to `StreamFrame` across streaming interfaces a…
dosier Sep 5, 2025
8c83a71
Fix typo: Replace `StreamingFrame` with `StreamFrame` in `executeStre…
dosier Sep 5, 2025
7d62c0f
Add `mapTextOnly` utility and integrate `WithTools` streaming refinem…
dosier Sep 5, 2025
3d060dc
feat: enhance streaming API with tool call support and nullable handling
rubencagnie-toast Sep 12, 2025
29f1546
docs: add KDoc documentation for new streaming functions and properties
rubencagnie-toast Sep 12, 2025
8a3f7c3
test: fix test compilation errors after streaming API changes
rubencagnie-toast Sep 12, 2025
2481948
Remove `-SNAPSHOT`
rubencagnie-toast Sep 12, 2025
8800d77
Add handlers
rubencagnie-toast Sep 13, 2025
fc1ab09
Suspend
rubencagnie-toast Sep 13, 2025
37b5879
Added tests
rubencagnie-toast Sep 13, 2025
b1644df
Merge remote-tracking branch 'refs/remotes/jb/develop' into streaming…
dosier Sep 13, 2025
ddcbdf1
Changed signature of `executeStreaming` and related funcs to return `…
dosier Sep 13, 2025
65d5b53
Merge upstream/streaming-tools: Resolve conflicts between streaming i…
rubencagnie-toast Sep 13, 2025
273e903
Merge conflicts
rubencagnie-toast Sep 13, 2025
b2e7f2b
Omit `WithTools`
rubencagnie-toast Sep 13, 2025
89163d7
Fix tests
rubencagnie-toast Sep 13, 2025
b63a71c
Refactor `filterTextOnly` utility: move to `ai.koog.prompt.streaming`…
dosier Sep 13, 2025
7c1479b
Added `StreamFrame.End` and removed index from ToolCall
rubencagnie-toast Sep 13, 2025
3bbf8c1
Added StreamFrame.End, introduced stream flow builder and operators
dosier Sep 13, 2025
22177d7
Fixed formatting and resolved ktlin errors
dosier Sep 13, 2025
62971ae
Merge branch 'streaming-tools' into streaming-tools-ruben
dosier Sep 13, 2025
ca326b8
Fixed remaining merging errors, using new flow building extensions
dosier Sep 13, 2025
053b608
Removed partial tool call streaming support, relying on `response.out…
dosier Sep 13, 2025
5b684ce
Adding an example to test the streaming logic
rubencagnie-toast Sep 15, 2025
0895d55
Remove logging from Example
rubencagnie-toast Sep 15, 2025
e0027a7
Updated stream flow builder api to support for auto merging of chunke…
dosier Sep 15, 2025
1937850
Some refactoring, removed unused tool call context class
dosier Sep 15, 2025
ef4ef49
Emit previous pending tool call if the new id is different
dosier Sep 15, 2025
f6f7279
Merge remote-tracking branch 'ruben/streaming-tools' into streaming-t…
dosier Sep 15, 2025
e9fe8fd
Add test cases for StreamFrameFlowBuilder; refactor streaming chunk p…
dosier Sep 15, 2025
3aa210d
Slight refactor / doc fix in StreamFrameFlowBuilder.kt
dosier Sep 15, 2025
0a0b20a
Fix GoogleLLMClient (include finish reason and emit end frame correctly)
dosier Sep 15, 2025
4babf3e
Remove useless elvis operator for non null mapping in StreamFrameExt.kt
dosier Sep 15, 2025
ee1729c
Added `ResponseMetaInfo` property to `StreamFrame.End`
dosier Sep 15, 2025
169f719
Merge remote-tracking branch 'jb/develop' into streaming-tools
dosier Sep 15, 2025
5bbbc46
Remove the blank frame check in `.toAssistant` and return null on no …
dosier Sep 16, 2025
5f6c462
Refactor to use Iterable instead of List in StreamFrame extension fun…
dosier Sep 16, 2025
3c4de4f
Refactor and improve `.toMessageResponses` logic; simplify `.toTools`…
dosier Sep 16, 2025
98b000c
Rename extension functions for clarity: `.toTools` to `.toToolCallMes…
dosier Sep 16, 2025
74a1ef4
Reorder message creation logic in `StreamFrameExt` to prioritize tool…
dosier Sep 16, 2025
7a63f59
Add unit tests for `StreamFrameExt` extension functions in `StreamFra…
dosier Sep 16, 2025
9fb31e1
Refactor `filterTextOnly` to inline `filterAppendsOnly` for simplicit…
dosier Sep 16, 2025
08b5a73
Refactor `nodeLLMRequestStreamingAndSendResults` to simplify stream p…
dosier Sep 16, 2025
bd3ee43
Partially fix Anthropic streaming logic
OKatrych Sep 16, 2025
badf8c9
Update `streaming-api.md` to enhance clarity and align with recent AP…
dosier Sep 16, 2025
c65fa33
Merge pull request #2 from OKatrych/anthropic-fix
dosier Sep 16, 2025
a996247
Refactor `AnthropicLLMClient` to use `buildStreamFrameFlow` for clean…
dosier Sep 16, 2025
3aaacb0
Rename `appendToolCall` to `upsertToolCall` in StreamFrameFlowBuilder
dosier Sep 16, 2025
29503a9
Fixed styling issues (ktlint)
dosier Sep 16, 2025
60391c6
Updated AWS models stream response parsing and tests
dosier Sep 16, 2025
04fd2d2
Handle `ResponseCompleted` event in `OpenAILLMClient` to create `Stre…
dosier Sep 16, 2025
22fe17e
Merge remote-tracking branch 'jb/develop' into streaming-tools
dosier Sep 16, 2025
14273ad
Add `StreamFrameFlowBuilderError` sealed class and enhance error hand…
dosier Sep 17, 2025
6387b0a
Enhance error handling in `StreamFrameFlowBuilder` with additional er…
dosier Sep 17, 2025
86d775e
Align `onStreamError` handling across core, examples, and documentati…
dosier Sep 17, 2025
b02483d
Rethrow the caught error in stream frame flow
dosier Sep 17, 2025
b810722
Fix example example-streaming-api-02-02.kt
kpavlov Sep 18, 2025
@@ -11,6 +11,7 @@ import ai.koog.agents.core.tools.ToolArgs
import ai.koog.agents.core.tools.ToolDescriptor
import ai.koog.agents.core.tools.ToolResult
import ai.koog.prompt.message.Message
import ai.koog.prompt.streaming.StreamFrame
import ai.koog.prompt.structure.StructureFixingParser
import ai.koog.prompt.structure.StructuredDataDefinition
import ai.koog.prompt.structure.StructuredResponse
@@ -171,12 +172,12 @@ public suspend inline fun <reified T> AIAgentFunctionalContext.requestLLMStructu
*
* @param message The content of the message to be sent to the LLM.
* @param structureDefinition Optional structure to guide the LLM response.
* @return A flow of string chunks from the LLM response.
* @return A flow of [StreamFrame] objects from the LLM response.
*/
public suspend fun AIAgentFunctionalContext.requestLLMStreaming(
message: String,
structureDefinition: StructuredDataDefinition? = null
): Flow<String> {
): Flow<StreamFrame> {
return llm.writeSession {
updatePrompt {
user(message)
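The switch from `Flow<String>` to `Flow<StreamFrame>` changes how callers consume the stream. A minimal sketch of collecting the new frames — the variant names (`Append`, `ToolCall`, `End`) and their properties (`text`, `name`, `metaInfo`) are assumptions inferred from the commit titles ("Added `StreamFrame.End`", "removed index from ToolCall"), not a published API:

```kotlin
import ai.koog.prompt.streaming.StreamFrame

// Sketch only: frame variants and properties are assumed from the commit history.
suspend fun AIAgentFunctionalContext.printStream(message: String) {
    requestLLMStreaming(message).collect { frame ->
        when (frame) {
            is StreamFrame.Append -> print(frame.text)                       // incremental text chunk
            is StreamFrame.ToolCall -> println("tool call: ${frame.name}")   // assumed property
            is StreamFrame.End -> println("\n[finished: ${frame.metaInfo}]") // assumed property
        }
    }
}
```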
@@ -16,6 +16,7 @@ import ai.koog.prompt.executor.model.PromptExecutor
import ai.koog.prompt.llm.LLModel
import ai.koog.prompt.message.Message
import ai.koog.prompt.params.LLMParams
import ai.koog.prompt.streaming.StreamFrame
import ai.koog.prompt.structure.StructureFixingParser
import ai.koog.prompt.structure.StructuredDataDefinition
import ai.koog.prompt.structure.StructuredOutputConfig
@@ -477,9 +478,9 @@ public class AIAgentLLMWriteSession internal constructor(
*
* @param definition an optional parameter to define a structured data format. When provided, it will be used
* in constructing the prompt for the language model request.
* @return a flow of strings that streams the responses from the language model.
 * @return a flow of `StreamFrame` objects that streams the responses from the language model.
*/
public fun requestLLMStreaming(definition: StructuredDataDefinition? = null): Flow<String> {
public fun requestLLMStreaming(definition: StructuredDataDefinition? = null): Flow<StreamFrame> {
if (definition != null) {
val prompt = prompt(prompt, clock) {
user {
@@ -488,7 +489,6 @@
}
this.prompt = prompt
}

return executor.executeStreaming(prompt, model)
return executor.executeStreaming(prompt, model, tools)
}
}
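Callers that relied on the old `Flow<String>` shape can approximate it with the `filterTextOnly` extension that the commits moved into `ai.koog.prompt.streaming`; its exact signature (`Flow<StreamFrame>.filterTextOnly(): Flow<String>`) is an assumption here:

```kotlin
import ai.koog.prompt.streaming.filterTextOnly

// Sketch: recover a plain text stream from the frame stream, assuming
// filterTextOnly drops ToolCall/End frames and emits the text chunks.
suspend fun AIAgentLLMWriteSession.collectStreamedText(): String {
    val sb = StringBuilder()
    requestLLMStreaming()
        .filterTextOnly()
        .collect { chunk -> sb.append(chunk) }
    return sb.toString()
}
```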
@@ -17,11 +17,14 @@ import ai.koog.prompt.dsl.PromptBuilder
import ai.koog.prompt.dsl.prompt
import ai.koog.prompt.llm.LLModel
import ai.koog.prompt.message.Message
import ai.koog.prompt.streaming.StreamFrame
import ai.koog.prompt.streaming.toMessageResponses
import ai.koog.prompt.structure.StructureFixingParser
import ai.koog.prompt.structure.StructuredDataDefinition
import ai.koog.prompt.structure.StructuredOutputConfig
import ai.koog.prompt.structure.StructuredResponse
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList

/**
* A pass-through node that does nothing and returns input as output
@@ -240,7 +243,7 @@ public inline fun <reified T> AIAgentSubgraphBuilderBase<*, *>.nodeLLMRequestStr
public fun <T> AIAgentSubgraphBuilderBase<*, *>.nodeLLMRequestStreaming(
name: String? = null,
structureDefinition: StructuredDataDefinition? = null,
transformStreamData: suspend (Flow<String>) -> Flow<T>
transformStreamData: suspend (Flow<StreamFrame>) -> Flow<T>
): AIAgentNodeDelegate<String, Flow<T>> =
node(name) { message ->
llm.writeSession {
@@ -264,7 +267,7 @@ public fun AIAgentSubgraphBuilderBase<*, *>.nodeLLMRequestStreaming(
public fun AIAgentSubgraphBuilderBase<*, *>.nodeLLMRequestStreaming(
name: String? = null,
structureDefinition: StructuredDataDefinition? = null,
): AIAgentNodeDelegate<String, Flow<String>> = nodeLLMRequestStreaming(name, structureDefinition) { it }
): AIAgentNodeDelegate<String, Flow<StreamFrame>> = nodeLLMRequestStreaming(name, structureDefinition) { it }
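With the generic overload, the `transformStreamData` lambda can project frames down to plain text before they leave the node. A sketch, used inside a subgraph builder; `StreamFrame.Append` and its `text` property are assumed names:

```kotlin
import ai.koog.prompt.streaming.StreamFrame
import kotlinx.coroutines.flow.mapNotNull

// Sketch: a node that emits only the incremental text of the stream.
// Written inside an AIAgentSubgraphBuilderBase receiver.
val streamText by nodeLLMRequestStreaming(name = "streamText") { frames ->
    frames.mapNotNull { frame -> (frame as? StreamFrame.Append)?.text } // assumed variant/property
}
```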

/**
* A node that appends a user message to the LLM prompt and gets multiple LLM responses with tool calls enabled.
@@ -315,6 +318,43 @@ public inline fun <reified T> AIAgentSubgraphBuilderBase<*, *>.nodeLLMCompressHi
input
}

/**
* A node that performs LLM streaming, collects all stream frames, converts them to response messages,
* and updates the prompt with the results.
*
* This node is useful when you want to:
* - Stream responses from the LLM for real-time feedback
* - Collect the complete streamed response as messages
* - Automatically update the conversation history with the streamed responses
*
* The node will:
* 1. Initiate a streaming request to the LLM
* 2. Collect all stream frames (text, tool calls, etc.)
* 3. Convert the collected frames into proper Message.Response objects
* 4. Update the prompt with these messages for conversation continuity
* 5. Return the collected messages
*
* @param T The type of input this node accepts (passed through without modification)
* @param name Optional node name for identification in the agent graph
* @param structureDefinition Optional structure definition to guide the LLM's response format
* @return A node delegate that accepts input of type T and returns a list of response messages
*
* @see nodeLLMRequestStreaming for streaming without automatic prompt updates
* @see ai.koog.agents.core.agent.session.AIAgentLLMWriteSession.requestLLMStreaming for the underlying streaming functionality
*/
@AIAgentBuilderDslMarker
public inline fun <reified T> AIAgentSubgraphBuilderBase<*, *>.nodeLLMRequestStreamingAndSendResults(
name: String? = null,
structureDefinition: StructuredDataDefinition? = null
): AIAgentNodeDelegate<T, List<Message.Response>> = node(name) { input ->
llm.writeSession {
requestLLMStreaming(structureDefinition)
.toList()
.toMessageResponses()
.also { updatePrompt { messages(it) } }
}
}
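The new node can be dropped into a strategy graph. A sketch — the builder DSL names (`strategy`, `edge`, `forwardTo`, `nodeStart`, `nodeFinish`) follow koog's strategy API and are illustrative rather than verified against this branch:

```kotlin
// Sketch: stream a reply, record it in the prompt history, return the messages.
val agentStrategy = strategy<String, List<Message.Response>>("stream-and-record") {
    val stream by nodeLLMRequestStreamingAndSendResults<String>()
    edge(nodeStart forwardTo stream)
    edge(stream forwardTo nodeFinish)
}
```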

// ==========
// Tool nodes
// ==========
@@ -9,6 +9,8 @@ import ai.koog.agents.core.environment.AIAgentEnvironment
import ai.koog.agents.core.feature.config.FeatureConfig
import ai.koog.agents.core.feature.handler.AfterLLMCallContext
import ai.koog.agents.core.feature.handler.AfterLLMCallHandler
import ai.koog.agents.core.feature.handler.AfterStreamContext
import ai.koog.agents.core.feature.handler.AfterStreamHandler
import ai.koog.agents.core.feature.handler.AgentBeforeCloseContext
import ai.koog.agents.core.feature.handler.AgentBeforeCloseHandler
import ai.koog.agents.core.feature.handler.AgentContextHandler
@@ -23,13 +25,20 @@ import ai.koog.agents.core.feature.handler.AgentTransformEnvironmentContext
import ai.koog.agents.core.feature.handler.BeforeAgentStartedHandler
import ai.koog.agents.core.feature.handler.BeforeLLMCallContext
import ai.koog.agents.core.feature.handler.BeforeLLMCallHandler
import ai.koog.agents.core.feature.handler.BeforeStreamContext
import ai.koog.agents.core.feature.handler.BeforeStreamHandler
import ai.koog.agents.core.feature.handler.ExecuteLLMHandler
import ai.koog.agents.core.feature.handler.ExecuteToolHandler
import ai.koog.agents.core.feature.handler.StrategyFinishContext
import ai.koog.agents.core.feature.handler.StrategyFinishedHandler
import ai.koog.agents.core.feature.handler.StrategyHandler
import ai.koog.agents.core.feature.handler.StrategyStartContext
import ai.koog.agents.core.feature.handler.StrategyStartedHandler
import ai.koog.agents.core.feature.handler.StreamErrorContext
import ai.koog.agents.core.feature.handler.StreamErrorHandler
import ai.koog.agents.core.feature.handler.StreamFrameContext
import ai.koog.agents.core.feature.handler.StreamFrameHandler
import ai.koog.agents.core.feature.handler.StreamHandler
import ai.koog.agents.core.feature.handler.ToolCallContext
import ai.koog.agents.core.feature.handler.ToolCallFailureContext
import ai.koog.agents.core.feature.handler.ToolCallFailureHandler
@@ -46,6 +55,7 @@ import ai.koog.prompt.dsl.ModerationResult
import ai.koog.prompt.dsl.Prompt
import ai.koog.prompt.llm.LLModel
import ai.koog.prompt.message.Message
import ai.koog.prompt.streaming.StreamFrame
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
@@ -117,6 +127,12 @@ public abstract class AIAgentPipeline {
*/
protected val executeLLMHandlers: MutableMap<AIAgentStorageKey<*>, ExecuteLLMHandler> = mutableMapOf()

/**
* Map of feature storage keys to their stream handlers.
* These handlers manage the streaming lifecycle events (before, during, and after streaming).
*/
protected val streamHandlers: MutableMap<AIAgentStorageKey<*>, StreamHandler> = mutableMapOf()

internal suspend fun prepareFeatures() {
withContext(featurePrepareDispatcher) {
registeredFeatures.values.forEach { featureConfig ->
@@ -202,6 +218,71 @@ public abstract class AIAgentPipeline {
agentHandlers.values.forEach { handler -> handler.agentRunErrorHandler.handle(eventContext) }
}

/**
* Invoked before streaming from a language model begins.
*
* This method notifies all registered stream handlers that streaming is about to start,
* allowing them to perform preprocessing or logging operations.
*
* @param runId The unique identifier for this streaming session
* @param prompt The prompt being sent to the language model
* @param model The language model being used for streaming
* @param tools The list of available tool descriptors for this streaming session
*/
public suspend fun onBeforeStream(runId: String, prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>) {
val eventContext = BeforeStreamContext(runId, prompt, model, tools)
streamHandlers.values.forEach { handler -> handler.beforeStreamHandler.handle(eventContext) }
}

/**
* Invoked when a stream frame is received during the streaming process.
*
* This method notifies all registered stream handlers about each incoming stream frame,
* allowing them to process, transform, or aggregate the streaming content in real-time.
*
* @param runId The unique identifier for this streaming session
* @param streamFrame The individual stream frame containing partial response data
*/
public suspend fun onStreamFrame(runId: String, streamFrame: StreamFrame) {
val eventContext = StreamFrameContext(runId, streamFrame)
streamHandlers.values.forEach { handler -> handler.streamFrameHandler.handle(eventContext) }
}

/**
* Invoked if an error occurs during the streaming process.
*
* This method notifies all registered stream handlers about the streaming error,
* allowing them to handle or log the error.
*
* @param runId The unique identifier for this streaming session
 * @param throwable The exception that occurred during streaming
*/
public suspend fun onStreamError(runId: String, throwable: Throwable) {
val eventContext = StreamErrorContext(runId, throwable)
streamHandlers.values.forEach { handler -> handler.streamErrorHandler.handle(eventContext) }
}

/**
* Invoked after streaming from a language model completes.
*
* This method notifies all registered stream handlers that streaming has finished,
* allowing them to perform post-processing, cleanup, or final logging operations.
*
* @param runId The unique identifier for this streaming session
* @param prompt The prompt that was sent to the language model
* @param model The language model that was used for streaming
* @param tools The list of tool descriptors that were available for this streaming session
*/
public suspend fun onAfterStream(
runId: String,
prompt: Prompt,
model: LLModel,
tools: List<ToolDescriptor>
) {
val eventContext = AfterStreamContext(runId, prompt, model, tools)
streamHandlers.values.forEach { handler -> handler.afterStreamHandler.handle(eventContext) }
}
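Taken together, the four hooks describe a streaming lifecycle: before, per-frame, on error, after. A sketch of the order in which a streaming executor would be expected to fire them — illustrative glue code, not library source:

```kotlin
// Sketch: driving the pipeline hooks around a frame flow. The rethrow matches
// commit b02483d ("Rethrow the caught error in stream frame flow").
suspend fun AIAgentPipeline.runWithStreamEvents(
    runId: String,
    prompt: Prompt,
    model: LLModel,
    tools: List<ToolDescriptor>,
    frames: Flow<StreamFrame>,
) {
    onBeforeStream(runId, prompt, model, tools)
    try {
        frames.collect { frame -> onStreamFrame(runId, frame) }
    } catch (t: Throwable) {
        onStreamError(runId, t)
        throw t
    } finally {
        onAfterStream(runId, prompt, model, tools)
    }
}
```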

/**
* Invoked before an agent is closed to perform necessary pre-closure operations.
*
@@ -661,6 +742,104 @@ public abstract class AIAgentPipeline {
}
}

/**
* Intercepts streaming operations before they begin to modify or log the streaming request.
*
* This method allows features to hook into the streaming pipeline before streaming starts,
* enabling preprocessing, validation, or logging of streaming requests.
*
* @param interceptContext The context containing the feature and its implementation
* @param handle The handler that processes before-stream events
*
* Example:
* ```
* pipeline.interceptBeforeStream(InterceptContext) { eventContext ->
* logger.info("About to start streaming with prompt: ${eventContext.prompt.messages.last().content}")
* }
* ```
*/
public fun <TFeature : Any> interceptBeforeStream(
interceptContext: InterceptContext<TFeature>,
handle: suspend TFeature.(eventContext: BeforeStreamContext) -> Unit
) {
val existingHandler = streamHandlers.getOrPut(interceptContext.feature.key) { StreamHandler() }

existingHandler.beforeStreamHandler = BeforeStreamHandler { eventContext: BeforeStreamContext ->
with(interceptContext.featureImpl) { handle(eventContext) }
}
}

/**
* Intercepts stream frames as they are received during the streaming process.
*
* This method allows features to process individual stream frames in real-time,
* enabling monitoring, transformation, or aggregation of streaming content.
*
* @param interceptContext The context containing the feature and its implementation
* @param handle The handler that processes stream frame events
*
* Example:
* ```
* pipeline.interceptOnStreamFrame(InterceptContext) { eventContext ->
* logger.debug("Received stream frame: ${eventContext.streamFrame}")
* }
* ```
*/
public fun <TFeature : Any> interceptOnStreamFrame(
interceptContext: InterceptContext<TFeature>,
handle: suspend TFeature.(eventContext: StreamFrameContext) -> Unit
) {
val existingHandler = streamHandlers.getOrPut(interceptContext.feature.key) { StreamHandler() }

existingHandler.streamFrameHandler = StreamFrameHandler { eventContext: StreamFrameContext ->
with(interceptContext.featureImpl) { handle(eventContext) }
}
}

/**
* Intercepts errors during the streaming process.
*
* @param interceptContext The context containing the feature and its implementation
* @param handle The handler that processes stream errors
*/
public fun <TFeature : Any> interceptOnStreamError(
interceptContext: InterceptContext<TFeature>,
handle: suspend TFeature.(eventContext: StreamErrorContext) -> Unit
) {
val existingHandler = streamHandlers.getOrPut(interceptContext.feature.key) { StreamHandler() }

existingHandler.streamErrorHandler = StreamErrorHandler { eventContext: StreamErrorContext ->
with(interceptContext.featureImpl) { handle(eventContext) }
}
}
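Unlike its siblings, the KDoc above omits a usage example. A hedged sketch of registering a stream-error hook, mirroring the `Example` blocks on the other intercept functions (`InterceptContext` construction and the `logger` are elided/assumed):

```kotlin
// Sketch: a feature logging stream failures via interceptOnStreamError.
pipeline.interceptOnStreamError(interceptContext) { eventContext ->
    logger.error(eventContext.throwable) { "Streaming failed for run ${eventContext.runId}" }
}
```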

/**
* Intercepts streaming operations after they complete to perform post-processing or cleanup.
*
* This method allows features to hook into the streaming pipeline after streaming finishes,
* enabling post-processing, cleanup, or final logging of the streaming session.
*
* @param interceptContext The context containing the feature and its implementation
* @param handle The handler that processes after-stream events
*
* Example:
* ```
* pipeline.interceptAfterStream(InterceptContext) { eventContext ->
* logger.info("Streaming completed for run: ${eventContext.runId}")
* }
* ```
*/
public fun <TFeature : Any> interceptAfterStream(
interceptContext: InterceptContext<TFeature>,
handle: suspend TFeature.(eventContext: AfterStreamContext) -> Unit
) {
val existingHandler = streamHandlers.getOrPut(interceptContext.feature.key) { StreamHandler() }

existingHandler.afterStreamHandler = AfterStreamHandler { eventContext: AfterStreamContext ->
with(interceptContext.featureImpl) { handle(eventContext) }
}
}

/**
* Intercepts and handles tool calls for the specified feature and its implementation.
* Updates the tool call handler for the given feature key with a custom handler.