Skip to content

Commit

Permalink
feat: remove cancel task, which was causing a circular reference (#65)
Browse files Browse the repository at this point in the history
Provide user with a weak reference to Requester, when in Server mode, to allow socket to close properly
  • Loading branch information
Xylaant authored Jun 26, 2024
1 parent fd43000 commit ebbafca
Show file tree
Hide file tree
Showing 4 changed files with 326 additions and 230 deletions.
31 changes: 16 additions & 15 deletions rsocket/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder};
use crate::runtime;
use crate::spi::{ClientResponder, Flux, RSocket};
use crate::transport::{
self, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
self, Connection, DuplexSocket, FrameSink, FrameStream, ClientRequester, Splitter, Transport,
};
use crate::Result;

#[derive(Clone)]
pub struct Client {
closed: Arc<Notify>,
socket: DuplexSocket,
requester: ClientRequester,
closing: mpsc::Sender<()>,
}

Expand Down Expand Up @@ -130,9 +130,9 @@ where

let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::<Frame>();
let cloned_snd_tx = snd_tx.clone();
let mut socket = DuplexSocket::new(1, snd_tx, splitter).await;
let mut socket = DuplexSocket::new(1, snd_tx, splitter);

let mut cloned_socket = socket.clone();
let requester = socket.client_requester();

if let Some(f) = self.responder {
let responder = f();
Expand Down Expand Up @@ -211,10 +211,13 @@ where
}
});


socket.setup(setup).await?;

// process frames
runtime::spawn(async move {
while let Some(next) = read_rx.recv().await {
if let Err(e) = cloned_socket.dispatch(next, None).await {
if let Err(e) = socket.dispatch(next, None).await {
error!("dispatch frame failed: {}", e);
break;
}
Expand All @@ -237,16 +240,14 @@ where
}
});

socket.setup(setup).await?;

Ok(Client::new(socket, close_notify, closing))
Ok(Client::new(requester, close_notify, closing))
}
}

impl Client {
fn new(socket: DuplexSocket, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
fn new(requester: ClientRequester, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
Client {
socket,
requester,
closed,
closing,
}
Expand All @@ -260,22 +261,22 @@ impl Client {
#[async_trait]
impl RSocket for Client {
async fn metadata_push(&self, req: Payload) -> Result<()> {
self.socket.metadata_push(req).await
self.requester.metadata_push(req).await
}

async fn fire_and_forget(&self, req: Payload) -> Result<()> {
self.socket.fire_and_forget(req).await
self.requester.fire_and_forget(req).await
}

async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
self.socket.request_response(req).await
self.requester.request_response(req).await
}

fn request_stream(&self, req: Payload) -> Flux<Result<Payload>> {
self.socket.request_stream(req)
self.requester.request_stream(req)
}

fn request_channel(&self, reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
self.socket.request_channel(reqs)
self.requester.request_channel(reqs)
}
}
3 changes: 1 addition & 2 deletions rsocket/src/core/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where

// Init duplex socket.
let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::<Frame>();
let mut socket = DuplexSocket::new(0, snd_tx, splitter).await;
let mut socket = DuplexSocket::new(0, snd_tx, splitter);

// Begin loop for writing frames.
runtime::spawn(async move {
Expand Down Expand Up @@ -154,7 +154,6 @@ where
break;
}
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion rsocket/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ mod socket;
mod spi;

pub(crate) use fragmentation::{Joiner, Splitter, MIN_MTU};
pub(crate) use socket::DuplexSocket;
pub(crate) use socket::{ClientRequester,DuplexSocket};
pub use spi::*;
Loading

0 comments on commit ebbafca

Please sign in to comment.