Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Circular Reference issues #65

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions rsocket/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use crate::payload::{Payload, SetupPayload, SetupPayloadBuilder};
use crate::runtime;
use crate::spi::{ClientResponder, Flux, RSocket};
use crate::transport::{
self, Connection, DuplexSocket, FrameSink, FrameStream, Splitter, Transport,
self, Connection, DuplexSocket, FrameSink, FrameStream, ClientRequester, Splitter, Transport,
};
use crate::Result;

#[derive(Clone)]
pub struct Client {
closed: Arc<Notify>,
socket: DuplexSocket,
requester: ClientRequester,
closing: mpsc::Sender<()>,
}

Expand Down Expand Up @@ -130,9 +130,9 @@ where

let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::<Frame>();
let cloned_snd_tx = snd_tx.clone();
let mut socket = DuplexSocket::new(1, snd_tx, splitter).await;
let mut socket = DuplexSocket::new(1, snd_tx, splitter);

let mut cloned_socket = socket.clone();
let requester = socket.client_requester();

if let Some(f) = self.responder {
let responder = f();
Expand Down Expand Up @@ -211,10 +211,13 @@ where
}
});


socket.setup(setup).await?;

// process frames
runtime::spawn(async move {
while let Some(next) = read_rx.recv().await {
if let Err(e) = cloned_socket.dispatch(next, None).await {
if let Err(e) = socket.dispatch(next, None).await {
error!("dispatch frame failed: {}", e);
break;
}
Expand All @@ -237,16 +240,14 @@ where
}
});

socket.setup(setup).await?;

Ok(Client::new(socket, close_notify, closing))
Ok(Client::new(requester, close_notify, closing))
}
}

impl Client {
fn new(socket: DuplexSocket, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
fn new(requester: ClientRequester, closed: Arc<Notify>, closing: mpsc::Sender<()>) -> Client {
Client {
socket,
requester,
closed,
closing,
}
Expand All @@ -260,22 +261,22 @@ impl Client {
#[async_trait]
impl RSocket for Client {
async fn metadata_push(&self, req: Payload) -> Result<()> {
self.socket.metadata_push(req).await
self.requester.metadata_push(req).await
}

async fn fire_and_forget(&self, req: Payload) -> Result<()> {
self.socket.fire_and_forget(req).await
self.requester.fire_and_forget(req).await
}

async fn request_response(&self, req: Payload) -> Result<Option<Payload>> {
self.socket.request_response(req).await
self.requester.request_response(req).await
}

fn request_stream(&self, req: Payload) -> Flux<Result<Payload>> {
self.socket.request_stream(req)
self.requester.request_stream(req)
}

fn request_channel(&self, reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> {
self.socket.request_channel(reqs)
self.requester.request_channel(reqs)
}
}
3 changes: 1 addition & 2 deletions rsocket/src/core/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where

// Init duplex socket.
let (snd_tx, mut snd_rx) = mpsc::unbounded_channel::<Frame>();
let mut socket = DuplexSocket::new(0, snd_tx, splitter).await;
let mut socket = DuplexSocket::new(0, snd_tx, splitter);

// Begin loop for writing frames.
runtime::spawn(async move {
Expand Down Expand Up @@ -154,7 +154,6 @@ where
break;
}
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion rsocket/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ mod socket;
mod spi;

pub(crate) use fragmentation::{Joiner, Splitter, MIN_MTU};
pub(crate) use socket::DuplexSocket;
pub(crate) use socket::{ClientRequester,DuplexSocket};
pub use spi::*;
Loading
Loading