Skip to content

Commit ad26147

Browse files
committed
Remove cancel task, which was causing a circular reference
Provide user with a weak reference to Requester, when in Server mode, to allow socket to close properly
1 parent fd43000 commit ad26147

File tree

4 files changed

+326
-230
lines changed

4 files changed

+326
-230
lines changed

rsocket/src/core/client.rs

+16-15
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder};
1414
use crate::runtime;
1515
use crate::spi::{ClientResponder, Flux, RSocket};
1616
use crate::transport::{
17-
self, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
17+
self, Connection, DuplexSocket, FrameSink, FrameStream, ClientRequester, Splitter, Transport,
1818
};
1919
use crate::Result;
2020

2121
#[derive(Clone)]
2222
pub struct Client {
2323
closed: Arc<Notify>,
24-
socket: DuplexSocket,
24+
requester: ClientRequester,
2525
closing: mpsc::Sender<()>,
2626
}
2727

@@ -130,9 +130,9 @@ where
130130

131131
let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::<Frame>();
132132
let cloned_snd_tx = snd_tx.clone();
133-
let mut socket = DuplexSocket::new(1, snd_tx, splitter).await;
133+
let mut socket = DuplexSocket::new(1, snd_tx, splitter);
134134

135-
let mut cloned_socket = socket.clone();
135+
let requester = socket.client_requester();
136136

137137
if let Some(f) = self.responder {
138138
let responder = f();
@@ -211,10 +211,13 @@ where
211211
}
212212
});
213213

214+
215+
socket.setup(setup).await?;
216+
214217
// process frames
215218
runtime::spawn(async move {
216219
while let Some(next) = read_rx.recv().await {
217-
if let Err(e) = cloned_socket.dispatch(next, None).await {
220+
if let Err(e) = socket.dispatch(next, None).await {
218221
error!("dispatch frame failed: {}", e);
219222
break;
220223
}
@@ -237,16 +240,14 @@ where
237240
}
238241
});
239242

240-
socket.setup(setup).await?;
241-
242-
Ok(Client::new(socket, close_notify, closing))
243+
Ok(Client::new(requester, close_notify, closing))
243244
}
244245
}
245246

246247
impl Client {
247-
fn new(socket: DuplexSocket, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
248+
fn new(requester: ClientRequester, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
248249
Client {
249-
socket,
250+
requester,
250251
closed,
251252
closing,
252253
}
@@ -260,22 +261,22 @@ impl Client {
260261
#[async_trait]
261262
impl RSocket for Client {
262263
async fn metadata_push(&self, req: Payload) -> Result<()> {
263-
self.socket.metadata_push(req).await
264+
self.requester.metadata_push(req).await
264265
}
265266

266267
async fn fire_and_forget(&self, req: Payload) -> Result<()> {
267-
self.socket.fire_and_forget(req).await
268+
self.requester.fire_and_forget(req).await
268269
}
269270

270271
async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
271-
self.socket.request_response(req).await
272+
self.requester.request_response(req).await
272273
}
273274

274275
fn request_stream(&self, req: Payload) -> Flux<Result<Payload>> {
275-
self.socket.request_stream(req)
276+
self.requester.request_stream(req)
276277
}
277278

278279
fn request_channel(&self, reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
279-
self.socket.request_channel(reqs)
280+
self.requester.request_channel(reqs)
280281
}
281282
}

rsocket/src/core/server.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ where
114114

115115
// Init duplex socket.
116116
let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::<Frame>();
117-
let mut socket = DuplexSocket::new(0, snd_tx, splitter).await;
117+
let mut socket = DuplexSocket::new(0, snd_tx, splitter);
118118

119119
// Begin loop for writing frames.
120120
runtime::spawn(async move {
@@ -154,7 +154,6 @@ where
154154
break;
155155
}
156156
}
157-
158157
Ok(())
159158
}
160159
}

rsocket/src/transport/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ mod socket;
44
mod spi;
55

66
pub(crate) use fragmentation::{Joiner, Splitter, MIN_MTU};
7-
pub(crate) use socket::DuplexSocket;
7+
pub(crate) use socket::{ClientRequester,DuplexSocket};
88
pub use spi::*;

0 commit comments

Comments
 (0)