Skip to content

Commit 9921286

Browse files
authored
refactor(client): use native async fn in traits instead async_trait crate (#1551)
* refactor(client): ClientT/SubscribeClienT use native async fns * fix ui tests * refactor(client): use async native traits in transports * macros(client): use native async fn in traits * fix ui tests
1 parent e64bccf commit 9921286

File tree

9 files changed

+308
-295
lines changed

9 files changed

+308
-295
lines changed

client/http-client/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ publish = true
1717
workspace = true
1818

1919
[dependencies]
20-
async-trait = { workspace = true }
2120
base64 = { workspace = true }
2221
hyper = { workspace = true, features = ["client", "http1", "http2"] }
2322
hyper-rustls = { workspace = true, features = ["http1", "http2", "tls12", "logging", "ring"], optional = true }

client/http-client/src/client.rs

+102-96
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use std::time::Duration;
3232
use crate::rpc_service::RpcService;
3333
use crate::transport::{self, Error as TransportError, HttpBackend, HttpTransportClientBuilder};
3434
use crate::{HttpRequest, HttpResponse};
35-
use async_trait::async_trait;
3635
use hyper::body::Bytes;
3736
use hyper::http::{Extensions, HeaderMap};
3837
use jsonrpsee_core::client::{
@@ -350,150 +349,157 @@ impl HttpClient<HttpBackend> {
350349
}
351350
}
352351

353-
#[async_trait]
354352
impl<S> ClientT for HttpClient<S>
355353
where
356354
S: RpcServiceT<Error = Error, Response = MethodResponse> + Send + Sync,
357355
{
358-
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
356+
fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
359357
where
360358
Params: ToRpcParams + Send,
361359
{
362-
let _permit = match self.request_guard.as_ref() {
363-
Some(permit) => permit.acquire().await.ok(),
364-
None => None,
365-
};
366-
let params = params.to_rpc_params()?.map(StdCow::Owned);
367-
368-
run_future_until_timeout(
369-
self.service.notification(Notification::new(method.into(), params)),
370-
self.request_timeout,
371-
)
372-
.await
373-
.map_err(|e| Error::Transport(e.into()))?;
374-
Ok(())
360+
async {
361+
let _permit = match self.request_guard.as_ref() {
362+
Some(permit) => permit.acquire().await.ok(),
363+
None => None,
364+
};
365+
let params = params.to_rpc_params()?.map(StdCow::Owned);
366+
367+
run_future_until_timeout(
368+
self.service.notification(Notification::new(method.into(), params)),
369+
self.request_timeout,
370+
)
371+
.await
372+
.map_err(|e| Error::Transport(e.into()))?;
373+
Ok(())
374+
}
375375
}
376376

377-
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
377+
fn request<R, Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<R, Error>> + Send
378378
where
379379
R: DeserializeOwned,
380380
Params: ToRpcParams + Send,
381381
{
382-
let _permit = match self.request_guard.as_ref() {
383-
Some(permit) => permit.acquire().await.ok(),
384-
None => None,
385-
};
386-
let id = self.id_manager.next_request_id();
387-
let params = params.to_rpc_params()?;
388-
389-
let method_response = run_future_until_timeout(
390-
self.service.call(Request::borrowed(method, params.as_deref(), id.clone())),
391-
self.request_timeout,
392-
)
393-
.await?
394-
.into_method_call()
395-
.expect("Method call must return a method call reponse; qed");
396-
397-
let rp = ResponseSuccess::try_from(method_response.into_inner())?;
398-
399-
let result = serde_json::from_str(rp.result.get()).map_err(Error::ParseError)?;
400-
if rp.id == id { Ok(result) } else { Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()).into()) }
382+
async {
383+
let _permit = match self.request_guard.as_ref() {
384+
Some(permit) => permit.acquire().await.ok(),
385+
None => None,
386+
};
387+
let id = self.id_manager.next_request_id();
388+
let params = params.to_rpc_params()?;
389+
390+
let method_response = run_future_until_timeout(
391+
self.service.call(Request::borrowed(method, params.as_deref(), id.clone())),
392+
self.request_timeout,
393+
)
394+
.await?
395+
.into_method_call()
396+
.expect("Method call must return a method call reponse; qed");
397+
398+
let rp = ResponseSuccess::try_from(method_response.into_inner())?;
399+
400+
let result = serde_json::from_str(rp.result.get()).map_err(Error::ParseError)?;
401+
if rp.id == id { Ok(result) } else { Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()).into()) }
402+
}
401403
}
402404

403-
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
405+
fn batch_request<'a, R>(
406+
&self,
407+
batch: BatchRequestBuilder<'a>,
408+
) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
404409
where
405410
R: DeserializeOwned + fmt::Debug + 'a,
406411
{
407-
let _permit = match self.request_guard.as_ref() {
408-
Some(permit) => permit.acquire().await.ok(),
409-
None => None,
410-
};
411-
let batch = batch.build()?;
412-
let id = self.id_manager.next_request_id();
413-
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
414-
415-
let mut batch_request = Batch::with_capacity(batch.len());
416-
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
417-
let id = self.id_manager.as_id_kind().into_id(id);
418-
let req = Request {
419-
jsonrpc: TwoPointZero,
420-
method: method.into(),
421-
params: params.map(StdCow::Owned),
422-
id,
423-
extensions: Extensions::new(),
412+
async {
413+
let _permit = match self.request_guard.as_ref() {
414+
Some(permit) => permit.acquire().await.ok(),
415+
None => None,
424416
};
425-
batch_request.push(req);
426-
}
427-
428-
let rp = run_future_until_timeout(self.service.batch(batch_request), self.request_timeout).await?;
429-
let json_rps = rp.into_batch().expect("Batch must return a batch reponse; qed");
417+
let batch = batch.build()?;
418+
let id = self.id_manager.next_request_id();
419+
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
420+
421+
let mut batch_request = Batch::with_capacity(batch.len());
422+
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
423+
let id = self.id_manager.as_id_kind().into_id(id);
424+
let req = Request {
425+
jsonrpc: TwoPointZero,
426+
method: method.into(),
427+
params: params.map(StdCow::Owned),
428+
id,
429+
extensions: Extensions::new(),
430+
};
431+
batch_request.push(req);
432+
}
430433

431-
let mut batch_response = Vec::new();
432-
let mut success = 0;
433-
let mut failed = 0;
434+
let rp = run_future_until_timeout(self.service.batch(batch_request), self.request_timeout).await?;
435+
let json_rps = rp.into_batch().expect("Batch must return a batch reponse; qed");
434436

435-
// Fill the batch response with placeholder values.
436-
for _ in 0..json_rps.len() {
437-
batch_response.push(Err(ErrorObject::borrowed(0, "", None)));
438-
}
437+
let mut batch_response = Vec::new();
438+
let mut success = 0;
439+
let mut failed = 0;
439440

440-
for rp in json_rps.into_iter() {
441-
let id = rp.id().try_parse_inner_as_number()?;
441+
// Fill the batch response with placeholder values.
442+
for _ in 0..json_rps.len() {
443+
batch_response.push(Err(ErrorObject::borrowed(0, "", None)));
444+
}
442445

443-
let res = match ResponseSuccess::try_from(rp.into_inner()) {
444-
Ok(r) => {
445-
let v = serde_json::from_str(r.result.get()).map_err(Error::ParseError)?;
446-
success += 1;
447-
Ok(v)
448-
}
449-
Err(err) => {
450-
failed += 1;
451-
Err(err)
446+
for rp in json_rps.into_iter() {
447+
let id = rp.id().try_parse_inner_as_number()?;
448+
449+
let res = match ResponseSuccess::try_from(rp.into_inner()) {
450+
Ok(r) => {
451+
let v = serde_json::from_str(r.result.get()).map_err(Error::ParseError)?;
452+
success += 1;
453+
Ok(v)
454+
}
455+
Err(err) => {
456+
failed += 1;
457+
Err(err)
458+
}
459+
};
460+
461+
let maybe_elem = id
462+
.checked_sub(id_range.start)
463+
.and_then(|p| p.try_into().ok())
464+
.and_then(|p: usize| batch_response.get_mut(p));
465+
466+
if let Some(elem) = maybe_elem {
467+
*elem = res;
468+
} else {
469+
return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
452470
}
453-
};
454-
455-
let maybe_elem = id
456-
.checked_sub(id_range.start)
457-
.and_then(|p| p.try_into().ok())
458-
.and_then(|p: usize| batch_response.get_mut(p));
459-
460-
if let Some(elem) = maybe_elem {
461-
*elem = res;
462-
} else {
463-
return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
464471
}
465-
}
466472

467-
Ok(BatchResponse::new(success, batch_response, failed))
473+
Ok(BatchResponse::new(success, batch_response, failed))
474+
}
468475
}
469476
}
470477

471-
#[async_trait]
472478
impl<S> SubscriptionClientT for HttpClient<S>
473479
where
474480
S: RpcServiceT<Error = Error, Response = MethodResponse> + Send + Sync,
475481
{
476482
/// Send a subscription request to the server. Not implemented for HTTP; will always return
477483
/// [`Error::HttpNotImplemented`].
478-
async fn subscribe<'a, N, Params>(
484+
fn subscribe<'a, N, Params>(
479485
&self,
480486
_subscribe_method: &'a str,
481487
_params: Params,
482488
_unsubscribe_method: &'a str,
483-
) -> Result<Subscription<N>, Error>
489+
) -> impl Future<Output = Result<Subscription<N>, Error>>
484490
where
485491
Params: ToRpcParams + Send,
486492
N: DeserializeOwned,
487493
{
488-
Err(Error::HttpNotImplemented)
494+
async { Err(Error::HttpNotImplemented) }
489495
}
490496

491497
/// Subscribe to a specific method. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`].
492-
async fn subscribe_to_method<'a, N>(&self, _method: &'a str) -> Result<Subscription<N>, Error>
498+
fn subscribe_to_method<N>(&self, _method: &str) -> impl Future<Output = Result<Subscription<N>, Error>>
493499
where
494500
N: DeserializeOwned,
495501
{
496-
Err(Error::HttpNotImplemented)
502+
async { Err(Error::HttpNotImplemented) }
497503
}
498504
}
499505

client/transport/src/web.rs

+15-14
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use futures_channel::mpsc;
44
use futures_util::sink::SinkExt;
55
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
66
use gloo_net::websocket::{Message, WebSocketError, futures::WebSocket};
7-
use jsonrpsee_core::async_trait;
87
use jsonrpsee_core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
98

109
/// Web-sys transport error that can occur.
@@ -45,28 +44,30 @@ impl fmt::Debug for Receiver {
4544
}
4645
}
4746

48-
#[async_trait(?Send)]
4947
impl TransportSenderT for Sender {
5048
type Error = Error;
5149

52-
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
53-
self.0.send(Message::Text(msg)).await.map_err(|e| Error::WebSocket(e))?;
54-
Ok(())
50+
fn send(&mut self, msg: String) -> impl Future<Output = Result<(), Self::Error>> {
51+
async {
52+
self.0.send(Message::Text(msg)).await.map_err(|e| Error::WebSocket(e))?;
53+
Ok(())
54+
}
5555
}
5656
}
5757

58-
#[async_trait(?Send)]
5958
impl TransportReceiverT for Receiver {
6059
type Error = Error;
6160

62-
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
63-
match self.0.next().await {
64-
Some(Ok(msg)) => match msg {
65-
Message::Bytes(bytes) => Ok(ReceivedMessage::Bytes(bytes)),
66-
Message::Text(txt) => Ok(ReceivedMessage::Text(txt)),
67-
},
68-
Some(Err(err)) => Err(Error::WebSocket(err)),
69-
None => Err(Error::SenderDisconnected),
61+
fn receive(&mut self) -> impl Future<Output = Result<ReceivedMessage, Self::Error>> {
62+
async {
63+
match self.0.next().await {
64+
Some(Ok(msg)) => match msg {
65+
Message::Bytes(bytes) => Ok(ReceivedMessage::Bytes(bytes)),
66+
Message::Text(txt) => Ok(ReceivedMessage::Text(txt)),
67+
},
68+
Some(Err(err)) => Err(Error::WebSocket(err)),
69+
None => Err(Error::SenderDisconnected),
70+
}
7071
}
7172
}
7273
}

0 commit comments

Comments
 (0)