Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
377 changes: 214 additions & 163 deletions json_rpc/client.nim

Large diffs are not rendered by default.

231 changes: 66 additions & 165 deletions json_rpc/clients/httpclient.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# json-rpc
# Copyright (c) 2019-2024 Status Research & Development GmbH
# Copyright (c) 2019-2025 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
Expand All @@ -10,21 +10,12 @@
{.push raises: [], gcsafe.}

import
std/[tables, uri],
stew/byteutils,
results,
std/uri,
chronos/apps/http/httpclient,
chronicles, httputils,
json_serialization/std/net as jsnet,
../client,
../errors,
../private/jrpc_sys
httputils,
../[client, errors]

export
client, errors, jsnet, HttpClientFlag, HttpClientFlags

logScope:
topics = "JSONRPC-HTTP-CLIENT"
export client, errors, HttpClientFlag, HttpClientFlags

type
HttpClientOptions* = object
Expand All @@ -33,53 +24,32 @@ type
RpcHttpClient* = ref object of RpcClient
httpSession: HttpSessionRef
httpAddress: HttpAddress
maxBodySize: int
getHeaders: GetJsonRpcRequestHeaders

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------

proc `$`(v: HttpAddress): string =
v.id

proc new(
T: type RpcHttpClient, maxBodySize = MaxMessageBodyBytes, secure = false,
getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T =

proc new*(
T: type RpcHttpClient,
secure = false,
getHeaders: GetJsonRpcRequestHeaders = nil,
flags: HttpClientFlags = {},
maxMessageSize = defaultMaxMessageSize,
): T =
var moreFlags: HttpClientFlags
if secure:
moreFlags.incl HttpClientFlag.NoVerifyHost
moreFlags.incl HttpClientFlag.NoVerifyServerName

T(
maxBodySize: maxBodySize,
maxMessageSize: maxMessageSize,
httpSession: HttpSessionRef.new(flags = flags + moreFlags),
getHeaders: getHeaders
getHeaders: getHeaders,
)

template closeRefs(req, res: untyped) =
# We can't trust try/finally in async/await in all nim versions, so we
# do it manually instead
if req != nil:
try:
await req.closeWait()
except CatchableError as exc: # shouldn't happen
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
discard exc

if res != nil:
try:
await res.closeWait()
except CatchableError as exc: # shouldn't happen
debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg
discard exc

proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.} =
method request(
client: RpcHttpClient, reqData: seq[byte]
): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} =
doAssert client.httpSession != nil
if client.httpAddress.addresses.len == 0:
raise newException(RpcPostError, "Not connected")

raise newException(RpcTransportError, "No remote addresses to connect to")

var headers =
if not isNil(client.getHeaders):
Expand All @@ -88,137 +58,68 @@ proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.}
@[]
headers.add(("Content-Type", "application/json"))

var req: HttpClientRequestRef
var res: HttpClientResponseRef

req = HttpClientRequestRef.post(client.httpSession,
client.httpAddress,
body = reqBody.toOpenArrayByte(0, reqBody.len - 1),
headers = headers)
res =
try:
await req.send()
except CancelledError as e:
debug "Cancelled POST Request with JSON-RPC", e = e.msg
closeRefs(req, res)
raise e
except CatchableError as e:
debug "Failed to send POST Request with JSON-RPC", e = e.msg
closeRefs(req, res)
raise (ref RpcPostError)(msg: "Failed to send POST Request with JSON-RPC: " & e.msg, parent: e)

if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
debug "Unsuccessful POST Request with JSON-RPC",
status = res.status, reason = res.reason
closeRefs(req, res)
raise (ref ErrorResponse)(status: res.status, msg: res.reason)

let resBytes =
try:
await res.getBodyBytes(client.maxBodySize)
except CancelledError as e:
debug "Cancelled POST Response for JSON-RPC", e = e.msg
closeRefs(req, res)
raise e
except CatchableError as e:
debug "Failed to read POST Response for JSON-RPC", e = e.msg
closeRefs(req, res)
raise (ref FailedHttpResponse)(msg: "Failed to read POST Response for JSON-RPC: " & e.msg, parent: e)

result = string.fromBytes(resBytes)
trace "Response", text = result
closeRefs(req, res)

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
let
req = HttpClientRequestRef.post(
client.httpSession, client.httpAddress, body = reqData, headers = headers
)

res =
try:
await req.send()
except HttpError as exc:
raise (ref RpcPostError)(msg: exc.msg, parent: exc)
finally:
await req.closeWait()

try:
if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
raise (ref ErrorResponse)(status: res.status, msg: res.reason)

let
resData = await res.getBodyBytes(client.maxMessageSize)
# TODO remove this processMessage hook when subscriptions / pubsub is
# properly supported
fallback = client.callOnProcessMessage(resData).valueOr:
raise (ref RequestDecodeError)(msg: error, payload: resData)

if not fallback:
# TODO http channels are unidirectional, so it doesn't really make sense
# to call onProcessMessage from http - this should be deprecated
# as soon as bidirectionality is supported
raise (ref InvalidResponse)(msg: "onProcessMessage handled response")

resData
except HttpError as exc:
raise (ref RpcTransportError)(msg: exc.msg, parent: exc)
finally:
await req.closeWait()

proc newRpcHttpClient*(
maxBodySize = MaxMessageBodyBytes, secure = false,
maxBodySize = defaultMaxMessageSize,
secure = false,
getHeaders: GetJsonRpcRequestHeaders = nil,
flags: HttpClientFlags = {}): RpcHttpClient =
RpcHttpClient.new(maxBodySize, secure, getHeaders, flags)
flags: HttpClientFlags = {},
): RpcHttpClient =
RpcHttpClient.new(secure, getHeaders, flags, maxBodySize)

method call*(client: RpcHttpClient, name: string,
params: RequestParamsTx): Future[JsonString]
{.async.} =
let
id = client.getNextId()
reqBody = requestTxEncode(name, params, id)

debug "Sending JSON-RPC request",
address = $client.httpAddress, len = len(reqBody), name, id
trace "Message", msg = reqBody

let resText = await client.callImpl(reqBody)

# completed by processMessage - the flow is quite weird here to accomodate
# socket and ws clients, but could use a more thorough refactoring
var newFut = newFuture[JsonString]()
# add to awaiting responses
client.awaiting[id] = newFut

# Might error for all kinds of reasons
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
let exc = newException(JsonRpcError, msgRes.error)
newFut.fail(exc)
client.awaiting.del(id)
raise exc

client.awaiting.del(id)

# processMessage should have completed this future - if it didn't, `read` will
# raise, which is reasonable
if newFut.finished:
return newFut.read()
else:
# TODO: Provide more clarity regarding the failure here
debug "Invalid POST Response for JSON-RPC"
raise newException(InvalidResponse, "Invalid response")

method callBatch*(client: RpcHttpClient,
calls: RequestBatchTx): Future[ResponseBatchRx]
{.async.} =
let reqBody = requestBatchEncode(calls)
debug "Sending JSON-RPC batch",
address = $client.httpAddress, len = len(reqBody)
let resText = await client.callImpl(reqBody)

if client.batchFut.isNil or client.batchFut.finished():
client.batchFut = newFuture[ResponseBatchRx]()

# Might error for all kinds of reasons
let msgRes = client.processMessage(resText)
if msgRes.isErr:
# Need to clean up in case the answer was invalid
debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error
let exc = newException(JsonRpcError, msgRes.error)
client.batchFut.fail(exc)
raise exc

# processMessage should have completed this future - if it didn't, `read` will
# raise, which is reasonable
if client.batchFut.finished:
return client.batchFut.read()
else:
# TODO: Provide more clarity regarding the failure here
debug "Invalid POST Response for JSON-RPC"
raise newException(InvalidResponse, "Invalid response")

proc connect*(client: RpcHttpClient, url: string) {.async.} =
proc connect*(
client: RpcHttpClient, url: string
) {.async: (raises: [CancelledError, JsonRpcError]).} =
client.httpAddress = client.httpSession.getAddress(url).valueOr:
raise newException(RpcAddressUnresolvableError, error)
client.remote = client.httpAddress.id

proc connect*(client: RpcHttpClient, address: string, port: Port, secure: bool) {.async.} =
proc connect*(
client: RpcHttpClient, address: string, port: Port, secure: bool
) {.async: (raises: [CancelledError, JsonRpcError]).} =
let uri = Uri(
scheme: if secure: "https" else: "http",
hostname: address,
port: $port)

client.httpAddress = getAddress(client.httpSession, uri).valueOr:
raise newException(RpcAddressUnresolvableError, error)
client.remote = client.httpAddress.id

method close*(client: RpcHttpClient) {.async: (raises: []).} =
if not client.httpSession.isNil:
Expand Down
Loading
Loading