Skip to content

Commit 007cf8e

Browse files
authored
KG-294 Support for Non-Ktor http clients. Part 1 (#707)
## Motivation and Context KG-294 Support non-Ktor clients in public API > Currently, all LLM Clients accept HttpClient from Ktor (optional). It will confuse Java users who don't have Ktor dependency but already have a client. First step its to introduce KoogHttpClient interface and migrate client code in OpenAI-compatible LLM Clients from io.ktor.client.HttpClient to this new abstraction. Though, KTor isn't completely decoupled yet. ### Main Changes: **New HTTP Client Interface (KoogHttpClient):** - Introduces an interface (KoogHttpClient) defining contracts for HTTP operations (e.g., post, sse). - Implements KoogHttpClient using Ktor-based functionality (KtorHttpClient). **Dependency Enhancements:** - Adds new dependencies: ktor-client-core, oshai.kotlin.logging, and related libraries. - Updates a dependency on test utilities in various Gradle files. **Refactoring of Existing Components:** - Replaces inline HTTP operations with reusable HTTP client methods (KoogHttpClient) in OpenAI-based related LLM clients. - Streamlines HTTP error handling and logging via centralized logic in the HTTP client. **Improvement in Workflow Configuration:** - Updates the GitHub workflow (checks.yml) to cancel jobs not just for main but also for develop branches. **Code Cleanups:** - Removes unused imports and redundant code in HTTP-related sections. - Refactors streaming and moderation response handling for clarity and consistency. ## Breaking Changes --- #### Type of the changes - [x] New feature (non-breaking change which adds functionality) - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Tests improvement - [x] Refactoring #### Checklist - [x] The pull request has a description of the proposed change - [x] I read the [Contributing Guidelines](https://github.com/JetBrains/koog/blob/main/CONTRIBUTING.md) before opening the pull request - [x] The pull request uses **`develop`** as the base branch - [ ] Tests for the changes have been added - [ ] All new and existing tests passed ##### Additional steps for pull requests adding a new feature - [x] An issue describing the proposed change exists - [x] The pull request includes a link to the issue - [ ] The change was discussed and approved in the issue - [ ] Docs have been added / updated
1 parent fabd964 commit 007cf8e

File tree

18 files changed

+584
-301
lines changed

18 files changed

+584
-301
lines changed

.github/workflows/checks.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ on:
1515
concurrency:
1616
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
1717
# Cancel only when the run is not on main
18-
cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
18+
cancel-in-progress: ${{ github.ref != 'refs/heads/main' || github.ref != 'refs/heads/develop' }}
1919

2020
jobs:
2121
compilation:

agents/agents-utils/build.gradle.kts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ kotlin {
1212
sourceSets {
1313
commonMain {
1414
dependencies {
15+
api(libs.jetbrains.annotations)
1516
api(libs.kotlinx.coroutines.core)
17+
implementation(libs.ktor.client.core)
18+
implementation(libs.oshai.kotlin.logging)
1619
}
1720
}
1821

@@ -21,8 +24,10 @@ kotlin {
2124

2225
commonTest {
2326
dependencies {
24-
implementation(kotlin("test"))
25-
implementation(libs.kotlinx.coroutines.test)
27+
implementation(project(":test-utils"))
28+
implementation(libs.ktor.serialization.kotlinx.json)
29+
implementation(libs.ktor.client.content.negotiation)
30+
implementation(libs.ktor.client.mock)
2631
}
2732
}
2833

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package ai.koog.agents.utils
2+
3+
import kotlinx.coroutines.flow.Flow
4+
import org.jetbrains.annotations.ApiStatus.Experimental
5+
import kotlin.reflect.KClass
6+
7+
/**
8+
* Abstract interfaces defining a contract for HTTP client implementations.
9+
* Provides methods for making HTTP POST requests and handling Server-Sent Events (SSE) streams.
10+
*
11+
* Implementations are supposed to use a particular library or framework.
12+
*/
13+
@Experimental
14+
public interface KoogHttpClient {
15+
16+
/**
17+
* Sends an HTTP POST request to the specified `path` with the provided `request` payload.
18+
* The type of the request body and the expected response must be explicitly specified
19+
* using `requestBodyType` and `responseType`, respectively.
20+
*
21+
* @param path The endpoint path to which the HTTP POST request is sent.
22+
* @param request The request payload to be sent in the POST request.
23+
* @param requestBodyType The Kotlin class reference representing the type of the request body.
24+
* @param responseType The Kotlin class reference representing the expected type of the response.
25+
* @return The response payload, deserialized into the specified type.
26+
* @throws Exception if the request fails or the response cannot be deserialized.
27+
*/
28+
public suspend fun <T : Any, R : Any> post(
29+
path: String,
30+
request: T,
31+
requestBodyType: KClass<T>,
32+
responseType: KClass<R>
33+
): R
34+
35+
/**
36+
* Sends an HTTP POST request to the specified `path` with the provided `request` payload.
37+
*
38+
* @param path The endpoint path to which the HTTP POST request is sent.
39+
* @param request The request payload to be sent in the POST request. It must be a string.
40+
* @return The response payload from the server, represented as a string.
41+
*/
42+
public suspend fun post(
43+
path: String,
44+
request: String
45+
): String =
46+
post(path, request, String::class, String::class)
47+
48+
/**
49+
* Initiates a Server-Sent Events (SSE) streaming operation over an HTTP POST request.
50+
*
51+
* This function sends a request to the specified `path` with the given `request` payload,
52+
* processes the streamed chunks of data from the server, and emits the processed results as a flow of strings.
53+
*
54+
* @param path The endpoint path to which the SSE POST request is sent.
55+
* @param request The request payload to be sent in the POST request.
56+
* @param requestBodyType The Kotlin class reference representing the type of the request body.
57+
* @param dataFilter A lambda function that determines whether a received streaming data chunk should be processed.
58+
* It takes the raw data as a string and returns `true` if the data should be included, or `false` otherwise.
59+
* Defaults to accepting all non-null chunks.
60+
* @param decodeStreamingResponse A lambda function used to decode the raw streaming response data
61+
* into the target type. It takes a raw string and converts it into an object of type `R`.
62+
* @param processStreamingChunk A lambda function that processes the decoded streaming chunk and returns
63+
* a string result. If the returned value is `null`, the chunk will not be emitted to the resulting flow.
64+
* @return A [Flow] emitting processed strings derived from the streamed chunks of data.
65+
*/
66+
@Suppress("LongParameterList")
67+
public fun <T : Any, R : Any> sse(
68+
path: String,
69+
request: T,
70+
requestBodyType: KClass<T>,
71+
dataFilter: (String?) -> Boolean = { true },
72+
decodeStreamingResponse: (String) -> R,
73+
processStreamingChunk: (R) -> String?
74+
): Flow<String>
75+
76+
public companion object
77+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package ai.koog.agents.utils
2+
3+
import io.github.oshai.kotlinlogging.KLogger
4+
import io.ktor.client.HttpClientConfig
5+
import io.ktor.client.call.body
6+
import io.ktor.client.engine.HttpClientEngineConfig
7+
import io.ktor.client.plugins.sse.SSEClientException
8+
import io.ktor.client.plugins.sse.sse
9+
import io.ktor.client.request.accept
10+
import io.ktor.client.request.headers
11+
import io.ktor.client.request.post
12+
import io.ktor.client.request.setBody
13+
import io.ktor.client.statement.bodyAsText
14+
import io.ktor.client.statement.readRawBytes
15+
import io.ktor.http.ContentType
16+
import io.ktor.http.HttpHeaders
17+
import io.ktor.http.HttpMethod
18+
import io.ktor.http.isSuccess
19+
import io.ktor.util.reflect.TypeInfo
20+
import kotlinx.coroutines.Dispatchers
21+
import kotlinx.coroutines.flow.Flow
22+
import kotlinx.coroutines.flow.flow
23+
import kotlinx.coroutines.withContext
24+
import org.jetbrains.annotations.ApiStatus.Experimental
25+
import kotlin.jvm.JvmOverloads
26+
import kotlin.reflect.KClass
27+
28+
/**
29+
* KtorHttpClient is an implementation of the KoogHttpClient interface, utilizing Ktor's HttpClient
30+
* to perform HTTP operations, including POST requests and Server-Sent Events (SSE) streaming.
31+
*
32+
* This client provides enhanced logging, flexible request and response handling, and supports
33+
* configurability for underlying Ktor HttpClient instances.
34+
*
35+
* @property clientName The name of the client, used for logging and traceability.
36+
* @property logger A logging instance of type KLogger for recording client-related events and errors.
37+
* @constructor Creates a KtorHttpClient instance with an optional base Ktor HttpClient and configuration block.
38+
*
39+
* @param baseClient The base Ktor HttpClient instance to be used. Default is a newly created instance.
40+
* @param configurer A lambda function to configure the base Ktor HttpClient instance.
41+
* The configuration is applied using the Ktor `HttpClient.config` method.
42+
*/
43+
@Experimental
44+
internal class KoogKtorHttpClient internal constructor(
45+
private val clientName: String,
46+
private val logger: KLogger,
47+
baseClient: io.ktor.client.HttpClient = io.ktor.client.HttpClient(),
48+
configurer: HttpClientConfig<out HttpClientEngineConfig>.() -> Unit
49+
) : KoogHttpClient {
50+
51+
/**
52+
* A configured instance of the Ktor HTTP client used for making HTTP requests.
53+
*
54+
* This property is initialized with a base client configuration, extended using a custom
55+
* `configurer` function to adapt to specific requirements or settings.
56+
*
57+
* It is designed to interact with various endpoints to perform HTTP operations such as
58+
* POST requests and Server-Sent Events (SSE) streaming, supporting request and response
59+
* serialization and deserialization for different data types.
60+
*/
61+
val ktorClient: io.ktor.client.HttpClient = baseClient.config(configurer)
62+
63+
override suspend fun <T : Any, R : Any> post(
64+
path: String,
65+
request: T,
66+
requestBodyType: KClass<T>,
67+
responseType: KClass<R>
68+
): R = withContext(Dispatchers.SuitableForIO) {
69+
val response = ktorClient.post(path) {
70+
if (requestBodyType == String::class) {
71+
@Suppress("UNCHECKED_CAST")
72+
setBody(request as String)
73+
} else {
74+
setBody(request, TypeInfo(requestBodyType))
75+
}
76+
}
77+
78+
if (response.status.isSuccess()) {
79+
if (responseType == String::class) {
80+
@Suppress("UNCHECKED_CAST")
81+
response.bodyAsText() as R
82+
} else {
83+
response.body(TypeInfo(responseType))
84+
}
85+
} else {
86+
val errorBody = response.bodyAsText()
87+
logger.error { "Error from $clientName API: ${response.status}: $errorBody" }
88+
error("Error from $clientName API: ${response.status}: $errorBody")
89+
}
90+
}
91+
92+
override fun <T : Any, R : Any> sse(
93+
path: String,
94+
request: T,
95+
requestBodyType: KClass<T>,
96+
dataFilter: (String?) -> Boolean,
97+
decodeStreamingResponse: (String) -> R,
98+
processStreamingChunk: (R) -> String?
99+
): Flow<String> = flow {
100+
@Suppress("TooGenericExceptionCaught")
101+
try {
102+
ktorClient.sse(
103+
urlString = path,
104+
request = {
105+
method = HttpMethod.Post
106+
accept(ContentType.Text.EventStream)
107+
headers {
108+
append(HttpHeaders.CacheControl, "no-cache")
109+
append(HttpHeaders.Connection, "keep-alive")
110+
}
111+
if (requestBodyType == String::class) {
112+
@Suppress("UNCHECKED_CAST")
113+
setBody(request as String)
114+
} else {
115+
setBody(request, TypeInfo(requestBodyType))
116+
}
117+
}
118+
) {
119+
incoming.collect { event ->
120+
event
121+
.takeIf { dataFilter.invoke(it.data) }
122+
?.data?.trim()
123+
?.let(decodeStreamingResponse)
124+
?.let(processStreamingChunk)
125+
?.let { emit(it) }
126+
}
127+
}
128+
} catch (e: SSEClientException) {
129+
e.response?.let { response ->
130+
val body = response.readRawBytes().decodeToString()
131+
logger.error(e) { "Error from $clientName API: ${response.status}: ${e.message}.\nBody:\n$body" }
132+
error("Error from $clientName API: ${response.status}: ${e.message}")
133+
}
134+
} catch (e: Exception) {
135+
logger.error { "Exception during streaming from $clientName: $e" }
136+
error(e.message ?: "Unknown error during streaming from $clientName: $e")
137+
}
138+
}
139+
}
140+
141+
/**
142+
* Creates a new instance of `KoogHttpClient` using a Ktor-based HTTP client for performing HTTP operations.
143+
*
144+
* This function allows configuring the underlying Ktor `HttpClient` through the provided configuration lambda
145+
* and enables enhanced logging, flexibility, and customization in HTTP interactions.
146+
*
147+
* @param clientName The name of the client instance, used for identifying or logging client operations.
148+
* @param logger A `KLogger` instance used for logging client events and errors.
149+
* @param baseClient The base Ktor `HttpClient` instance to be used. Defaults to a new Ktor `HttpClient` instance.
150+
* @param configurer A lambda function to configure the base Ktor `HttpClient` instance. It is applied using
151+
* Ktor’s `HttpClientConfig`.
152+
* @return An instance of `KoogHttpClient` configured with the provided parameters.
153+
*/
154+
@Experimental
155+
@JvmOverloads
156+
public fun KoogHttpClient.Companion.fromKtorClient(
157+
clientName: String,
158+
logger: KLogger,
159+
baseClient: io.ktor.client.HttpClient = io.ktor.client.HttpClient(),
160+
configurer: HttpClientConfig<out HttpClientEngineConfig>.() -> Unit
161+
): KoogHttpClient = KoogKtorHttpClient(clientName, logger, baseClient, configurer)

0 commit comments

Comments
 (0)