Skip to content

Commit

Permalink
Revert "working "Downloads.jl" request"
Browse files Browse the repository at this point in the history
This reverts commit ac42f4c.
  • Loading branch information
manuelbb-upb committed May 6, 2022
1 parent ac42f4c commit d648012
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 267 deletions.
3 changes: 0 additions & 3 deletions .vscode/settings.json

This file was deleted.

254 changes: 2 additions & 252 deletions src/curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,50 +42,6 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType
end
=#


function to_delimited_message_buffer(msg, max_message_length::Int)
iob = IOBuffer()
limitiob = LimitIO(iob, max_message_length)
write(limitiob, UInt8(0)) # compression
write(limitiob, hton(UInt32(0))) # message length (placeholder)
data_len = writeproto(limitiob, msg) # message bytes

seek(iob, 1) # seek out the message length placeholder
write(iob, hton(UInt32(data_len))) # fill the message length
seek(iob, 0)
return iob
end

function _channel_to_bytes( input :: Channel, max_send_message_length )
while true
if isready(input)
@info "Parsing input."
input_buffer = to_delimited_message_buffer(
take!(input), max_send_message_length)
@info "Done parsing."
return input_buffer
end
end
end

function upload_data(easy::Curl.Easy, input::IO)
while true
yield()
data = eof(input) ? nothing : readavailable(input)
easy.input === nothing && break
easy.input = data
Curl.curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT)
wait(easy.ready)
easy.input === nothing && break
easy.ready = Threads.Event()
end
@info "Done Uploading"
end

function send_bytes(easy::Curl.Easy, msg_io)
return upload_data( easy, msg_io )
end

function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType
while true
yield()
Expand Down Expand Up @@ -131,18 +87,6 @@ function grpc_headers(; timeout::Real=Inf)
headers
end

function grpc_headers_dict(; timeout::Real=Inf)
headers = Dict(
"User-Agent" => "$(Curl.USER_AGENT)",
"Content-Type" => "application/grpc+proto",
"te" => "trailers"
)
if timeout !== Inf
headers["grpc-timeout"] = "$(grpc_timeout_header_val(timeout))"
end
return headers
end

function grpc_request_header(request_timeout::Real)
if request_timeout == Inf
GRPC_STATIC_HEADERS[]
Expand Down Expand Up @@ -183,7 +127,6 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length:
datalen = UInt32(0)
need_more = true
for buf in easy.output
@info "fol loop output"
write(iob, buf)
need_more = false
while !need_more
Expand Down Expand Up @@ -217,31 +160,6 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length:
close(output)
end

function _bytes_to_channel( iob, output :: Channel{T}; max_recv_message_length = Inf) where T
@info "Reading back proto response."
seek(iob, 0)

compressed = read(iob, UInt8) # compression
datalen = ntoh(read(iob, UInt32)) # message length
@info "1"
if datalen > max_recv_message_length
@warn "error again"
throw(gRPCMessageTooLargeException(max_recv_message_length, datalen))
end

@info "2"

if bytesavailable(iob) >= datalen
@show isopen(output)
@show decoded_obj = readproto(iob, T())
put!(output, decoded_obj) # decode message bytes
end

@info "3"
#close(output)
return nothing
end

function set_low_speed_limits(easy::Curl.Easy, low_speed_limit, low_speed_time)
low_speed_limit >= 0 ||
throw(ArgumentError("`low_speed_limit` must be non-negative, got $(low_speed_limit)."))
Expand Down Expand Up @@ -269,72 +187,7 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real)
end
end

function grpc_request(
downloader::Downloader,
url::String,
input_channel::Channel{T1},
output_channel::Channel{T2};
maxage::Clong = typemax(Clong),
keepalive::Clong = 60,
negotiation::Symbol = :http2_prior_knowledge,
revocation::Bool = true,
request_timeout::Real = Inf,
connect_timeout::Real = 0,
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
verbose::Bool = false,
low_speed_limit::Int = 0,
low_speed_time::Int = 0)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}

headers = grpc_headers_dict(; timeout = request_timeout )
input = _channel_to_bytes(input_channel, max_send_message_length)
output = IOBuffer()

local req_res
Curl.with_handle(
easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do _easy
req_res = request(
url;
input,
output,
method = "POST",
headers,
timeout = request_timeout,
verbose,
downloader,
_easy
)

seek(output, 0)
@show output_channel
_bytes_to_channel( output, output_channel; max_recv_message_length )

# parse the grpc headers
@show grpc_status = StatusCode.OK.code
grpc_message = ""
for hdr in req_res.headers
if startswith(hdr.first, "grpc-status")
grpc_status = parse(Int, strip(hdr.second))
elseif startswith(hdr.first, "grpc-message")
grpc_message = string(strip(hdr.second))
end
end
if (_easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
end
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
grpc_message = grpc_status_message(grpc_status)
end

if ((_easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code))
gRPCStatus(true, grpc_status, "")
else
gRPCStatus(false, grpc_status, isempty(grpc_message) ? Curl.get_curl_errstr(_easy) : grpc_message)
end
end
end

function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2};
maxage::Clong = typemax(Clong),
keepalive::Clong = 60,
negotiation::Symbol = :http2_prior_knowledge,
Expand All @@ -360,7 +213,6 @@ function _grpc_request(downloader::Downloader, url::String, input::Channel{T1},
Curl.add_handle(downloader.multi, easy)

function cleanup()
@info "Cleaning up."
Curl.remove_handle(downloader.multi, easy)
# though remove_handle sets easy.handle to C_NULL, it does not close output and progress channels
# we need to close them here to unblock anything waiting on them
Expand Down Expand Up @@ -401,12 +253,9 @@ function _grpc_request(downloader::Downloader, url::String, input::Channel{T1},
end
else
try
msg_io = _channel_to_bytes(input, max_recv_message_length)

Base.Experimental.@sync begin
@async recv_data(easy, output, max_recv_message_length)
#@async send_data(easy, input, max_send_message_length)
@async send_bytes(easy, msg_io)
@async send_data(easy, input, max_send_message_length)
end
finally # ensure handle is removed
cleanup()
Expand Down Expand Up @@ -439,102 +288,3 @@ function _grpc_request(downloader::Downloader, url::String, input::Channel{T1},
end
end
end


function request(
url :: AbstractString;
input :: Union{Downloads.ArgRead, Nothing} = nothing,
output :: Union{Downloads.ArgWrite, Nothing} = nothing,
method :: Union{AbstractString, Nothing} = nothing,
headers :: Union{AbstractVector, AbstractDict} = Pair{String,String}[],
timeout :: Real = Inf,
progress :: Union{Function, Nothing} = nothing,
verbose :: Bool = false,
throw :: Bool = true,
downloader :: Union{Downloads.Downloader, Nothing} = nothing,
_easy :: Union{Curl.Easy, Nothing} = nothing,
) :: Union{Response, RequestError}
lock(Downloads.DOWNLOAD_LOCK) do
yield() # let other downloads finish
downloader isa Downloads.Downloader && return
while true
downloader = Downloads.DOWNLOADER[]
downloader isa Downloads.Downloader && return
Downloads.DOWNLOADER[] = Downloads.Downloader()
end
end
local response
have_input = input !== nothing
have_output = output !== nothing
input = Downloads.something(input, devnull)
output = Downloads.something(output, devnull)
input_size = Downloads.arg_read_size(input)
progress = Downloads.p_func(progress, input, output)
Downloads.arg_read(input) do input
Downloads.arg_write(output) do output
Downloads.with_handle( isnothing(_easy) ? Curl.Easy() : _easy ) do easy
# setup the request
Downloads.set_url(easy, url)
Downloads.set_timeout(easy, timeout)
Downloads.set_verbose(easy, verbose)
Downloads.add_headers(easy, headers)

# libcurl does not set the default header reliably so set it
# explicitly unless user has specified it, xref
# https://github.com/JuliaLang/Pkg.jl/pull/2357
if !any(kv -> lowercase(kv[1]) == "user-agent", headers)
Curl.add_header(easy, "User-Agent", Curl.USER_AGENT)
end

if have_input
Downloads.enable_upload(easy)
if input_size !== nothing
Downloads.set_upload_size(easy, input_size)
end
if applicable(seek, input, 0)
Downloads.set_seeker(easy) do offset
seek(input, Int(offset))
end
end
else
Downloads.set_body(easy, have_output)
end
method !== nothing && Downloads.set_method(easy, method)
progress !== nothing && Downloads.enable_progress(easy)
Downloads.set_ca_roots(downloader, easy)
info = (url = url, method = method, headers = headers)
Downloads.easy_hook(downloader, easy, info)

# do the request
Downloads.add_handle(downloader.multi, easy)
try # ensure handle is removed
Base.Experimental.@sync begin
if have_input
@async Downloads.upload_data(easy, input)
end
@async for buf in easy.output
write(output, buf)
end
if progress !== nothing
@async for prog in easy.progress
progress(prog...)
end
end
end
finally
@info "cleining up"
Downloads.remove_handle(downloader.multi, easy)
end

# return the response or throw an error
response = Downloads.Response(Downloads.get_response_info(easy)...)
easy.code == Curl.CURLE_OK && return response
message = Downloads.get_curl_errstr(easy)
response = Downloads.RequestError(url, easy.code, message, response)
throw && Base.throw(response)
end
end
end
@show response
return response
end
13 changes: 5 additions & 8 deletions src/grpc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,13 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
end
call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T}) where T <: ProtoType = call_method(channel, service, method, controller, input, get_response_type(method))
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{Channel{T2}}) where {T1 <: ProtoType, T2 <: ProtoType}
call_method(channel, service, method, controller, input, Channel{T2}(1))
call_method(channel, service, method, controller, input, Channel{T2}())
end
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}(1))
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}())
try
out = take!(outchannel)
#close(outchannel)
out, status_future
catch exoutput_ch
take!(outchannel), status_future
catch ex
gRPCCheck(status_future) # check for core issue
if isa(ex, InvalidStateException)
throw(gRPCServiceCallException("Server closed connection without any response"))
Expand All @@ -206,7 +204,7 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
end
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
url = string(channel.baseurl, "/", service.name, "/", method.name)
status_future = grpc_request(channel.downloader, url, input, outchannel;
status_future = @async grpc_request(channel.downloader, url, input, outchannel;
maxage = controller.maxage,
keepalive = controller.keepalive,
negotiation = controller.negotiation,
Expand All @@ -219,6 +217,5 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
low_speed_limit = controller.low_speed_limit,
low_speed_time = controller.low_speed_time,
)
@show status_future
outchannel, status_future
end
5 changes: 1 addition & 4 deletions src/limitio.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ end
LimitIO(io::IO, maxbytes) = LimitIO(io, maxbytes, 0)

function Base.write(io::LimitIO, v::UInt8)
if io.n > io.maxbytes
@warn "message to long"
throw(gRPCMessageTooLargeException(io.maxbytes, io.n))
end
io.n > io.maxbytes && throw(gRPCMessageTooLargeException(io.maxbytes, io.n))
nincr = write(io.io, v)
io.n += nincr
nincr
Expand Down

0 comments on commit d648012

Please sign in to comment.