Skip to content

Commit ac42f4c

Browse files
committed
working "Downloads.jl" request
1 parent cb39da9 commit ac42f4c

File tree

4 files changed

+267
-8
lines changed

4 files changed

+267
-8
lines changed

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"julia.environmentPath": "/home/manuelbb/.julia/dev/gRPCClient"
3+
}

src/curl.jl

Lines changed: 252 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,50 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType
4242
end
4343
=#
4444

45+
46+
function to_delimited_message_buffer(msg, max_message_length::Int)
47+
iob = IOBuffer()
48+
limitiob = LimitIO(iob, max_message_length)
49+
write(limitiob, UInt8(0)) # compression
50+
write(limitiob, hton(UInt32(0))) # message length (placeholder)
51+
data_len = writeproto(limitiob, msg) # message bytes
52+
53+
seek(iob, 1) # seek out the message length placeholder
54+
write(iob, hton(UInt32(data_len))) # fill the message length
55+
seek(iob, 0)
56+
return iob
57+
end
58+
59+
function _channel_to_bytes( input :: Channel, max_send_message_length )
60+
while true
61+
if isready(input)
62+
@info "Parsing input."
63+
input_buffer = to_delimited_message_buffer(
64+
take!(input), max_send_message_length)
65+
@info "Done parsing."
66+
return input_buffer
67+
end
68+
end
69+
end
70+
71+
function upload_data(easy::Curl.Easy, input::IO)
72+
while true
73+
yield()
74+
data = eof(input) ? nothing : readavailable(input)
75+
easy.input === nothing && break
76+
easy.input = data
77+
Curl.curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT)
78+
wait(easy.ready)
79+
easy.input === nothing && break
80+
easy.ready = Threads.Event()
81+
end
82+
@info "Done Uploading"
83+
end
84+
85+
function send_bytes(easy::Curl.Easy, msg_io)
86+
return upload_data( easy, msg_io )
87+
end
88+
4589
function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType
4690
while true
4791
yield()
@@ -87,6 +131,18 @@ function grpc_headers(; timeout::Real=Inf)
87131
headers
88132
end
89133

134+
function grpc_headers_dict(; timeout::Real=Inf)
135+
headers = Dict(
136+
"User-Agent" => "$(Curl.USER_AGENT)",
137+
"Content-Type" => "application/grpc+proto",
138+
"te" => "trailers"
139+
)
140+
if timeout !== Inf
141+
headers["grpc-timeout"] = "$(grpc_timeout_header_val(timeout))"
142+
end
143+
return headers
144+
end
145+
90146
function grpc_request_header(request_timeout::Real)
91147
if request_timeout == Inf
92148
GRPC_STATIC_HEADERS[]
@@ -127,6 +183,7 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length:
127183
datalen = UInt32(0)
128184
need_more = true
129185
for buf in easy.output
186+
@info "fol loop output"
130187
write(iob, buf)
131188
need_more = false
132189
while !need_more
@@ -160,6 +217,31 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length:
160217
close(output)
161218
end
162219

220+
function _bytes_to_channel( iob, output :: Channel{T}; max_recv_message_length = Inf) where T
221+
@info "Reading back proto response."
222+
seek(iob, 0)
223+
224+
compressed = read(iob, UInt8) # compression
225+
datalen = ntoh(read(iob, UInt32)) # message length
226+
@info "1"
227+
if datalen > max_recv_message_length
228+
@warn "error again"
229+
throw(gRPCMessageTooLargeException(max_recv_message_length, datalen))
230+
end
231+
232+
@info "2"
233+
234+
if bytesavailable(iob) >= datalen
235+
@show isopen(output)
236+
@show decoded_obj = readproto(iob, T())
237+
put!(output, decoded_obj) # decode message bytes
238+
end
239+
240+
@info "3"
241+
#close(output)
242+
return nothing
243+
end
244+
163245
function set_low_speed_limits(easy::Curl.Easy, low_speed_limit, low_speed_time)
164246
low_speed_limit >= 0 ||
165247
throw(ArgumentError("`low_speed_limit` must be non-negative, got $(low_speed_limit)."))
@@ -187,7 +269,72 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real)
187269
end
188270
end
189271

190-
function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
272+
function grpc_request(
273+
downloader::Downloader,
274+
url::String,
275+
input_channel::Channel{T1},
276+
output_channel::Channel{T2};
277+
maxage::Clong = typemax(Clong),
278+
keepalive::Clong = 60,
279+
negotiation::Symbol = :http2_prior_knowledge,
280+
revocation::Bool = true,
281+
request_timeout::Real = Inf,
282+
connect_timeout::Real = 0,
283+
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
284+
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
285+
verbose::Bool = false,
286+
low_speed_limit::Int = 0,
287+
low_speed_time::Int = 0)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}
288+
289+
headers = grpc_headers_dict(; timeout = request_timeout )
290+
input = _channel_to_bytes(input_channel, max_send_message_length)
291+
output = IOBuffer()
292+
293+
local req_res
294+
Curl.with_handle(
295+
easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do _easy
296+
req_res = request(
297+
url;
298+
input,
299+
output,
300+
method = "POST",
301+
headers,
302+
timeout = request_timeout,
303+
verbose,
304+
downloader,
305+
_easy
306+
)
307+
308+
seek(output, 0)
309+
@show output_channel
310+
_bytes_to_channel( output, output_channel; max_recv_message_length )
311+
312+
# parse the grpc headers
313+
@show grpc_status = StatusCode.OK.code
314+
grpc_message = ""
315+
for hdr in req_res.headers
316+
if startswith(hdr.first, "grpc-status")
317+
grpc_status = parse(Int, strip(hdr.second))
318+
elseif startswith(hdr.first, "grpc-message")
319+
grpc_message = string(strip(hdr.second))
320+
end
321+
end
322+
if (_easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
323+
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
324+
end
325+
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
326+
grpc_message = grpc_status_message(grpc_status)
327+
end
328+
329+
if ((_easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code))
330+
gRPCStatus(true, grpc_status, "")
331+
else
332+
gRPCStatus(false, grpc_status, isempty(grpc_message) ? Curl.get_curl_errstr(_easy) : grpc_message)
333+
end
334+
end
335+
end
336+
337+
function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
191338
maxage::Clong = typemax(Clong),
192339
keepalive::Clong = 60,
193340
negotiation::Symbol = :http2_prior_knowledge,
@@ -213,6 +360,7 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
213360
Curl.add_handle(downloader.multi, easy)
214361

215362
function cleanup()
363+
@info "Cleaning up."
216364
Curl.remove_handle(downloader.multi, easy)
217365
# though remove_handle sets easy.handle to C_NULL, it does not close output and progress channels
218366
# we need to close them here to unblock anything waiting on them
@@ -253,9 +401,12 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
253401
end
254402
else
255403
try
404+
msg_io = _channel_to_bytes(input, max_recv_message_length)
405+
256406
Base.Experimental.@sync begin
257407
@async recv_data(easy, output, max_recv_message_length)
258-
@async send_data(easy, input, max_send_message_length)
408+
#@async send_data(easy, input, max_send_message_length)
409+
@async send_bytes(easy, msg_io)
259410
end
260411
finally # ensure handle is removed
261412
cleanup()
@@ -288,3 +439,102 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
288439
end
289440
end
290441
end
442+
443+
444+
function request(
445+
url :: AbstractString;
446+
input :: Union{Downloads.ArgRead, Nothing} = nothing,
447+
output :: Union{Downloads.ArgWrite, Nothing} = nothing,
448+
method :: Union{AbstractString, Nothing} = nothing,
449+
headers :: Union{AbstractVector, AbstractDict} = Pair{String,String}[],
450+
timeout :: Real = Inf,
451+
progress :: Union{Function, Nothing} = nothing,
452+
verbose :: Bool = false,
453+
throw :: Bool = true,
454+
downloader :: Union{Downloads.Downloader, Nothing} = nothing,
455+
_easy :: Union{Curl.Easy, Nothing} = nothing,
456+
) :: Union{Response, RequestError}
457+
lock(Downloads.DOWNLOAD_LOCK) do
458+
yield() # let other downloads finish
459+
downloader isa Downloads.Downloader && return
460+
while true
461+
downloader = Downloads.DOWNLOADER[]
462+
downloader isa Downloads.Downloader && return
463+
Downloads.DOWNLOADER[] = Downloads.Downloader()
464+
end
465+
end
466+
local response
467+
have_input = input !== nothing
468+
have_output = output !== nothing
469+
input = Downloads.something(input, devnull)
470+
output = Downloads.something(output, devnull)
471+
input_size = Downloads.arg_read_size(input)
472+
progress = Downloads.p_func(progress, input, output)
473+
Downloads.arg_read(input) do input
474+
Downloads.arg_write(output) do output
475+
Downloads.with_handle( isnothing(_easy) ? Curl.Easy() : _easy ) do easy
476+
# setup the request
477+
Downloads.set_url(easy, url)
478+
Downloads.set_timeout(easy, timeout)
479+
Downloads.set_verbose(easy, verbose)
480+
Downloads.add_headers(easy, headers)
481+
482+
# libcurl does not set the default header reliably so set it
483+
# explicitly unless user has specified it, xref
484+
# https://github.com/JuliaLang/Pkg.jl/pull/2357
485+
if !any(kv -> lowercase(kv[1]) == "user-agent", headers)
486+
Curl.add_header(easy, "User-Agent", Curl.USER_AGENT)
487+
end
488+
489+
if have_input
490+
Downloads.enable_upload(easy)
491+
if input_size !== nothing
492+
Downloads.set_upload_size(easy, input_size)
493+
end
494+
if applicable(seek, input, 0)
495+
Downloads.set_seeker(easy) do offset
496+
seek(input, Int(offset))
497+
end
498+
end
499+
else
500+
Downloads.set_body(easy, have_output)
501+
end
502+
method !== nothing && Downloads.set_method(easy, method)
503+
progress !== nothing && Downloads.enable_progress(easy)
504+
Downloads.set_ca_roots(downloader, easy)
505+
info = (url = url, method = method, headers = headers)
506+
Downloads.easy_hook(downloader, easy, info)
507+
508+
# do the request
509+
Downloads.add_handle(downloader.multi, easy)
510+
try # ensure handle is removed
511+
Base.Experimental.@sync begin
512+
if have_input
513+
@async Downloads.upload_data(easy, input)
514+
end
515+
@async for buf in easy.output
516+
write(output, buf)
517+
end
518+
if progress !== nothing
519+
@async for prog in easy.progress
520+
progress(prog...)
521+
end
522+
end
523+
end
524+
finally
525+
@info "cleining up"
526+
Downloads.remove_handle(downloader.multi, easy)
527+
end
528+
529+
# return the response or throw an error
530+
response = Downloads.Response(Downloads.get_response_info(easy)...)
531+
easy.code == Curl.CURLE_OK && return response
532+
message = Downloads.get_curl_errstr(easy)
533+
response = Downloads.RequestError(url, easy.code, message, response)
534+
throw && Base.throw(response)
535+
end
536+
end
537+
end
538+
@show response
539+
return response
540+
end

src/grpc.jl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,15 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
187187
end
188188
call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T}) where T <: ProtoType = call_method(channel, service, method, controller, input, get_response_type(method))
189189
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{Channel{T2}}) where {T1 <: ProtoType, T2 <: ProtoType}
190-
call_method(channel, service, method, controller, input, Channel{T2}())
190+
call_method(channel, service, method, controller, input, Channel{T2}(1))
191191
end
192192
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
193-
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}())
193+
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}(1))
194194
try
195-
take!(outchannel), status_future
196-
catch ex
195+
out = take!(outchannel)
196+
#close(outchannel)
197+
out, status_future
198+
catch exoutput_ch
197199
gRPCCheck(status_future) # check for core issue
198200
if isa(ex, InvalidStateException)
199201
throw(gRPCServiceCallException("Server closed connection without any response"))
@@ -204,7 +206,7 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
204206
end
205207
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
206208
url = string(channel.baseurl, "/", service.name, "/", method.name)
207-
status_future = @async grpc_request(channel.downloader, url, input, outchannel;
209+
status_future = grpc_request(channel.downloader, url, input, outchannel;
208210
maxage = controller.maxage,
209211
keepalive = controller.keepalive,
210212
negotiation = controller.negotiation,
@@ -217,5 +219,6 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
217219
low_speed_limit = controller.low_speed_limit,
218220
low_speed_time = controller.low_speed_time,
219221
)
222+
@show status_future
220223
outchannel, status_future
221224
end

src/limitio.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ end
88
LimitIO(io::IO, maxbytes) = LimitIO(io, maxbytes, 0)
99

1010
function Base.write(io::LimitIO, v::UInt8)
11-
io.n > io.maxbytes && throw(gRPCMessageTooLargeException(io.maxbytes, io.n))
11+
if io.n > io.maxbytes
12+
@warn "message to long"
13+
throw(gRPCMessageTooLargeException(io.maxbytes, io.n))
14+
end
1215
nincr = write(io.io, v)
1316
io.n += nincr
1417
nincr

0 commit comments

Comments
 (0)