Skip to content

Commit 24c9d70

Browse files
committed
streaming fixes and refactoring
1 parent 933ae0e commit 24c9d70

File tree

10 files changed

+511
-131
lines changed

10 files changed

+511
-131
lines changed

README.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,26 @@ fun main() = runBlocking {
104104
client.messages.stream {
105105
+"Write me a poem."
106106
}
107-
.filterIsInstance<ContentBlockDeltaEvent>()
108-
.map { (it.delta as Delta.TextDelta).text }
107+
.filter { it.delta is Event.ContentBlockDelta.Delta.TextDelta }
108+
.map { (it.delta as Event.ContentBlockDelta.Delta.TextDelta).text }
109109
.collect { delta -> print(delta) }
110110
}
111111
```
112112

113+
The `toMessageResponse` function will return the complete `MessageResponse` from the stream:
114+
115+
```kotlin
116+
fun main() = runBlocking {
117+
val client = Anthropic()
118+
val response = client.messages.stream {
119+
+"Write me a poem."
120+
}
121+
.onEach { println("Event: $it") }
122+
.toMessageResponse()
123+
println(response)
124+
}
125+
```
126+
113127
### Using tools
114128

115129
> [!NOTE]

src/commonMain/kotlin/Anthropic.kt

Lines changed: 87 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.xemantic.ai.anthropic
1818

19+
import com.xemantic.ai.anthropic.content.Content
1920
import com.xemantic.ai.anthropic.content.ToolUse
2021
import com.xemantic.ai.anthropic.cost.CostCollector
2122
import com.xemantic.ai.anthropic.cost.CostWithUsage
@@ -25,13 +26,16 @@ import com.xemantic.ai.anthropic.event.Event
2526
import com.xemantic.ai.anthropic.json.anthropicJson
2627
import com.xemantic.ai.anthropic.message.MessageRequest
2728
import com.xemantic.ai.anthropic.message.MessageResponse
29+
import com.xemantic.ai.anthropic.tool.Tool
30+
import com.xemantic.ai.anthropic.usage.Usage
2831
import io.ktor.client.*
2932
import io.ktor.client.call.*
3033
import io.ktor.client.plugins.*
3134
import io.ktor.client.plugins.contentnegotiation.*
3235
import io.ktor.client.plugins.logging.*
3336
import io.ktor.client.plugins.sse.*
3437
import io.ktor.client.request.*
38+
import io.ktor.client.statement.*
3539
import io.ktor.http.*
3640
import io.ktor.serialization.kotlinx.json.*
3741
import kotlinx.coroutines.flow.Flow
@@ -119,19 +123,35 @@ class Anthropic internal constructor(
119123

120124
private val client = HttpClient {
121125

122-
val retriableResponses = setOf<HttpStatusCode>(
126+
val retriableResponses = setOf(
123127
HttpStatusCode.RequestTimeout,
124128
HttpStatusCode.Conflict,
125129
HttpStatusCode.TooManyRequests,
126130
HttpStatusCode.InternalServerError
127131
)
128132

133+
// declaration order matters :(
134+
install(SSE)
135+
136+
HttpResponseValidator {
137+
validateResponse { response ->
138+
if (response.status != HttpStatusCode.OK
139+
&& !(response.status in retriableResponses || response.status.value >= 500)) {
140+
val bytes = response.readRawBytes()
141+
val errorString = String(bytes)
142+
val errorResponse = anthropicJson.decodeFromString<ErrorResponse>(errorString)
143+
throw AnthropicApiException(
144+
error = errorResponse.error,
145+
httpStatusCode = response.status
146+
)
147+
}
148+
}
149+
}
150+
129151
install(ContentNegotiation) {
130152
json(anthropicJson)
131153
}
132154

133-
install(SSE)
134-
135155
if (logLevel != LogLevel.NONE) {
136156
install(Logging) {
137157
level = logLevel
@@ -181,28 +201,16 @@ class Anthropic internal constructor(
181201
}
182202
val response = apiResponse.body<Response>()
183203
when (response) {
184-
is MessageResponse -> response.apply {
185-
resolvedModel = anthropicModel
186-
costCollector += costWithUsage
187-
188-
val toolMap = request.tools?.associateBy { it.name } ?: emptyMap()
189-
content.filterIsInstance<ToolUse>().forEach { toolUse ->
190-
val tool = toolMap[toolUse.name]
191-
if (tool != null) {
192-
toolUse.tool = tool
193-
} else {
194-
// Sometimes it happens that Claude is sending non-defined tool name in tool use
195-
// TODO in the future it should go to the stderr
196-
println("Error!!! Unexpected tool use: ${toolUse.name}")
197-
}
198-
}
204+
is MessageResponse -> {
205+
val toolMap = request.toolMap
206+
response.resolvedModel = response.anthropicModel
207+
response.content.resolveTools(toolMap)
208+
costCollector += response.costWithUsage
199209
}
200-
201-
is ErrorResponse -> throw AnthropicApiException(
210+
is ErrorResponse -> throw AnthropicApiException( // technically, this should be handled by the validator
202211
error = response.error,
203212
httpStatusCode = apiResponse.status
204213
)
205-
206214
else -> throw RuntimeException(
207215
"Unsupported response: $response"
208216
) // should never happen
@@ -221,27 +229,50 @@ class Anthropic internal constructor(
221229
stream = true
222230
}.build()
223231

224-
client.sse(
225-
urlString = "/v1/messages",
226-
request = {
227-
method = HttpMethod.Post
228-
contentType(ContentType.Application.Json)
229-
setBody(request)
230-
}
231-
) {
232-
incoming
233-
.map { it.data }
234-
.filterNotNull()
235-
.map { anthropicJson.decodeFromString<Event>(it) }
236-
.collect { event ->
237-
// TODO we need better way of handling subsequent deltas with usage
238-
if (event is Event.MessageStart) {
239-
// TODO more rules are needed here
240-
//costCollector += usageWithCost
241-
//updateUsage(event.message)
242-
}
243-
emit(event)
232+
try {
233+
client.sse(
234+
urlString = "/v1/messages",
235+
request = {
236+
method = HttpMethod.Post
237+
contentType(ContentType.Application.Json)
238+
setBody(request)
244239
}
240+
) {
241+
var usage = Usage.ZERO
242+
var resolvedModel: AnthropicModel? = null
243+
incoming
244+
.map { it.data }
245+
.filterNotNull()
246+
.map { anthropicJson.decodeFromString<Event>(it) }
247+
.collect { event ->
248+
when (event) {
249+
is Event.MessageDelta -> {
250+
usage += Usage {
251+
inputTokens = 0
252+
outputTokens = event.usage.outputTokens
253+
}
254+
}
255+
is Event.MessageStart -> {
256+
resolvedModel = event.message.anthropicModel
257+
usage += event.message.usage
258+
}
259+
is Event.MessageStop -> {
260+
event.toolMap = request.toolMap
261+
event.resolvedModel = resolvedModel!!
262+
val costWithUsage = CostWithUsage(
263+
cost = resolvedModel.cost * usage,
264+
usage = usage
265+
)
266+
costCollector += costWithUsage
267+
}
268+
else -> { /* do nothing */ }
269+
}
270+
emit(event)
271+
}
272+
}
273+
} catch (e: SSEClientException) {
274+
if (e.cause is AnthropicApiException) throw e.cause!!
275+
throw e
245276
}
246277
}
247278

@@ -270,3 +301,18 @@ class AnthropicConfigException(
270301
) : AnthropicException(
271302
msg, cause
272303
)
304+
305+
internal fun List<Content>.resolveTools(toolMap: Map<String, Tool>) {
306+
filterIsInstance<ToolUse>().forEach { toolUse ->
307+
val tool = toolMap[toolUse.name]
308+
if (tool != null) {
309+
toolUse.tool = tool
310+
} else {
311+
// Sometimes it happens that Claude is sending non-defined tool name in tool use
312+
// TODO in the future it should go to the stderr
313+
println("Error!!! Unexpected tool use: ${toolUse.name}")
314+
}
315+
}
316+
}
317+
318+
private val MessageRequest.toolMap: Map<String, Tool> get() = tools?.associateBy { it.name } ?: emptyMap()

src/commonMain/kotlin/agent/Agent.kt

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,8 @@ import com.xemantic.ai.anthropic.cache.CacheControl
2121
import com.xemantic.ai.anthropic.content.Content
2222
import com.xemantic.ai.anthropic.content.Text
2323
import com.xemantic.ai.anthropic.content.ToolUse
24-
import com.xemantic.ai.anthropic.message.Message
25-
import com.xemantic.ai.anthropic.message.MessageResponse
26-
import com.xemantic.ai.anthropic.message.StopReason
27-
import com.xemantic.ai.anthropic.message.plusAssign
24+
import com.xemantic.ai.anthropic.message.*
2825
import com.xemantic.ai.anthropic.tool.Tool
29-
import com.xemantic.ai.anthropic.util.transformLast
3026

3127
/**
3228
* This is highly experimental work in progress code. Most likely
@@ -64,15 +60,7 @@ class Agent(
6460
do {
6561
response = anthropic.messages.create {
6662
tools = this@Agent.tools
67-
messages = conversation.transformLast { message ->
68-
message.copy {
69-
content = content.transformLast { contentElement ->
70-
contentElement.alterCacheControl(
71-
CacheControl.Ephemeral()
72-
)
73-
}
74-
}
75-
}
63+
messages = conversation.addCacheBreakpoint()
7664
}
7765
conversation += response
7866
if (response.stopReason == StopReason.TOOL_USE) {

0 commit comments

Comments
 (0)