-
Notifications
You must be signed in to change notification settings - Fork 30
Add support for two-way communication #127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| --define:"async_backend=asyncdispatch" | ||
| --threads:on | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,12 @@ | ||
| import | ||
| std/[tables, macros], | ||
| chronos, | ||
| faststreams/async_backend, | ||
| ./jsonmarshal | ||
|
|
||
| from strutils import toLowerAscii, replace | ||
|
|
||
| export | ||
| chronos, jsonmarshal, tables | ||
| jsonmarshal, tables | ||
|
|
||
| type | ||
| ClientId* = int64 | ||
|
|
@@ -26,13 +26,16 @@ proc getNextId*(client: RpcClient): ClientId = | |
| proc rpcCallNode*(path: string, params: JsonNode, id: ClientId): JsonNode = | ||
| %{"jsonrpc": %"2.0", "method": %path, "params": params, "id": %id} | ||
|
|
||
| proc rpcNotificationNode*(path: string, params: JsonNode): JsonNode = | ||
| %{"jsonrpc": %"2.0", "method": %path, "params": params} | ||
|
|
||
| method call*(client: RpcClient, name: string, | ||
| params: JsonNode): Future[Response] {. | ||
| base, async, gcsafe, raises: [Defect, CatchableError].} = | ||
| base, async, gcsafe, raises: [Defect, CatchableError, Exception].} = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. throughout,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will track down that. I believe it was caused by async when used in combination with
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we generally don't support / work with ah, I also see that this function in particular is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI, here is the root cause: import std/asyncdispatch
proc echoString(params: string): Future[string]
{.async, raises: [Defect, CatchableError].} =
return paramsFails with
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, so what |
||
| discard | ||
|
|
||
| method close*(client: RpcClient): Future[void] {. | ||
| base, async, gcsafe, raises: [Defect, CatchableError].} = | ||
| base, async, gcsafe, raises: [Defect, CatchableError, Exception].} = | ||
| discard | ||
|
|
||
| template `or`(a: JsonNode, b: typed): JsonNode = | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,21 @@ | ||
| import | ||
| std/[macros, options, strutils, tables], | ||
| chronicles, chronos, json_serialization/writer, | ||
| std/[macros, options, strutils, tables], sugar, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sugar can go in the std brackets
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure. |
||
| chronicles, faststreams/async_backend, json_serialization/writer, | ||
| ./jsonmarshal, ./errors | ||
|
|
||
| export | ||
| chronos, jsonmarshal | ||
| export jsonmarshal | ||
|
|
||
| type | ||
| StringOfJson* = JsonString | ||
|
|
||
| RpcResult* = Option[JsonString] | ||
|
|
||
| # Procedure signature accepted as an RPC call by server | ||
| RpcProc* = proc(input: JsonNode): Future[StringOfJson] {.gcsafe, raises: [Defect, CatchableError].} | ||
| RpcProc* = proc(input: JsonNode): Future[RpcResult] {.gcsafe, raises: [Defect, CatchableError, Exception].} | ||
|
|
||
| RpcRouter* = object | ||
| procs*: Table[string, RpcProc] | ||
| fullParams*: bool # if false send "params" to the handlers | ||
|
|
||
| const | ||
| methodField = "method" | ||
|
|
@@ -34,7 +36,7 @@ proc newRpcRouter*: RpcRouter {.deprecated.} = | |
| RpcRouter.init() | ||
|
|
||
| proc register*(router: var RpcRouter, path: string, call: RpcProc) = | ||
| router.procs.add(path, call) | ||
| router.procs[path] = call | ||
|
|
||
| proc clear*(router: var RpcRouter) = | ||
| router.procs.clear | ||
|
|
@@ -59,48 +61,57 @@ proc wrapError*(code: int, msg: string, id: JsonNode = newJNull(), | |
| $id, $code, escapeJson(msg), $data | ||
| ] & "\r\n") | ||
|
|
||
| proc route*(router: RpcRouter, node: JsonNode): Future[StringOfJson] {.async, gcsafe.} = | ||
| proc hasReturnType(params: NimNode): bool = | ||
| if params != nil and params.len > 0 and params[0] != nil and | ||
| params[0].kind != nnkEmpty: | ||
| result = true | ||
|
|
||
| proc route*(router: RpcRouter, node: JsonNode): Future[RpcResult] {.async, gcsafe.} = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The change to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, it's public, so it can be used - this is a bit of a messy aspect of Nim, the module system is shifty - a first litmus test would be to check if nimbus-eth1 and nimbus-eth2 still compile after the change - if these to projects compile, it's mostly fine for a library like json-rpc where we don't yet carefully maintain backwards compat |
||
| if node{"jsonrpc"}.getStr() != "2.0": | ||
| return wrapError(INVALID_REQUEST, "'jsonrpc' missing or invalid") | ||
| return some(wrapError(INVALID_REQUEST, "'jsonrpc' missing or invalid")) | ||
|
|
||
| let id = node{"id"} | ||
| if id == nil: | ||
| return wrapError(INVALID_REQUEST, "'id' missing or invalid") | ||
|
|
||
| let methodName = node{"method"}.getStr() | ||
| if methodName.len == 0: | ||
| return wrapError(INVALID_REQUEST, "'method' missing or invalid") | ||
| return some(wrapError(INVALID_REQUEST, "'method' missing or invalid")) | ||
|
|
||
| let rpcProc = router.procs.getOrDefault(methodName) | ||
| let params = node.getOrDefault("params") | ||
|
|
||
| if rpcProc == nil: | ||
| return wrapError(METHOD_NOT_FOUND, "'" & methodName & "' is not a registered RPC method", id) | ||
| return some(wrapError(METHOD_NOT_FOUND, "'" & methodName & "' is not a registered RPC method", id)) | ||
| else: | ||
| try: | ||
| let params = if router.fullParams: | ||
| node | ||
| else: | ||
| node.getOrDefault("params") | ||
|
|
||
| let res = await rpcProc(if params == nil: newJArray() else: params) | ||
| return wrapReply(id, res) | ||
|
|
||
| return res.map((s) => wrapReply(id, s)); | ||
| except InvalidRequest as err: | ||
| return wrapError(err.code, err.msg) | ||
| debug "Error occurred within RPC", methodName = methodName, err = err.msg | ||
| return some(wrapError(err.code, err.msg)) | ||
| except CatchableError as err: | ||
| debug "Error occurred within RPC", methodName = methodName, err = err.msg | ||
| return wrapError( | ||
| SERVER_ERROR, methodName & " raised an exception", id, newJString(err.msg)) | ||
| return some(wrapError( | ||
| SERVER_ERROR, methodName & " raised an exception", id, newJString(err.msg))) | ||
|
|
||
| proc route*(router: RpcRouter, data: string): Future[string] {.async, gcsafe.} = | ||
| proc route*(router: RpcRouter, data: string): Future[RpcResult] {.async, gcsafe.} = | ||
| ## Route to RPC from string data. Data is expected to be able to be converted to Json. | ||
| ## Returns string of Json from RPC result/error node | ||
| let node = | ||
| try: parseJson(data) | ||
| except CatchableError as err: | ||
| return string(wrapError(JSON_PARSE_ERROR, err.msg)) | ||
| return some(wrapError(JSON_PARSE_ERROR, err.msg)) | ||
| except Exception as err: | ||
| # TODO https://github.com/status-im/nimbus-eth2/issues/2430 | ||
| return string(wrapError(JSON_PARSE_ERROR, err.msg)) | ||
| return some(wrapError(JSON_PARSE_ERROR, err.msg)) | ||
|
|
||
| return string(await router.route(node)) | ||
| return await router.route(node); | ||
|
|
||
| proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[StringOfJson]): bool = | ||
| proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[RpcResult]): bool = | ||
| ## Route to RPC, returns false if the method or params cannot be found. | ||
| ## Expects json input and returns json output. | ||
| let | ||
|
|
@@ -116,16 +127,6 @@ proc tryRoute*(router: RpcRouter, data: JsonNode, fut: var Future[StringOfJson]) | |
| fut = rpc(jParams) | ||
| return true | ||
|
|
||
| proc makeProcName(s: string): string = | ||
| result = "" | ||
| for c in s: | ||
| if c.isAlphaNumeric: result.add c | ||
|
|
||
| proc hasReturnType(params: NimNode): bool = | ||
| if params != nil and params.len > 0 and params[0] != nil and | ||
| params[0].kind != nnkEmpty: | ||
| result = true | ||
|
|
||
| macro rpc*(server: RpcRouter, path: string, body: untyped): untyped = | ||
| ## Define a remote procedure call. | ||
| ## Input and return parameters are defined using the ``do`` notation. | ||
|
|
@@ -159,16 +160,16 @@ macro rpc*(server: RpcRouter, path: string, body: untyped): untyped = | |
| if ReturnType == ident"JsonNode": | ||
| # `JsonNode` results don't need conversion | ||
| result.add quote do: | ||
| proc `rpcProcWrapper`(`paramsIdent`: JsonNode): Future[StringOfJson] {.async, gcsafe.} = | ||
| return StringOfJson($(await `rpcProcImpl`(`paramsIdent`))) | ||
| proc `rpcProcWrapper`(`paramsIdent`: JsonNode): Future[RpcResult] {.async, raises: [Defect, CatchableError, Exception].} = | ||
| return some(StringOfJson($(await `rpcProcImpl`(`paramsIdent`)))) | ||
| elif ReturnType == ident"StringOfJson": | ||
| result.add quote do: | ||
| proc `rpcProcWrapper`(`paramsIdent`: JsonNode): Future[StringOfJson] {.async, gcsafe.} = | ||
| return await `rpcProcImpl`(`paramsIdent`) | ||
| proc `rpcProcWrapper`(`paramsIdent`: JsonNode): Future[RpcResult] {.async, raises: [Defect, CatchableError, Exception].} = | ||
| return some(await `rpcProcImpl`(`paramsIdent`)) | ||
| else: | ||
| result.add quote do: | ||
| proc `rpcProcWrapper`(`paramsIdent`: JsonNode): Future[StringOfJson] {.async, gcsafe.} = | ||
| return StringOfJson($(%(await `rpcProcImpl`(`paramsIdent`)))) | ||
| proc `rpcProcWrapper`(`paramsIdent`: JsonNode): Future[RpcResult] {.async, raises: [Defect, CatchableError, Exception].} = | ||
| return some(StringOfJson($(%(await `rpcProcImpl`(`paramsIdent`))))) | ||
|
|
||
| result.add quote do: | ||
| `server`.register(`path`, `rpcProcWrapper`) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,3 @@ | ||
| import server | ||
| import server, chronos | ||
| import servers/[socketserver, httpserver, websocketserver] | ||
| export server, socketserver, httpserver, websocketserver | ||
| export server, socketserver, httpserver, websocketserver, chronos |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,10 @@ | ||
| import | ||
| std/tables, | ||
| chronos, | ||
| faststreams/async_backend, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why would fs be needed here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to support the ability to compile in chronos/asyncdispatch mode(i. e. providing async)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but faststreams is not used in this module? perhaps you need an export somewhere else
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. faststream/async_backend is used only to export conditionally
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right - so with |
||
| ./router, | ||
| ./jsonmarshal | ||
|
|
||
| export chronos, jsonmarshal, router | ||
| export jsonmarshal, router | ||
|
|
||
| type | ||
| RpcServer* = ref object of RootRef | ||
|
|
@@ -23,12 +23,12 @@ template hasMethod*(server: RpcServer, methodName: string): bool = | |
|
|
||
| proc executeMethod*(server: RpcServer, | ||
| methodName: string, | ||
| args: JsonNode): Future[StringOfJson] = | ||
| server.router.procs[methodName](args) | ||
| args: JsonNode): Future[StringOfJson] {.async} = | ||
| return (await server.router.procs[methodName](args)).get | ||
|
|
||
| # Wrapper for message processing | ||
|
|
||
| proc route*(server: RpcServer, line: string): Future[string] {.gcsafe.} = | ||
| proc route*(server: RpcServer, line: string): Future[RpcResult] {.gcsafe.} = | ||
| server.router.route(line) | ||
|
|
||
| # Server registration | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be added.