Skip to content

Commit

Permalink
Merge pull request #10 from JuliaComputing/tan/ci
Browse files Browse the repository at this point in the history
 improve timeout and server error handling
  • Loading branch information
tanmaykm authored May 3, 2021
2 parents 305b4d8 + 8574b40 commit f8f2f68
Show file tree
Hide file tree
Showing 22 changed files with 1,477 additions and 117 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ the server.
`gRPCStatus` represents the status of a request. It has the following fields:

- `success`: whether the request was completed successfully.
- `grpc_status`: the grpc status code returned
- `message`: any error message if request was not successful

### `gRPCCheck`
Expand Down Expand Up @@ -227,6 +228,7 @@ A `gRPMessageTooLargeException` has the following members:
A `gRPCServiceCallException` is thrown if a gRPC request is not successful.
It has the following members:

- `grpc_status`: grpc status code for this request
- `message`: any error message if request was not successful

## Credits
Expand Down
88 changes: 83 additions & 5 deletions src/curl.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
const GRPC_STATIC_HEADERS = Ref{Ptr{Nothing}}(C_NULL)

const StatusCode = (
OK = (code=0, message="Success"),
CANCELLED = (code=1, message="The operation was cancelled"),
UNKNOWN = (code=2, message="Unknown error"),
INVALID_ARGUMENT = (code=3, message="Client specified an invalid argument"),
DEADLINE_EXCEEDED = (code=4, message="Deadline expired before the operation could complete"),
NOT_FOUND = (code=5, message="Requested entity was not found"),
ALREADY_EXISTS = (code=6, message="Entity already exists"),
PERMISSION_DENIED = (code=7, message="No permission to execute the specified operation"),
RESOURCE_EXHAUSTED = (code=8, message="Resource exhausted"),
FAILED_PRECONDITION = (code=9, message="Operation was rejected because the system is not in a state required for the operation's execution"),
ABORTED = (code=10, message="Operation was aborted"),
OUT_OF_RANGE = (code=11, message="Operation was attempted past the valid range"),
UNIMPLEMENTED = (code=12, message="Operation is not implemented or is not supported/enabled in this service"),
INTERNAL = (code=13, message="Internal error"),
UNAVAILABLE = (code=14, message="The service is currently unavailable"),
DATA_LOSS = (code=15, message="Unrecoverable data loss or corruption"),
UNAUTHENTICATED = (code=16, message="The request does not have valid authentication credentials for the operation")
)

grpc_status_info(code) = StatusCode[code+1]
grpc_status_message(code) = (grpc_status_info(code)).message
grpc_status_code_str(code) = string(propertynames(StatusCode)[code+1])

#=
const SEND_BUFFER_SZ = 1024 * 1024
function buffer_send_data(input::Channel{T}) where T <: ProtoType
Expand Down Expand Up @@ -31,15 +55,46 @@ function send_data(easy::Curl.Easy, input::Channel{T}, max_send_message_length::
end
end

function grpc_headers()
function grpc_timeout_header_val(timeout::Real)
if round(Int, timeout) == timeout
timeout_secs = round(Int64, timeout)
return "$(timeout_secs)S"
end
timeout *= 1000
if round(Int, timeout) == timeout
timeout_millisecs = round(Int64, timeout)
return "$(timeout_millisecs)m"
end
timeout *= 1000
if round(Int, timeout) == timeout
timeout_microsecs = round(Int64, timeout)
return "$(timeout_microsecs)u"
end
timeout *= 1000
timeout_nanosecs = round(Int64, timeout)
return "$(timeout_nanosecs)n"
end

function grpc_headers(; timeout::Real=Inf)
headers = C_NULL
headers = LibCURL.curl_slist_append(headers, "User-Agent: $(Curl.USER_AGENT)")
headers = LibCURL.curl_slist_append(headers, "Content-Type: application/grpc+proto")
headers = LibCURL.curl_slist_append(headers, "Content-Length:")
if timeout !== Inf
headers = LibCURL.curl_slist_append(headers, "grpc-timeout: $(grpc_timeout_header_val(timeout))")
end
headers
end

function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool)
function grpc_request_header(request_timeout::Real)
if request_timeout == Inf
GRPC_STATIC_HEADERS[]
else
grpc_headers(; timeout=request_timeout)
end
end

function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revocation::Bool, request_timeout::Real)
easy = Curl.Easy()
http_version = (negotiation === :http2) ? CURL_HTTP_VERSION_2_0 :
(negotiation === :http2_tls) ? CURL_HTTP_VERSION_2TLS :
Expand All @@ -48,7 +103,7 @@ function easy_handle(maxage::Clong, keepalive::Clong, negotiation::Symbol, revoc
Curl.setopt(easy, CURLOPT_HTTP_VERSION, http_version)
Curl.setopt(easy, CURLOPT_PIPEWAIT, Clong(1))
Curl.setopt(easy, CURLOPT_POST, Clong(1))
Curl.setopt(easy, CURLOPT_HTTPHEADER, GRPC_STATIC_HEADERS[])
Curl.setopt(easy, CURLOPT_HTTPHEADER, grpc_request_header(request_timeout))
if !revocation
Curl.setopt(easy, CURLOPT_SSL_OPTIONS, CURLSSLOPT_NO_REVOKE)
end
Expand Down Expand Up @@ -126,7 +181,7 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
max_recv_message_length::Int = DEFAULT_MAX_RECV_MESSAGE_LENGTH,
max_send_message_length::Int = DEFAULT_MAX_SEND_MESSAGE_LENGTH,
verbose::Bool = false)::gRPCStatus where {T1 <: ProtoType, T2 <: ProtoType}
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation)) do easy
Curl.with_handle(easy_handle(maxage, keepalive, negotiation, revocation, request_timeout)) do easy
# setup the request
Curl.set_url(easy, url)
Curl.set_timeout(easy, request_timeout)
Expand Down Expand Up @@ -188,6 +243,29 @@ function grpc_request(downloader::Downloader, url::String, input::Channel{T1}, o
end
end

(easy.code == CURLE_OK) ? gRPCStatus(true, "") : gRPCStatus(false, Curl.get_curl_errstr(easy))
@debug("response headers", easy.res_hdrs)

# parse the grpc headers
grpc_status = StatusCode.OK.code
grpc_message = ""
for hdr in easy.res_hdrs
if startswith(hdr, "grpc-status")
grpc_status = parse(Int, strip(last(split(hdr, ':'; limit=2))))
elseif startswith(hdr, "grpc-message")
grpc_message = string(strip(last(split(hdr, ':'; limit=2))))
end
end
if (easy.code == CURLE_OPERATION_TIMEDOUT) && (grpc_status == StatusCode.OK.code)
grpc_status = StatusCode.DEADLINE_EXCEEDED.code
end
if (grpc_status != StatusCode.OK.code) && isempty(grpc_message)
grpc_message = grpc_status_message(grpc_status)
end

if ((easy.code == CURLE_OK) && (grpc_status == StatusCode.OK.code))
gRPCStatus(true, grpc_status, "")
else
gRPCStatus(false, grpc_status, isempty(grpc_message) ? Curl.get_curl_errstr(easy) : grpc_message)
end
end
end
2 changes: 1 addition & 1 deletion src/gRPCClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using ProtoBuf
import Downloads: Curl
import ProtoBuf: call_method

export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck
export gRPCController, gRPCChannel, gRPCException, gRPCServiceCallException, gRPCMessageTooLargeException, gRPCStatus, gRPCCheck, StatusCode

abstract type gRPCException <: Exception end

Expand Down
19 changes: 13 additions & 6 deletions src/grpc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
"""
struct gRPCStatus
success::Bool
grpc_status::Int
message::String
exception::Union{Nothing,Exception}
end

gRPCStatus(success::Bool, message::AbstractString) = gRPCStatus(success, string(message), nothing)
gRPCStatus(success::Bool, grpc_status::Int, message::AbstractString) = gRPCStatus(success, grpc_status, string(message), nothing)
function gRPCStatus(status_future)
try
fetch(status_future)
Expand All @@ -24,7 +25,7 @@ function gRPCStatus(status_future)
while isa(task_exception, TaskFailedException)
task_exception = task_exception.task.exception
end
gRPCStatus(false, string(task_exception), task_exception)
gRPCStatus(false, StatusCode.INTERNAL.code, string(task_exception), task_exception)
end
end

Expand All @@ -39,10 +40,11 @@ It has the following members:
- `message`: any error message if request was not successful
"""
struct gRPCServiceCallException <: gRPCException
grpc_status::Int
message::String
end

Base.show(io::IO, m::gRPCServiceCallException) = print(io, "gRPCServiceCallException - $(m.message)")
Base.show(io::IO, m::gRPCServiceCallException) = print(io, "gRPCServiceCallException: $(m.grpc_status), $(m.message)")

"""
gRPCCheck(status; throw_error::Bool=true)
Expand All @@ -56,7 +58,7 @@ gRPCCheck(status_future; throw_error::Bool=true) = gRPCCheck(gRPCStatus(status_f
function gRPCCheck(status::gRPCStatus; throw_error::Bool=true)
if throw_error && !status.success
if status.exception === nothing
throw(gRPCServiceCallException(status.message))
throw(gRPCServiceCallException(status.grpc_status, status.message))
else
throw(status.exception)
end
Expand Down Expand Up @@ -180,8 +182,13 @@ function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::M
outchannel, status_future = call_method(channel, service, method, controller, input, Channel{T2}())
try
take!(outchannel), status_future
catch
nothing, status_future
catch ex
gRPCCheck(status_future) # check for core issue
if isa(ex, InvalidStateException)
throw(gRPCServiceCallException("Server closed connection without any response"))
else
rethrow() # throw this error if there's no other issue
end
end
end
function call_method(channel::gRPCChannel, service::ServiceDescriptor, method::MethodDescriptor, controller::gRPCController, input::Channel{T1}, outchannel::Channel{T2}) where {T1 <: ProtoType, T2 <: ProtoType}
Expand Down
4 changes: 3 additions & 1 deletion test/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
grpc-go
server.pid
runserver_*
routeguide_*
grpcerrors_*
testservers
84 changes: 84 additions & 0 deletions test/GrpcerrorsClients/GrpcerrorsClients.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
module GrpcerrorsClients
using gRPCClient

include("grpcerrors.jl")
using .grpcerrors

import Base: show

# begin service: grpcerrors.GRPCErrors

export GRPCErrorsBlockingClient, GRPCErrorsClient

struct GRPCErrorsBlockingClient
controller::gRPCController
channel::gRPCChannel
stub::GRPCErrorsBlockingStub

function GRPCErrorsBlockingClient(api_base_url::String; kwargs...)
controller = gRPCController(; kwargs...)
channel = gRPCChannel(api_base_url)
stub = GRPCErrorsBlockingStub(channel)
new(controller, channel, stub)
end
end

struct GRPCErrorsClient
controller::gRPCController
channel::gRPCChannel
stub::GRPCErrorsStub

function GRPCErrorsClient(api_base_url::String; kwargs...)
controller = gRPCController(; kwargs...)
channel = gRPCChannel(api_base_url)
stub = GRPCErrorsStub(channel)
new(controller, channel, stub)
end
end

show(io::IO, client::GRPCErrorsBlockingClient) = print(io, "GRPCErrorsBlockingClient(", client.channel.baseurl, ")")
show(io::IO, client::GRPCErrorsClient) = print(io, "GRPCErrorsClient(", client.channel.baseurl, ")")

import .grpcerrors: SimpleRPC
"""
SimpleRPC
- input: grpcerrors.Data
- output: grpcerrors.Data
"""
SimpleRPC(client::GRPCErrorsBlockingClient, inp::grpcerrors.Data) = SimpleRPC(client.stub, client.controller, inp)
SimpleRPC(client::GRPCErrorsClient, inp::grpcerrors.Data, done::Function) = SimpleRPC(client.stub, client.controller, inp, done)

import .grpcerrors: StreamResponse
"""
StreamResponse
- input: grpcerrors.Data
- output: Channel{grpcerrors.Data}
"""
StreamResponse(client::GRPCErrorsBlockingClient, inp::grpcerrors.Data) = StreamResponse(client.stub, client.controller, inp)
StreamResponse(client::GRPCErrorsClient, inp::grpcerrors.Data, done::Function) = StreamResponse(client.stub, client.controller, inp, done)

import .grpcerrors: StreamRequest
"""
StreamRequest
- input: Channel{grpcerrors.Data}
- output: grpcerrors.Data
"""
StreamRequest(client::GRPCErrorsBlockingClient, inp::Channel{grpcerrors.Data}) = StreamRequest(client.stub, client.controller, inp)
StreamRequest(client::GRPCErrorsClient, inp::Channel{grpcerrors.Data}, done::Function) = StreamRequest(client.stub, client.controller, inp, done)

import .grpcerrors: StreamRequestResponse
"""
StreamRequestResponse
- input: Channel{grpcerrors.Data}
- output: Channel{grpcerrors.Data}
"""
StreamRequestResponse(client::GRPCErrorsBlockingClient, inp::Channel{grpcerrors.Data}) = StreamRequestResponse(client.stub, client.controller, inp)
StreamRequestResponse(client::GRPCErrorsClient, inp::Channel{grpcerrors.Data}, done::Function) = StreamRequestResponse(client.stub, client.controller, inp, done)

# end service: grpcerrors.GRPCErrors

end # module GrpcerrorsClients
4 changes: 4 additions & 0 deletions test/GrpcerrorsClients/grpcerrors.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module grpcerrors
const _ProtoBuf_Top_ = @static isdefined(parentmodule(@__MODULE__), :_ProtoBuf_Top_) ? (parentmodule(@__MODULE__))._ProtoBuf_Top_ : parentmodule(@__MODULE__)
include("grpcerrors_pb.jl")
end
77 changes: 77 additions & 0 deletions test/GrpcerrorsClients/grpcerrors_pb.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# syntax: proto3
using ProtoBuf
import ProtoBuf.meta

mutable struct Data <: ProtoType
__protobuf_jl_internal_meta::ProtoMeta
__protobuf_jl_internal_values::Dict{Symbol,Any}
__protobuf_jl_internal_defaultset::Set{Symbol}

function Data(; kwargs...)
obj = new(meta(Data), Dict{Symbol,Any}(), Set{Symbol}())
values = obj.__protobuf_jl_internal_values
symdict = obj.__protobuf_jl_internal_meta.symdict
for nv in kwargs
fldname, fldval = nv
fldtype = symdict[fldname].jtyp
(fldname in keys(symdict)) || error(string(typeof(obj), " has no field with name ", fldname))
values[fldname] = isa(fldval, fldtype) ? fldval : convert(fldtype, fldval)
end
obj
end
end # mutable struct Data
const __meta_Data = Ref{ProtoMeta}()
function meta(::Type{Data})
ProtoBuf.metalock() do
if !isassigned(__meta_Data)
__meta_Data[] = target = ProtoMeta(Data)
allflds = Pair{Symbol,Union{Type,String}}[:mode => Int32, :param => Int32]
meta(target, Data, allflds, ProtoBuf.DEF_REQ, ProtoBuf.DEF_FNUM, ProtoBuf.DEF_VAL, ProtoBuf.DEF_PACK, ProtoBuf.DEF_WTYPES, ProtoBuf.DEF_ONEOFS, ProtoBuf.DEF_ONEOF_NAMES)
end
__meta_Data[]
end
end
function Base.getproperty(obj::Data, name::Symbol)
if name === :mode
return (obj.__protobuf_jl_internal_values[name])::Int32
elseif name === :param
return (obj.__protobuf_jl_internal_values[name])::Int32
else
getfield(obj, name)
end
end

# service methods for GRPCErrors
const _GRPCErrors_methods = MethodDescriptor[
MethodDescriptor("SimpleRPC", 1, Data, Data),
MethodDescriptor("StreamResponse", 2, Data, Channel{Data}),
MethodDescriptor("StreamRequest", 3, Channel{Data}, Data),
MethodDescriptor("StreamRequestResponse", 4, Channel{Data}, Channel{Data})
] # const _GRPCErrors_methods
const _GRPCErrors_desc = ServiceDescriptor("grpcerrors.GRPCErrors", 1, _GRPCErrors_methods)

GRPCErrors(impl::Module) = ProtoService(_GRPCErrors_desc, impl)

mutable struct GRPCErrorsStub <: AbstractProtoServiceStub{false}
impl::ProtoServiceStub
GRPCErrorsStub(channel::ProtoRpcChannel) = new(ProtoServiceStub(_GRPCErrors_desc, channel))
end # mutable struct GRPCErrorsStub

mutable struct GRPCErrorsBlockingStub <: AbstractProtoServiceStub{true}
impl::ProtoServiceBlockingStub
GRPCErrorsBlockingStub(channel::ProtoRpcChannel) = new(ProtoServiceBlockingStub(_GRPCErrors_desc, channel))
end # mutable struct GRPCErrorsBlockingStub

SimpleRPC(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Data, done::Function) = call_method(stub.impl, _GRPCErrors_methods[1], controller, inp, done)
SimpleRPC(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Data) = call_method(stub.impl, _GRPCErrors_methods[1], controller, inp)

StreamResponse(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Data, done::Function) = call_method(stub.impl, _GRPCErrors_methods[2], controller, inp, done)
StreamResponse(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Data) = call_method(stub.impl, _GRPCErrors_methods[2], controller, inp)

StreamRequest(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Channel{Data}, done::Function) = call_method(stub.impl, _GRPCErrors_methods[3], controller, inp, done)
StreamRequest(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Channel{Data}) = call_method(stub.impl, _GRPCErrors_methods[3], controller, inp)

StreamRequestResponse(stub::GRPCErrorsStub, controller::ProtoRpcController, inp::Channel{Data}, done::Function) = call_method(stub.impl, _GRPCErrors_methods[4], controller, inp, done)
StreamRequestResponse(stub::GRPCErrorsBlockingStub, controller::ProtoRpcController, inp::Channel{Data}) = call_method(stub.impl, _GRPCErrors_methods[4], controller, inp)

export Data, GRPCErrors, GRPCErrorsStub, GRPCErrorsBlockingStub, SimpleRPC, StreamResponse, StreamRequest, StreamRequestResponse
Loading

0 comments on commit f8f2f68

Please sign in to comment.