Skip to content

Commit da992b4

Browse files
committed
refactor: make RpcCall an IntoFuture
1 parent 614d6db commit da992b4

File tree

1 file changed

+42
-176
lines changed

1 file changed

+42
-176
lines changed

crates/rpc-client/src/call.rs

+42-176
Original file line numberDiff line numberDiff line change
@@ -1,116 +1,15 @@
11
use alloy_json_rpc::{
2-
transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcParam,
3-
RpcResult, RpcReturn,
2+
transform_response, try_deserialize_ok, Request, ResponsePacket, RpcParam, RpcReturn,
43
};
54
use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
65
use core::panic;
7-
use futures::FutureExt;
8-
use serde_json::value::RawValue;
96
use std::{
107
fmt,
11-
future::Future,
8+
future::{Future, IntoFuture},
129
marker::PhantomData,
13-
pin::Pin,
14-
task::{self, ready, Poll::Ready},
1510
};
1611
use tower::Service;
1712

18-
/// The states of the [`RpcCall`] future.
19-
#[must_use = "futures do nothing unless you `.await` or poll them"]
20-
#[pin_project::pin_project(project = CallStateProj)]
21-
enum CallState<Params>
22-
where
23-
Params: RpcParam,
24-
{
25-
Prepared {
26-
request: Option<Request<Params>>,
27-
connection: BoxTransport,
28-
},
29-
AwaitingResponse {
30-
#[pin]
31-
fut: <BoxTransport as Service<RequestPacket>>::Future,
32-
},
33-
Complete,
34-
}
35-
36-
impl<Params> Clone for CallState<Params>
37-
where
38-
Params: RpcParam,
39-
{
40-
fn clone(&self) -> Self {
41-
match self {
42-
Self::Prepared { request, connection } => {
43-
Self::Prepared { request: request.clone(), connection: connection.clone() }
44-
}
45-
_ => panic!("cloned after dispatch"),
46-
}
47-
}
48-
}
49-
50-
impl<Params> fmt::Debug for CallState<Params>
51-
where
52-
Params: RpcParam,
53-
{
54-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55-
f.write_str(match self {
56-
Self::Prepared { .. } => "Prepared",
57-
Self::AwaitingResponse { .. } => "AwaitingResponse",
58-
Self::Complete => "Complete",
59-
})
60-
}
61-
}
62-
63-
impl<Params> Future for CallState<Params>
64-
where
65-
Params: RpcParam,
66-
{
67-
type Output = TransportResult<Box<RawValue>>;
68-
69-
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
70-
loop {
71-
match self.as_mut().project() {
72-
CallStateProj::Prepared { connection, request } => {
73-
if let Err(e) =
74-
task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
75-
{
76-
self.set(Self::Complete);
77-
return Ready(RpcResult::Err(e));
78-
}
79-
80-
let request = request.take().expect("no request");
81-
debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
82-
trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
83-
let request = request.serialize();
84-
let fut = match request {
85-
Ok(request) => {
86-
trace!(request=%request.serialized(), "serialized request");
87-
connection.call(request.into())
88-
}
89-
Err(err) => {
90-
trace!(?err, "failed to serialize request");
91-
self.set(Self::Complete);
92-
return Ready(RpcResult::Err(TransportError::ser_err(err)));
93-
}
94-
};
95-
self.set(Self::AwaitingResponse { fut });
96-
}
97-
CallStateProj::AwaitingResponse { fut } => {
98-
let res = match task::ready!(fut.poll(cx)) {
99-
Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
100-
Err(e) => Ready(RpcResult::Err(e)),
101-
_ => panic!("received batch response from single request"),
102-
};
103-
self.set(Self::Complete);
104-
return res;
105-
}
106-
CallStateProj::Complete => {
107-
panic!("Polled after completion");
108-
}
109-
}
110-
}
111-
}
112-
}
113-
11413
/// A prepared, but unsent, RPC call.
11514
///
11615
/// This is a future that will send the request when polled. It contains a
@@ -130,26 +29,21 @@ where
13029
/// batch request must immediately erase the `Param` type to allow batching of
13130
/// requests with different `Param` types, while the `RpcCall` may do so lazily.
13231
#[must_use = "futures do nothing unless you `.await` or poll them"]
133-
#[pin_project::pin_project]
13432
#[derive(Clone)]
135-
pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
136-
where
137-
Params: RpcParam,
138-
Map: FnOnce(Resp) -> Output,
139-
{
140-
#[pin]
141-
state: CallState<Params>,
142-
map: Option<Map>,
33+
pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output> {
34+
request: Request<Params>,
35+
connection: BoxTransport,
36+
map: Map,
14337
_pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
14438
}
14539

146-
impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
40+
impl<Params, Resp, Output, Map> fmt::Debug for RpcCall<Params, Resp, Output, Map>
14741
where
14842
Params: RpcParam,
14943
Map: FnOnce(Resp) -> Output,
15044
{
151-
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
152-
f.debug_struct("RpcCall").field("state", &self.state).finish()
45+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46+
f.debug_struct("RpcCall").finish_non_exhaustive()
15347
}
15448
}
15549

@@ -158,13 +52,11 @@ where
15852
Params: RpcParam,
15953
{
16054
#[doc(hidden)]
161-
pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
55+
pub fn new(request: Request<Params>, connection: impl IntoBoxTransport) -> Self {
16256
Self {
163-
state: CallState::Prepared {
164-
request: Some(req),
165-
connection: connection.into_box_transport(),
166-
},
167-
map: Some(std::convert::identity),
57+
request,
58+
connection: connection.into_box_transport(),
59+
map: std::convert::identity,
16860
_pd: PhantomData,
16961
}
17062
}
@@ -193,24 +85,16 @@ where
19385
where
19486
NewMap: FnOnce(Resp) -> NewOutput,
19587
{
196-
RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
88+
RpcCall { request: self.request, connection: self.connection, map, _pd: PhantomData }
19789
}
19890

19991
/// Returns `true` if the request is a subscription.
200-
///
201-
/// # Panics
202-
///
203-
/// Panics if called after the request has been sent.
20492
pub fn is_subscription(&self) -> bool {
20593
self.request().meta.is_subscription()
20694
}
20795

20896
/// Set the request to be a non-standard subscription (i.e. not
20997
/// "eth_subscribe").
210-
///
211-
/// # Panics
212-
///
213-
/// Panics if called after the request has been sent.
21498
pub fn set_is_subscription(&mut self) {
21599
self.request_mut().meta.set_is_subscription();
216100
}
@@ -224,49 +108,28 @@ where
224108
///
225109
/// This is useful for modifying the params after the request has been
226110
/// prepared.
227-
///
228-
/// # Panics
229-
///
230-
/// Panics if called after the request has been sent.
231111
pub fn params(&mut self) -> &mut Params {
232112
&mut self.request_mut().params
233113
}
234114

235115
/// Returns a reference to the request.
236-
///
237-
/// # Panics
238-
///
239-
/// Panics if called after the request has been sent.
240116
pub fn request(&self) -> &Request<Params> {
241-
let CallState::Prepared { request, .. } = &self.state else {
242-
panic!("Cannot get request after request has been sent");
243-
};
244-
request.as_ref().expect("no request in prepared")
117+
&self.request
245118
}
246119

247120
/// Returns a mutable reference to the request.
248-
///
249-
/// # Panics
250-
///
251-
/// Panics if called after the request has been sent.
252121
pub fn request_mut(&mut self) -> &mut Request<Params> {
253-
let CallState::Prepared { request, .. } = &mut self.state else {
254-
panic!("Cannot get request after request has been sent");
255-
};
256-
request.as_mut().expect("no request in prepared")
122+
&mut self.request
257123
}
258124

259125
/// Map the params of the request into a new type.
260126
pub fn map_params<NewParams: RpcParam>(
261127
self,
262-
map: impl Fn(Params) -> NewParams,
128+
map: impl FnOnce(Params) -> NewParams,
263129
) -> RpcCall<NewParams, Resp, Output, Map> {
264-
let CallState::Prepared { request, connection } = self.state else {
265-
panic!("Cannot get request after request has been sent");
266-
};
267-
let request = request.expect("no request in prepared").map_params(map);
268130
RpcCall {
269-
state: CallState::Prepared { request: Some(request), connection },
131+
request: self.request.map_params(map),
132+
connection: self.connection,
270133
map: self.map,
271134
_pd: PhantomData,
272135
}
@@ -285,13 +148,9 @@ where
285148
///
286149
/// Panics if called after the request has been polled.
287150
pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
288-
let CallState::Prepared { request, connection } = self.state else {
289-
panic!("Cannot get params after request has been sent");
290-
};
291-
let request = request.expect("no request in prepared").into_owned_params();
292-
293151
RpcCall {
294-
state: CallState::Prepared { request: Some(request), connection },
152+
request: self.request.into_owned_params(),
153+
connection: self.connection,
295154
map: self.map,
296155
_pd: PhantomData,
297156
}
@@ -302,30 +161,37 @@ impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
302161
where
303162
Params: RpcParam + 'a,
304163
Resp: RpcReturn,
305-
Output: 'static,
164+
Output: 'a,
306165
Map: FnOnce(Resp) -> Output + Send + 'a,
307166
{
308167
/// Convert this future into a boxed, pinned future, erasing its type.
309168
pub fn boxed(self) -> RpcFut<'a, Output> {
310-
Box::pin(self)
169+
self.into_future()
170+
}
171+
172+
async fn do_call(self) -> TransportResult<Output> {
173+
let Self { request, mut connection, map, _pd: PhantomData } = self;
174+
std::future::poll_fn(|cx| connection.poll_ready(cx)).await?;
175+
let serialized_request = request.serialize().map_err(TransportError::ser_err)?;
176+
let response_packet = connection.call(serialized_request.into()).await?;
177+
let ResponsePacket::Single(response) = response_packet else {
178+
panic!("received batch response from single request")
179+
};
180+
try_deserialize_ok(transform_response(response)).map(map)
311181
}
312182
}
313183

314-
impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
184+
impl<'a, Params, Resp, Output, Map> IntoFuture for RpcCall<Params, Resp, Output, Map>
315185
where
316-
Params: RpcParam,
186+
Params: RpcParam + 'a,
317187
Resp: RpcReturn,
318-
Output: 'static,
319-
Map: FnOnce(Resp) -> Output,
188+
Output: 'a,
189+
Map: FnOnce(Resp) -> Output + Send + 'a,
320190
{
321-
type Output = TransportResult<Output>;
322-
323-
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
324-
trace!(?self.state, "polling RpcCall");
325-
326-
let this = self.get_mut();
327-
let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
191+
type IntoFuture = RpcFut<'a, Output>;
192+
type Output = <Self::IntoFuture as Future>::Output;
328193

329-
Ready(resp.map(this.map.take().expect("polled after completion")))
194+
fn into_future(self) -> Self::IntoFuture {
195+
Box::pin(self.do_call())
330196
}
331197
}

0 commit comments

Comments
 (0)