diff --git a/Project.toml b/Project.toml index 70e2479..13216a6 100644 --- a/Project.toml +++ b/Project.toml @@ -7,11 +7,13 @@ version = "0.10.1" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +StreamCallbacks = "c7e14b3c-d701-4e11-a899-b3d7637fd44d" [compat] Dates = "1" HTTP = "1" JSON3 = "1" +StreamCallbacks = "0.1" julia = "1" [extras] diff --git a/src/OpenAI.jl b/src/OpenAI.jl index fef7d6e..9f02fe3 100644 --- a/src/OpenAI.jl +++ b/src/OpenAI.jl @@ -88,48 +88,38 @@ function request_body(url, method; input, headers, query, kwargs...) end function request_body_live(url; method, input, headers, streamcallback, kwargs...) - resp = nothing - - body = sprint() do output - resp = HTTP.open("POST", url, headers) do stream - body = String(take!(input)) - write(stream, body) - - HTTP.closewrite(stream) # indicate we're done writing to the request - - r = HTTP.startread(stream) # start reading the response - isdone = false - - while !eof(stream) || !isdone - # Extract all available messages - masterchunk = String(readavailable(stream)) - - # Split into subchunks on newlines. - # Occasionally, the streaming will append multiple messages together, - # and iterating through each line in turn will make sure that - # streamingcallback is called on each message in turn. - chunks = String.(filter(!isempty, split(masterchunk, "\n"))) + # Create a StreamCallback based on the provided streamcallback + callback = if streamcallback isa Function + # If it's a function, wrap it in a StreamCallback with OpenAIStream flavor + StreamCallback( + out = chunk -> streamcallback(String(chunk.data)), + flavor = OpenAIStream() + ) + elseif streamcallback isa IO + # If it's an IO, create a StreamCallback that writes to it + StreamCallback( + out = chunk -> write(streamcallback, String(chunk.data)), + flavor = OpenAIStream() + ) + elseif streamcallback isa StreamCallback + # If it's already a StreamCallback, use it as is + streamcallback + else + # Default case, create a basic StreamCallback + StreamCallback( + out = chunk -> nothing, + flavor = OpenAIStream() + ) + end - # Iterate through each chunk in turn. - for chunk in chunks - if occursin(chunk, "data: [DONE]") # TODO - maybe don't strip, but instead us a regex in the endswith call - isdone = true - end + # Use StreamCallbacks.jl's streamed_request! + body = String(take!(input)) + resp = streamed_request!(callback, url, headers, body; kwargs...) - # call the callback (if present) on the latest chunk - if !isnothing(streamcallback) - streamcallback(chunk) - end + # Build the response body from the accumulated chunks + response_body = build_response_body(callback) - # append the latest chunk to the body - print(output, chunk) - end - end - HTTP.closeread(stream) - end - end - - return resp, body + return resp, response_body end function status_error(resp, log = nothing) @@ -180,14 +170,23 @@ function _request(api::AbstractString, return if isnothing(streamcallback) OpenAIResponse(resp.status, JSON3.read(body)) else - # assemble the streaming response body into a proper JSON object - lines = split(body, "\n") # split body into lines - - # throw out empty lines, skip "data: [DONE] bits - lines = filter(x -> !isempty(x) && !occursin("[DONE]", x), lines) + # Handle both StreamCallbacks.jl and legacy streaming responses + lines = if streamcallback isa StreamCallback + # StreamCallbacks.jl response is already properly formatted + String.(filter(!isempty, split(body, "\n"))) + else + # Legacy streaming response handling + filter(x -> !isempty(x) && !occursin("[DONE]", x), split(body, "\n")) + end - # read each line, which looks like "data: {}" - parsed = map(line -> JSON3.read(line[6:end]), lines) + # Parse each line as JSON, handling both formats + parsed = map(lines) do line + if startswith(line, "data: ") + JSON3.read(line[6:end]) + else + JSON3.read(line) + end + end OpenAIResponse(resp.status, parsed) end @@ -295,68 +294,91 @@ Create chat - `api_key::String`: OpenAI API key - `model_id::String`: Model id - `messages::Vector`: The chat history so far. -- `streamcallback=nothing`: Function to call on each chunk (delta) of the chat response in streaming mode. +- `streamcallback=nothing`: Callback for streaming responses. Can be: + - A function that takes a String (basic streaming) + - An IO object to write chunks to + - A StreamCallback object for advanced streaming control (see StreamCallbacks.jl) # Keyword Arguments (check the OpenAI docs for the exhaustive list): - `temperature::Float64=1.0`: What sampling temperature to use, between 0 and 2. Higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more focused and deterministic. We generally recommend altering this or top_p but not both. - `top_p::Float64=1.0`: An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens comprising the top 10% probability mass are considered. We generally recommend altering this or temperature but not both. !!! note - Do not use `stream=true` option here, instead use the `streamcallback` keyword argument (see the relevant section below). + Do not use `stream=true` option here, instead use the `streamcallback` keyword argument (see the Streaming section below). For more details about the endpoint and additional arguments, visit # HTTP.request keyword arguments: - `http_kwargs::NamedTuple=NamedTuple()`: Keyword arguments to pass to HTTP.request (e. g., `http_kwargs=(connection_timeout=2,)` to set a connection timeout of 2 seconds). -## Example: +## Basic Example: ```julia -julia> CC = create_chat("..........", "gpt-4o-mini", +julia> CC = create_chat("..........", "gpt-4-turbo-preview", [Dict("role" => "user", "content"=> "What is the OpenAI mission?")] ); julia> CC.response.choices[1][:message][:content] -"\n\nThe OpenAI mission is to create safe and beneficial artificial intelligence (AI) that can help humanity achieve its full potential. The organization aims to discover and develop technical approaches to AI that are safe and aligned with human values. OpenAI believes that AI can help to solve some of the world's most pressing problems, such as climate change, disease, inequality, and poverty. The organization is committed to advancing research and development in AI while ensuring that it is used ethically and responsibly." +"OpenAI's mission is to ensure artificial general intelligence benefits all of humanity." ``` -### Streaming +## Streaming + +The package supports three ways to handle streaming responses: -When a function that takes a single `String` as an argument is passed in the `streamcallback` argument, a request will be made in -in streaming mode. The `streamcallback` callback will be called on every line of the streamed response. Here we use a callback -that prints out the current time to demonstrate how different parts of the response are received at different times. +### 1. Basic Streaming (Function Callback) -The response body will reflect the chunked nature of the response, so some reassembly will be required to recover the full -message returned by the API. +Pass a function that takes a String argument to process each chunk: ```julia -julia> CC = create_chat(key, "gpt-4o-mini", - [Dict("role" => "user", "content"=> "What continent is New York in? Two word answer.")], - streamcallback = x->println(Dates.now())); - 2023-03-27T12:34:50.428 -2023-03-27T12:34:50.524 -2023-03-27T12:34:50.524 -2023-03-27T12:34:50.524 -2023-03-27T12:34:50.545 -2023-03-27T12:34:50.556 -2023-03-27T12:34:50.556 - -julia> map(r->r["choices"][1]["delta"], CC.response) -5-element Vector{JSON3.Object{Base.CodeUnits{UInt8, SubString{String}}, SubArray{UInt64, 1, Vector{UInt64}, Tuple{UnitRange{Int64}}, true}}}: - { - "role": "assistant" -} - { - "content": "North" -} - { - "content": " America" -} - { - "content": "." -} - {} +julia> CC = create_chat(key, "gpt-4-turbo-preview", + [Dict("role" => "user", "content"=> "Count to 5 slowly")], + streamcallback = chunk -> println("Received: ", chunk) +); +Received: One +Received: , two +Received: , three +Received: , four +Received: , five ``` + +### 2. IO Streaming + +Stream directly to an IO object: + +```julia +julia> output_buffer = IOBuffer() +julia> CC = create_chat(key, "gpt-4-turbo-preview", + [Dict("role" => "user", "content"=> "Say hello")], + streamcallback = output_buffer +); +julia> String(take!(output_buffer)) +"Hello! How can I help you today?" +``` + +### 3. Advanced Streaming with StreamCallbacks.jl + +For advanced streaming control, use StreamCallbacks.jl's StreamCallback: + +```julia +using StreamCallbacks + +# Custom chunk processing +callback = StreamCallback( + # Process each chunk + out = chunk -> println("Token: ", chunk.content), + # Use OpenAI-specific stream parsing + flavor = OpenAIStream() +) + +CC = create_chat(key, "gpt-4-turbo-preview", + [Dict("role" => "user", "content"=> "Count to 3")], + streamcallback = callback +) +``` + +For advanced streaming features like custom stream parsing, specialized sinks, or detailed chunk inspection, +refer to the [StreamCallbacks.jl](https://github.com/svilupp/StreamCallbacks.jl) package documentation. """ function create_chat(api_key::String, model_id::String,