From 0652206a37640e859f11885fb1ea6ed410b0b73c Mon Sep 17 00:00:00 2001 From: Manuel Berkemeier Date: Thu, 28 Apr 2022 17:05:09 +0200 Subject: [PATCH 1/5] new `low_speed_limit` and `low_speed_time` options --- Project.toml | 2 +- src/curl.jl | 20 +++++++++++++++++++- src/grpc.jl | 21 +++++++++++++++++---- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/Project.toml b/Project.toml index b1a3620..6384909 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "gRPCClient" uuid = "aaca4a50-36af-4a1d-b878-4c443f2061ad" authors = ["Tanmay K.M. "] -version = "0.1.2" +version = "0.1.3" [deps] Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6" diff --git a/src/curl.jl b/src/curl.jl index 27286eb..6fe12ee 100644 --- a/src/curl.jl +++ b/src/curl.jl @@ -160,6 +160,21 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length: close(output) end +function set_low_speed_limits(easy::Curl.Easy, low_speed_limit, low_speed_time) + low_speed_limit >= 0 || + throw(ArgumentError("`low_speed_limit` must be non-negative, got $(low_speed_limit).")) + low_speed_time >= 0 || + throw(ArgumentError("`low_speed_time` must be non-negative, got $(low_speed_time).")) + + _max = typemax(Clong) + low_speed_limit = low_speed_limit <= _max ? round(Clong, low_speed_limit) : _max + low_speed_time = low_speed_time <= _max ? round(Clong, low_speed_time) : _max + + Curl.setopt(easy, CURLOPT_LOW_SPEED_LIMIT, low_speed_limit) + Curl.setopt(easy, CURLOPT_LOW_SPEED_TIME, low_speed_time) + return nothing +end + function set_connect_timeout(easy::Curl.Easy, timeout::Real) timeout >= 0 || throw(ArgumentError("timeout must be positive, got $timeout")) @@ -181,12 +196,15 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o connect_timeout::Real = 0, max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH, max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH, - verbose::Bool = false)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType} + verbose::Bool = false, + low_speed_limit::Int = 0, + low_speed_time::Int = 0)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType} Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do easy # setup the request Curl.set_url(easy, url) Curl.set_timeout(easy, request_timeout) set_connect_timeout(easy, connect_timeout) + set_low_speed_limits(easy, low_speed_limit, low_speed_time) Curl.set_verbose(easy, verbose) Curl.add_upload_callbacks(easy) Downloads.set_ca_roots(downloader, easy) diff --git a/src/grpc.jl b/src/grpc.jl index 98b0c96..5b280a4 100644 --- a/src/grpc.jl +++ b/src/grpc.jl @@ -78,6 +78,8 @@ end [ max_recv_message_length = 0, ] [ max_send_message_length = 0, ] [ verbose::Bool = false, ] + [ low_speed_limit = 0, ] + [ low_speed_time = 0, ] ) Contains settings to control the behavior of gRPC requests. @@ -98,6 +100,10 @@ Contains settings to control the behavior of gRPC requests. - `max_send_message_length`: maximum message length to send (default is `max_message_length`, same as setting this to 0) - `verbose`: whether to print out verbose communication logs (default false) +- `low_speed_limit`: speed in Bytes per second below which a connection is + considered slow (default is 0 and disables the setting) +- `low_speed_time`: duration in seconds for which a slow connection is tolerated + (default is 0 and disables the setting) """ struct gRPCController <: ProtoRpcController maxage::Clong @@ -109,7 +115,8 @@ struct gRPCController <: ProtoRpcController max_recv_message_length::Int max_send_message_length::Int verbose::Bool - + low_speed_limit::Int + low_speed_time::Int function gRPCController(; maxage::Integer = 0, keepalive::Integer = 60, @@ -120,15 +127,19 @@ struct gRPCController <: ProtoRpcController max_message_length::Integer = DEFAULT_MAX_MESSAGE_LENGTH, max_recv_message_length::Integer = 0, max_send_message_length::Integer = 0, - verbose::Bool = false + verbose::Bool = false, + low_speed_limit::Integer = 0, + low_speed_time::Integer = 0, ) if maxage < 0 || keepalive < 0 || request_timeout < 0 || connect_timeout < 0 || - max_message_length < 0 || max_recv_message_length < 0 || max_send_message_length < 0 + max_message_length < 0 || max_recv_message_length < 0 || max_send_message_length < 0 || + low_speed_limit < 0 || low_speed_time < 0 throw(ArgumentError("Invalid gRPCController parameter")) end (max_recv_message_length == 0) && (max_recv_message_length = max_message_length) (max_send_message_length == 0) && (max_send_message_length = max_message_length) - new(maxage, keepalive, negotiation, revocation, request_timeout, connect_timeout, max_recv_message_length, max_send_message_length, verbose) + return new(maxage, keepalive, negotiation, revocation, request_timeout, connect_timeout, + max_recv_message_length, max_send_message_length, verbose,low_speed_limit,low_speed_time) end end @@ -203,6 +214,8 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M max_recv_message_length = controller.max_recv_message_length, max_send_message_length = controller.max_send_message_length, verbose = controller.verbose, + low_speed_limit = controller.low_speed_limit, + low_speed_time = controller.low_speed_time, ) outchannel, status_future end From cb39da96221957497d47a2d7cf027b326c0a97b5 Mon Sep 17 00:00:00 2001 From: Manuel Berkemeier Date: Fri, 29 Apr 2022 10:22:08 +0200 Subject: [PATCH 2/5] revert ver push in Projet.toml --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 6384909..b1a3620 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "gRPCClient" uuid = "aaca4a50-36af-4a1d-b878-4c443f2061ad" authors = ["Tanmay K.M. "] -version = "0.1.3" +version = "0.1.2" [deps] Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6" From ac42f4c875d75aaa8c0ddac1f41b0a3ac89580fe Mon Sep 17 00:00:00 2001 From: Manuel Berkemeier Date: Fri, 6 May 2022 14:04:26 +0200 Subject: [PATCH 3/5] working "Downloads.jl" request --- .vscode/settings.json | 3 + src/curl.jl | 254 +++++++++++++++++++++++++++++++++++++++++- src/grpc.jl | 13 ++- src/limitio.jl | 5 +- 4 files changed, 267 insertions(+), 8 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..5ddbf36 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "julia.environmentPath": "/home/manuelbb/.julia/dev/gRPCClient" +} \ No newline at end of file diff --git a/src/curl.jl b/src/curl.jl index 6fe12ee..b2e1e76 100644 --- a/src/curl.jl +++ b/src/curl.jl @@ -42,6 +42,50 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType end =# + +function to_delimited_message_buffer(msg, max_message_length::Int) + iob = IOBuffer() + limitiob = LimitIO(iob, max_message_length) + write(limitiob, UInt8(0)) # compression + write(limitiob, hton(UInt32(0))) # message length (placeholder) + data_len = writeproto(limitiob, msg) # message bytes + + seek(iob, 1) # seek out the message length placeholder + write(iob, hton(UInt32(data_len))) # fill the message length + seek(iob, 0) + return iob +end + +function _channel_to_bytes( input :: Channel, max_send_message_length ) + while true + if isready(input) + @info "Parsing input." + input_buffer = to_delimited_message_buffer( + take!(input), max_send_message_length) + @info "Done parsing." + return input_buffer + end + end +end + +function upload_data(easy::Curl.Easy, input::IO) + while true + yield() + data = eof(input) ? nothing : readavailable(input) + easy.input === nothing && break + easy.input = data + Curl.curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT) + wait(easy.ready) + easy.input === nothing && break + easy.ready = Threads.Event() + end + @info "Done Uploading" +end + +function send_bytes(easy::Curl.Easy, msg_io) + return upload_data( easy, msg_io ) +end + function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType while true yield() @@ -87,6 +131,18 @@ function grpc_headers(; timeout::Real=Inf) headers end +function grpc_headers_dict(; timeout::Real=Inf) + headers = Dict( + "User-Agent" => "$(Curl.USER_AGENT)", + "Content-Type" => "application/grpc+proto", + "te" => "trailers" + ) + if timeout !== Inf + headers["grpc-timeout"] = "$(grpc_timeout_header_val(timeout))" + end + return headers +end + function grpc_request_header(request_timeout::Real) if request_timeout == Inf GRPC_STATIC_HEADERS[] @@ -127,6 +183,7 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length: datalen = UInt32(0) need_more = true for buf in easy.output + @info "fol loop output" write(iob, buf) need_more = false while !need_more @@ -160,6 +217,31 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length: close(output) end +function _bytes_to_channel( iob, output :: Channel{T}; max_recv_message_length = Inf) where T + @info "Reading back proto response." + seek(iob, 0) + + compressed = read(iob, UInt8) # compression + datalen = ntoh(read(iob, UInt32)) # message length + @info "1" + if datalen > max_recv_message_length + @warn "error again" + throw(gRPCMessageTooLargeException(max_recv_message_length, datalen)) + end + + @info "2" + + if bytesavailable(iob) >= datalen + @show isopen(output) + @show decoded_obj = readproto(iob, T()) + put!(output, decoded_obj) # decode message bytes + end + + @info "3" + #close(output) + return nothing +end + function set_low_speed_limits(easy::Curl.Easy, low_speed_limit, low_speed_time) low_speed_limit >= 0 || throw(ArgumentError("`low_speed_limit` must be non-negative, got $(low_speed_limit).")) @@ -187,7 +269,72 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real) end end -function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2}; +function grpc_request( + downloader::Downloader, + url::String, + input_channel::Channel{T1}, + output_channel::Channel{T2}; + maxage::Clong = typemax(Clong), + keepalive::Clong = 60, + negotiation::Symbol = :http2_prior_knowledge, + revocation::Bool = true, + request_timeout::Real = Inf, + connect_timeout::Real = 0, + max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH, + max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH, + verbose::Bool = false, + low_speed_limit::Int = 0, + low_speed_time::Int = 0)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType} + + headers = grpc_headers_dict(; timeout = request_timeout ) + input = _channel_to_bytes(input_channel, max_send_message_length) + output = IOBuffer() + + local req_res + Curl.with_handle( + easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do _easy + req_res = request( + url; + input, + output, + method = "POST", + headers, + timeout = request_timeout, + verbose, + downloader, + _easy + ) + + seek(output, 0) + @show output_channel + _bytes_to_channel( output, output_channel; max_recv_message_length ) + + # parse the grpc headers + @show grpc_status = StatusCode.OK.code + grpc_message = "" + for hdr in req_res.headers + if startswith(hdr.first, "grpc-status") + grpc_status = parse(Int, strip(hdr.second)) + elseif startswith(hdr.first, "grpc-message") + grpc_message = string(strip(hdr.second)) + end + end + if (_easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code) + grpc_status = StatusCode.DEADLINE_EXCEEDED.code + end + if (grpc_status != StatusCode.OK.code) && isempty(grpc_message) + grpc_message = grpc_status_message(grpc_status) + end + + if ((_easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code)) + gRPCStatus(true, grpc_status, "") + else + gRPCStatus(false, grpc_status, isempty(grpc_message) ? Curl.get_curl_errstr(_easy) : grpc_message) + end + end +end + +function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2}; maxage::Clong = typemax(Clong), keepalive::Clong = 60, negotiation::Symbol = :http2_prior_knowledge, @@ -213,6 +360,7 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o Curl.add_handle(downloader.multi, easy) function cleanup() + @info "Cleaning up." Curl.remove_handle(downloader.multi, easy) # though remove_handle sets easy.handle to C_NULL, it does not close output and progress channels # we need to close them here to unblock anything waiting on them @@ -253,9 +401,12 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o end else try + msg_io = _channel_to_bytes(input, max_recv_message_length) + Base.Experimental.@sync begin @async recv_data(easy, output, max_recv_message_length) - @async send_data(easy, input, max_send_message_length) + #@async send_data(easy, input, max_send_message_length) + @async send_bytes(easy, msg_io) end finally # ensure handle is removed cleanup() @@ -288,3 +439,102 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o end end end + + +function request( + url :: AbstractString; + input :: Union{Downloads.ArgRead, Nothing} = nothing, + output :: Union{Downloads.ArgWrite, Nothing} = nothing, + method :: Union{AbstractString, Nothing} = nothing, + headers :: Union{AbstractVector, AbstractDict} = Pair{String,String}[], + timeout :: Real = Inf, + progress :: Union{Function, Nothing} = nothing, + verbose :: Bool = false, + throw :: Bool = true, + downloader :: Union{Downloads.Downloader, Nothing} = nothing, + _easy :: Union{Curl.Easy, Nothing} = nothing, +) :: Union{Response, RequestError} + lock(Downloads.DOWNLOAD_LOCK) do + yield() # let other downloads finish + downloader isa Downloads.Downloader && return + while true + downloader = Downloads.DOWNLOADER[] + downloader isa Downloads.Downloader && return + Downloads.DOWNLOADER[] = Downloads.Downloader() + end + end + local response + have_input = input !== nothing + have_output = output !== nothing + input = Downloads.something(input, devnull) + output = Downloads.something(output, devnull) + input_size = Downloads.arg_read_size(input) + progress = Downloads.p_func(progress, input, output) + Downloads.arg_read(input) do input + Downloads.arg_write(output) do output + Downloads.with_handle( isnothing(_easy) ? Curl.Easy() : _easy ) do easy + # setup the request + Downloads.set_url(easy, url) + Downloads.set_timeout(easy, timeout) + Downloads.set_verbose(easy, verbose) + Downloads.add_headers(easy, headers) + + # libcurl does not set the default header reliably so set it + # explicitly unless user has specified it, xref + # https://github.com/JuliaLang/Pkg.jl/pull/2357 + if !any(kv -> lowercase(kv[1]) == "user-agent", headers) + Curl.add_header(easy, "User-Agent", Curl.USER_AGENT) + end + + if have_input + Downloads.enable_upload(easy) + if input_size !== nothing + Downloads.set_upload_size(easy, input_size) + end + if applicable(seek, input, 0) + Downloads.set_seeker(easy) do offset + seek(input, Int(offset)) + end + end + else + Downloads.set_body(easy, have_output) + end + method !== nothing && Downloads.set_method(easy, method) + progress !== nothing && Downloads.enable_progress(easy) + Downloads.set_ca_roots(downloader, easy) + info = (url = url, method = method, headers = headers) + Downloads.easy_hook(downloader, easy, info) + + # do the request + Downloads.add_handle(downloader.multi, easy) + try # ensure handle is removed + Base.Experimental.@sync begin + if have_input + @async Downloads.upload_data(easy, input) + end + @async for buf in easy.output + write(output, buf) + end + if progress !== nothing + @async for prog in easy.progress + progress(prog...) + end + end + end + finally + @info "cleining up" + Downloads.remove_handle(downloader.multi, easy) + end + + # return the response or throw an error + response = Downloads.Response(Downloads.get_response_info(easy)...) + easy.code == Curl.CURLE_OK && return response + message = Downloads.get_curl_errstr(easy) + response = Downloads.RequestError(url, easy.code, message, response) + throw && Base.throw(response) + end + end + end + @show response + return response +end \ No newline at end of file diff --git a/src/grpc.jl b/src/grpc.jl index 5b280a4..01b3353 100644 --- a/src/grpc.jl +++ b/src/grpc.jl @@ -187,13 +187,15 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M end call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T}) where T <: ProtoType = call_method(channel, service, method, controller, input, get_response_type(method)) function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{Channel{T2}}) where {T1 <: ProtoType, T2 <: ProtoType} - call_method(channel, service, method, controller, input, Channel{T2}()) + call_method(channel, service, method, controller, input, Channel{T2}(1)) end function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType} - outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}()) + outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}(1)) try - take!(outchannel), status_future - catch ex + out = take!(outchannel) + #close(outchannel) + out, status_future + catch exoutput_ch gRPCCheck(status_future) # check for core issue if isa(ex, InvalidStateException) throw(gRPCServiceCallException("Server closed connection without any response")) @@ -204,7 +206,7 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M end function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType} url = string(channel.baseurl, "/", service.name, "/", method.name) - status_future = @async grpc_request(channel.downloader, url, input, outchannel; + status_future = grpc_request(channel.downloader, url, input, outchannel; maxage = controller.maxage, keepalive = controller.keepalive, negotiation = controller.negotiation, @@ -217,5 +219,6 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M low_speed_limit = controller.low_speed_limit, low_speed_time = controller.low_speed_time, ) + @show status_future outchannel, status_future end diff --git a/src/limitio.jl b/src/limitio.jl index f8f45da..e67d69f 100644 --- a/src/limitio.jl +++ b/src/limitio.jl @@ -8,7 +8,10 @@ end LimitIO(io::IO, maxbytes) = LimitIO(io, maxbytes, 0) function Base.write(io::LimitIO, v::UInt8) - io.n > io.maxbytes && throw(gRPCMessageTooLargeException(io.maxbytes, io.n)) + if io.n > io.maxbytes + @warn "message to long" + throw(gRPCMessageTooLargeException(io.maxbytes, io.n)) + end nincr = write(io.io, v) io.n += nincr nincr From d64801209978b0b77556c372436cedd8e83b8cb8 Mon Sep 17 00:00:00 2001 From: Manuel Berkemeier Date: Fri, 6 May 2022 14:09:55 +0200 Subject: [PATCH 4/5] Revert "working "Downloads.jl" request" This reverts commit ac42f4c875d75aaa8c0ddac1f41b0a3ac89580fe. --- .vscode/settings.json | 3 - src/curl.jl | 254 +----------------------------------------- src/grpc.jl | 13 +-- src/limitio.jl | 5 +- 4 files changed, 8 insertions(+), 267 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 5ddbf36..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "julia.environmentPath": "/home/manuelbb/.julia/dev/gRPCClient" -} \ No newline at end of file diff --git a/src/curl.jl b/src/curl.jl index b2e1e76..6fe12ee 100644 --- a/src/curl.jl +++ b/src/curl.jl @@ -42,50 +42,6 @@ function buffer_send_data(input::Channel{T}) where T <: ProtoType end =# - -function to_delimited_message_buffer(msg, max_message_length::Int) - iob = IOBuffer() - limitiob = LimitIO(iob, max_message_length) - write(limitiob, UInt8(0)) # compression - write(limitiob, hton(UInt32(0))) # message length (placeholder) - data_len = writeproto(limitiob, msg) # message bytes - - seek(iob, 1) # seek out the message length placeholder - write(iob, hton(UInt32(data_len))) # fill the message length - seek(iob, 0) - return iob -end - -function _channel_to_bytes( input :: Channel, max_send_message_length ) - while true - if isready(input) - @info "Parsing input." - input_buffer = to_delimited_message_buffer( - take!(input), max_send_message_length) - @info "Done parsing." - return input_buffer - end - end -end - -function upload_data(easy::Curl.Easy, input::IO) - while true - yield() - data = eof(input) ? nothing : readavailable(input) - easy.input === nothing && break - easy.input = data - Curl.curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT) - wait(easy.ready) - easy.input === nothing && break - easy.ready = Threads.Event() - end - @info "Done Uploading" -end - -function send_bytes(easy::Curl.Easy, msg_io) - return upload_data( easy, msg_io ) -end - function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::Int) where T <: ProtoType while true yield() @@ -131,18 +87,6 @@ function grpc_headers(; timeout::Real=Inf) headers end -function grpc_headers_dict(; timeout::Real=Inf) - headers = Dict( - "User-Agent" => "$(Curl.USER_AGENT)", - "Content-Type" => "application/grpc+proto", - "te" => "trailers" - ) - if timeout !== Inf - headers["grpc-timeout"] = "$(grpc_timeout_header_val(timeout))" - end - return headers -end - function grpc_request_header(request_timeout::Real) if request_timeout == Inf GRPC_STATIC_HEADERS[] @@ -183,7 +127,6 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length: datalen = UInt32(0) need_more = true for buf in easy.output - @info "fol loop output" write(iob, buf) need_more = false while !need_more @@ -217,31 +160,6 @@ function recv_data(easy::Curl.Easy, output::Channel{T}, max_recv_message_length: close(output) end -function _bytes_to_channel( iob, output :: Channel{T}; max_recv_message_length = Inf) where T - @info "Reading back proto response." - seek(iob, 0) - - compressed = read(iob, UInt8) # compression - datalen = ntoh(read(iob, UInt32)) # message length - @info "1" - if datalen > max_recv_message_length - @warn "error again" - throw(gRPCMessageTooLargeException(max_recv_message_length, datalen)) - end - - @info "2" - - if bytesavailable(iob) >= datalen - @show isopen(output) - @show decoded_obj = readproto(iob, T()) - put!(output, decoded_obj) # decode message bytes - end - - @info "3" - #close(output) - return nothing -end - function set_low_speed_limits(easy::Curl.Easy, low_speed_limit, low_speed_time) low_speed_limit >= 0 || throw(ArgumentError("`low_speed_limit` must be non-negative, got $(low_speed_limit).")) @@ -269,72 +187,7 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real) end end -function grpc_request( - downloader::Downloader, - url::String, - input_channel::Channel{T1}, - output_channel::Channel{T2}; - maxage::Clong = typemax(Clong), - keepalive::Clong = 60, - negotiation::Symbol = :http2_prior_knowledge, - revocation::Bool = true, - request_timeout::Real = Inf, - connect_timeout::Real = 0, - max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH, - max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH, - verbose::Bool = false, - low_speed_limit::Int = 0, - low_speed_time::Int = 0)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType} - - headers = grpc_headers_dict(; timeout = request_timeout ) - input = _channel_to_bytes(input_channel, max_send_message_length) - output = IOBuffer() - - local req_res - Curl.with_handle( - easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do _easy - req_res = request( - url; - input, - output, - method = "POST", - headers, - timeout = request_timeout, - verbose, - downloader, - _easy - ) - - seek(output, 0) - @show output_channel - _bytes_to_channel( output, output_channel; max_recv_message_length ) - - # parse the grpc headers - @show grpc_status = StatusCode.OK.code - grpc_message = "" - for hdr in req_res.headers - if startswith(hdr.first, "grpc-status") - grpc_status = parse(Int, strip(hdr.second)) - elseif startswith(hdr.first, "grpc-message") - grpc_message = string(strip(hdr.second)) - end - end - if (_easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code) - grpc_status = StatusCode.DEADLINE_EXCEEDED.code - end - if (grpc_status != StatusCode.OK.code) && isempty(grpc_message) - grpc_message = grpc_status_message(grpc_status) - end - - if ((_easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code)) - gRPCStatus(true, grpc_status, "") - else - gRPCStatus(false, grpc_status, isempty(grpc_message) ? Curl.get_curl_errstr(_easy) : grpc_message) - end - end -end - -function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2}; +function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, output::Channel{T2}; maxage::Clong = typemax(Clong), keepalive::Clong = 60, negotiation::Symbol = :http2_prior_knowledge, @@ -360,7 +213,6 @@ function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, Curl.add_handle(downloader.multi, easy) function cleanup() - @info "Cleaning up." Curl.remove_handle(downloader.multi, easy) # though remove_handle sets easy.handle to C_NULL, it does not close output and progress channels # we need to close them here to unblock anything waiting on them @@ -401,12 +253,9 @@ function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, end else try - msg_io = _channel_to_bytes(input, max_recv_message_length) - Base.Experimental.@sync begin @async recv_data(easy, output, max_recv_message_length) - #@async send_data(easy, input, max_send_message_length) - @async send_bytes(easy, msg_io) + @async send_data(easy, input, max_send_message_length) end finally # ensure handle is removed cleanup() @@ -439,102 +288,3 @@ function _grpc_request(downloader::Downloader, url::String, input::Channel{T1}, end end end - - -function request( - url :: AbstractString; - input :: Union{Downloads.ArgRead, Nothing} = nothing, - output :: Union{Downloads.ArgWrite, Nothing} = nothing, - method :: Union{AbstractString, Nothing} = nothing, - headers :: Union{AbstractVector, AbstractDict} = Pair{String,String}[], - timeout :: Real = Inf, - progress :: Union{Function, Nothing} = nothing, - verbose :: Bool = false, - throw :: Bool = true, - downloader :: Union{Downloads.Downloader, Nothing} = nothing, - _easy :: Union{Curl.Easy, Nothing} = nothing, -) :: Union{Response, RequestError} - lock(Downloads.DOWNLOAD_LOCK) do - yield() # let other downloads finish - downloader isa Downloads.Downloader && return - while true - downloader = Downloads.DOWNLOADER[] - downloader isa Downloads.Downloader && return - Downloads.DOWNLOADER[] = Downloads.Downloader() - end - end - local response - have_input = input !== nothing - have_output = output !== nothing - input = Downloads.something(input, devnull) - output = Downloads.something(output, devnull) - input_size = Downloads.arg_read_size(input) - progress = Downloads.p_func(progress, input, output) - Downloads.arg_read(input) do input - Downloads.arg_write(output) do output - Downloads.with_handle( isnothing(_easy) ? Curl.Easy() : _easy ) do easy - # setup the request - Downloads.set_url(easy, url) - Downloads.set_timeout(easy, timeout) - Downloads.set_verbose(easy, verbose) - Downloads.add_headers(easy, headers) - - # libcurl does not set the default header reliably so set it - # explicitly unless user has specified it, xref - # https://github.com/JuliaLang/Pkg.jl/pull/2357 - if !any(kv -> lowercase(kv[1]) == "user-agent", headers) - Curl.add_header(easy, "User-Agent", Curl.USER_AGENT) - end - - if have_input - Downloads.enable_upload(easy) - if input_size !== nothing - Downloads.set_upload_size(easy, input_size) - end - if applicable(seek, input, 0) - Downloads.set_seeker(easy) do offset - seek(input, Int(offset)) - end - end - else - Downloads.set_body(easy, have_output) - end - method !== nothing && Downloads.set_method(easy, method) - progress !== nothing && Downloads.enable_progress(easy) - Downloads.set_ca_roots(downloader, easy) - info = (url = url, method = method, headers = headers) - Downloads.easy_hook(downloader, easy, info) - - # do the request - Downloads.add_handle(downloader.multi, easy) - try # ensure handle is removed - Base.Experimental.@sync begin - if have_input - @async Downloads.upload_data(easy, input) - end - @async for buf in easy.output - write(output, buf) - end - if progress !== nothing - @async for prog in easy.progress - progress(prog...) - end - end - end - finally - @info "cleining up" - Downloads.remove_handle(downloader.multi, easy) - end - - # return the response or throw an error - response = Downloads.Response(Downloads.get_response_info(easy)...) - easy.code == Curl.CURLE_OK && return response - message = Downloads.get_curl_errstr(easy) - response = Downloads.RequestError(url, easy.code, message, response) - throw && Base.throw(response) - end - end - end - @show response - return response -end \ No newline at end of file diff --git a/src/grpc.jl b/src/grpc.jl index 01b3353..5b280a4 100644 --- a/src/grpc.jl +++ b/src/grpc.jl @@ -187,15 +187,13 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M end call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T}) where T <: ProtoType = call_method(channel, service, method, controller, input, get_response_type(method)) function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{Channel{T2}}) where {T1 <: ProtoType, T2 <: ProtoType} - call_method(channel, service, method, controller, input, Channel{T2}(1)) + call_method(channel, service, method, controller, input, Channel{T2}()) end function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, ::Type{T2}) where {T1 <: ProtoType, T2 <: ProtoType} - outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}(1)) + outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}()) try - out = take!(outchannel) - #close(outchannel) - out, status_future - catch exoutput_ch + take!(outchannel), status_future + catch ex gRPCCheck(status_future) # check for core issue if isa(ex, InvalidStateException) throw(gRPCServiceCallException("Server closed connection without any response")) @@ -206,7 +204,7 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M end function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType} url = string(channel.baseurl, "/", service.name, "/", method.name) - status_future = grpc_request(channel.downloader, url, input, outchannel; + status_future = @async grpc_request(channel.downloader, url, input, outchannel; maxage = controller.maxage, keepalive = controller.keepalive, negotiation = controller.negotiation, @@ -219,6 +217,5 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M low_speed_limit = controller.low_speed_limit, low_speed_time = controller.low_speed_time, ) - @show status_future outchannel, status_future end diff --git a/src/limitio.jl b/src/limitio.jl index e67d69f..f8f45da 100644 --- a/src/limitio.jl +++ b/src/limitio.jl @@ -8,10 +8,7 @@ end LimitIO(io::IO, maxbytes) = LimitIO(io, maxbytes, 0) function Base.write(io::LimitIO, v::UInt8) - if io.n > io.maxbytes - @warn "message to long" - throw(gRPCMessageTooLargeException(io.maxbytes, io.n)) - end + io.n > io.maxbytes && throw(gRPCMessageTooLargeException(io.maxbytes, io.n)) nincr = write(io.io, v) io.n += nincr nincr From 30ff20d8b8f0cf6d7ebed7c0e2c80d726d9cd3b2 Mon Sep 17 00:00:00 2001 From: Manuel Berkemeier Date: Tue, 10 May 2022 13:23:56 +0200 Subject: [PATCH 5/5] fix https://github.com/JuliaLang/Downloads.jl/issues/193 --- src/curl.jl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/curl.jl b/src/curl.jl index 6fe12ee..ce4205d 100644 --- a/src/curl.jl +++ b/src/curl.jl @@ -166,7 +166,7 @@ function set_low_speed_limits(easy::Curl.Easy, low_speed_limit, low_speed_time) low_speed_time >= 0 || throw(ArgumentError("`low_speed_time` must be non-negative, got $(low_speed_time).")) - _max = typemax(Clong) + _max = typemax(Clong) ÷ 1000 low_speed_limit = low_speed_limit <= _max ? round(Clong, low_speed_limit) : _max low_speed_time = low_speed_time <= _max ? round(Clong, low_speed_time) : _max @@ -182,8 +182,7 @@ function set_connect_timeout(easy::Curl.Easy, timeout::Real) timeout_ms = round(Clong, timeout * 1000) Curl.setopt(easy, CURLOPT_CONNECTTIMEOUT_MS, timeout_ms) else - timeout = timeout ≤ typemax(Clong) ? round(Clong, timeout) : Clong(0) - Curl.setopt(easy, CURLOPT_CONNECTTIMEOUT, timeout) + Curl.setopt(easy, CURLOPT_CONNECTTIMEOUT, Clong(0)) end end