Skip to content

Commit 6c9f5f4

Browse files
committed
Allow for Julia to stream. Bump version. Enhance error handling.
1 parent dff6f7a commit 6c9f5f4

File tree

2 files changed

+222
-146
lines changed

2 files changed

+222
-146
lines changed

julia/Bytez/Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "Bytez"
22
uuid = "aeec0939-83cf-4f14-8c49-2dc7569399f2"
3-
version = "0.0.6"
3+
version = "0.0.7"
44
authors = ["Bytez <[email protected]>"]
55
license = "MIT"
66

julia/Bytez/src/Bytez.jl

Lines changed: 221 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -1,150 +1,226 @@
11
module Bytez
2-
using HTTP
3-
using JSON3
4-
include("types.jl")
5-
6-
global API_KEY = ""
7-
8-
struct List
9-
models::Function
10-
instances::Function
11-
List() = new(list_models, list_instances)
12-
end
13-
struct Client
14-
key::String
15-
list::List
16-
model::Function
17-
end
18-
mutable struct Model
19-
# props
20-
id::String
21-
timeout::Int
22-
concurrency::Int
23-
# methods
24-
status::Function
25-
start::Function
26-
stop::Function
27-
run::Function
28-
load::Function
29-
# custom constructor
30-
function Model(model_id::String, concurrency::Int, timeout::Int)
31-
body_with_model_id = Dict("model" => model_id)
32-
json_body = JSON3.write(body_with_model_id)
33-
json_body_model_start = JSON3.write(
34-
merge(
35-
body_with_model_id,
36-
Dict("concurrency" => concurrency, "timeout" => timeout)
37-
)
38-
)
39-
40-
model = new(
41-
model_id,
42-
timeout,
43-
concurrency,
44-
() -> status(json_body),
45-
() -> start(json_body_model_start),
46-
() -> stop(json_body),
47-
(input, params = Dict()) -> run(input, body_with_model_id, params),
48-
() -> load(json_body_model_start)
49-
)
50-
51-
return model
52-
end
53-
end
54-
55-
function request(path::String, body::String = "")
56-
response = HTTP.request(
57-
body == "" ? "GET" : "POST",
58-
"$HOST/$path",
59-
status_exception = false,
60-
body = body,
61-
headers = Dict(
62-
"Authorization" => "Key $API_KEY",
63-
"Content-type" => "application/json"
64-
)
65-
)
66-
67-
return JSON3.read(response.body)
68-
end
69-
#
70-
# model methods
71-
#
72-
const status = (body::String) -> request("model/status", body)
73-
const start = (body::String) -> request("model/load", body)
74-
const stop = (body::String) -> request("model/delete", body)
75-
76-
function run(input::Any, body_with_model_id::Dict, params::Dict)
77-
body = merge(
78-
body_with_model_id,
79-
Dict("stream"=> false),
80-
Dict("params"=> params)
81-
)
82-
83-
# Check if input is a dictionary
84-
if isa(input, Dict)
85-
body = merge(body, input)
86-
else
87-
body["input"] = input
88-
end
89-
90-
return request("model/run", JSON3.write(body))
91-
end
92-
function load(body::String)
93-
json = nothing
94-
lastStatus = ""
95-
status = ""
96-
97-
try
98-
json = start(body)
99-
catch
100-
json = request("model/status", body)
101-
finally
102-
status = json["status"]
103-
end
104-
105-
while status != "FAILED" && status != "RUNNING"
106-
json = request("model/status", body)
107-
status = json["status"]
108-
109-
if status != "RUNNING"
110-
if status != lastStatus
111-
lastStatus = status;
112-
println(status);
113-
end
114-
115-
sleep(5)
116-
end
117-
end
118-
end
119-
#
120-
# list functions
121-
#
122-
const list_models = () -> request("model/list")
2+
using HTTP
3+
using JSON3
4+
include("types.jl")
5+
6+
global API_KEY = ""
7+
8+
const MODEL_LOAD_TIMEOUT_MINUTES = 15 # Number of minutes for timeout
9+
const MODEL_LOAD_TIMEOUT_MINUTES_AS_SECONDS = MODEL_LOAD_TIMEOUT_MINUTES * 60
10+
11+
struct HttpError <: Exception
12+
message::String
13+
http_status::Int
14+
end
15+
16+
17+
struct List
18+
models::Function
19+
instances::Function
20+
List() = new(list_models, list_instances)
21+
end
22+
struct Client
23+
key::String
24+
list::List
25+
model::Function
26+
end
27+
mutable struct Model
28+
# props
29+
id::String
30+
timeout::Int
31+
concurrency::Int
32+
# methods
33+
start::Function
34+
load::Function
35+
stop::Function
36+
status::Function
37+
run::Function
38+
39+
# custom constructor
40+
function Model(model_id::String, concurrency::Int, timeout::Int)
41+
model_id_dict = Dict("model" => model_id)
42+
start_dict = merge(
43+
model_id_dict,
44+
Dict("concurrency" => concurrency, "timeout" => timeout))
45+
46+
model = new(
47+
model_id,
48+
timeout,
49+
concurrency,
50+
() -> start(start_dict),
51+
() -> load(start_dict),
52+
() -> stop(model_id_dict),
53+
() -> status(model_id_dict),
54+
(input, params = Dict()) -> run(input, model_id_dict, params),
55+
)
56+
57+
return model
58+
end
59+
end
60+
61+
function request(path::String, body::Union{Dict, Nothing} = nothing)
62+
is_streaming = get(body, "stream", false)
63+
data_channel = Channel{Any}(Inf)
64+
status_code_channel = Channel{Any}(Inf)
65+
66+
@async begin
67+
try
68+
response = HTTP.open(
69+
body === nothing ? "GET" : "POST",
70+
"$HOST/$path",
71+
status_exception = false,
72+
body = JSON3.write(body),
73+
headers = Dict(
74+
"Authorization" => "Key $API_KEY",
75+
"Content-type" => "application/json",
76+
),
77+
) do http_io
78+
79+
write(http_io, JSON3.write(body))
80+
startread(http_io)
81+
82+
# Stream the http_io in chunks and write each to the channel
83+
while !eof(http_io)
84+
chunk = String(readavailable(http_io)) # Read available chunk as a string
85+
86+
put!(data_channel, chunk) # Write chunk to channel
87+
end
88+
end
89+
# always make the status code available via a channel for non streaming calls
90+
status_code = response.status
91+
put!(status_code_channel, status_code)
92+
catch error
93+
println(error)
94+
finally
95+
close(data_channel)
96+
close(status_code_channel)
97+
end
98+
end
99+
100+
# just return the data channel directly if streaming
101+
if is_streaming
102+
return data_channel
103+
end
104+
105+
result = JSON3.read(String(take!(data_channel)))
106+
107+
error = get(result, "error", nothing)
108+
109+
if error !== nothing
110+
http_status = take!(status_code_channel)
111+
http_error = HttpError(error, http_status)
112+
113+
return Dict("error" => http_error)
114+
end
115+
116+
return result
117+
end
118+
#
119+
# model methods
120+
#
121+
const status = (body::Dict) -> request("model/status", body)
122+
const start = (body::Dict) -> request("model/load", body)
123+
const stop = (body::Dict) -> request("model/delete", body)
124+
125+
function run(input::Any, model_id_dict::Dict, options::Dict)
126+
body = merge(
127+
model_id_dict,
128+
Dict("stream" => get(options, "stream", false)),
129+
Dict("params" => get(options, "params", Dict())),
130+
)
131+
132+
# Check if input is a dictionary
133+
if isa(input, Dict)
134+
body = merge(body, input)
135+
else
136+
body["input"] = input
137+
end
138+
139+
results = request("model/run", body)
140+
141+
if get(body, "stream", false) === true
142+
return results
143+
end
144+
145+
error = get(results, "error", nothing)
146+
147+
if error !== nothing
148+
throw(Exception(error))
149+
end
150+
151+
return results
152+
end
153+
154+
function load(body::Dict)
155+
json = start(body)
156+
error = get(json, "error", nothing)
157+
158+
if error !== nothing
159+
# model is already loaded
160+
if error.http_status === 409
161+
return
162+
end
163+
# // We allow 429's to proceed, that means that a loading operation is already in progress
164+
if error.http_status !== 429 || occursin("credits", error.message)
165+
throw(error)
166+
end
167+
168+
end
169+
170+
time_to_timeout = time() * 1000 + MODEL_LOAD_TIMEOUT_MINUTES_AS_SECONDS
171+
172+
status = "UNSET"
173+
while time() < time_to_timeout
174+
json = request("model/status", body)
175+
new_status = json["status"]
176+
error = get(json, "error", nothing)
177+
178+
if status !== new_status
179+
status = new_status
180+
println(status)
181+
end
182+
183+
if status === "RUNNING"
184+
return
185+
end
186+
187+
if status === "FAILED"
188+
exception = Exception(error)
189+
throw(exception)
190+
end
191+
192+
sleep(5)
193+
end
194+
end
195+
#
196+
# list functions
197+
#
198+
const list_models = () -> request("model/list")
123199
# function list_models(task::Union{Task,Nothing} = nothing)
124200
# return request(task === nothing ? "model/list" : "model/list?task=$task")
125201
# end
126-
const list_instances = () -> request("model/instances")
127-
#
128-
# bytez
129-
#
130-
function init(key::String, dev::Bool = false)
131-
global API_KEY = key
132-
133-
global HOST
134-
135-
if dev
136-
HOST = "http://localhost:8080"
137-
else
138-
HOST = "https://bytez.com"
139-
end
140-
141-
function create_new_model(model_id::String; concurrency::Int = 1, timeout::Int = 300)
142-
return Model(model_id, concurrency, timeout)
143-
end
144-
145-
return Client(key, List(), create_new_model)
146-
end
147-
148-
149-
export init
202+
const list_instances = () -> request("model/instances")
203+
#
204+
# bytez
205+
#
206+
function init(key::String, dev::Bool = false)
207+
global API_KEY = key
208+
209+
global HOST
210+
211+
if dev
212+
HOST = "http://localhost:8080"
213+
else
214+
HOST = "https://bytez.com"
215+
end
216+
217+
function create_new_model(model_id::String; concurrency::Int = 1, timeout::Int = 300)
218+
return Model(model_id, concurrency, timeout)
219+
end
220+
221+
return Client(key, List(), create_new_model)
222+
end
223+
224+
225+
export init
150226
end

0 commit comments

Comments
 (0)