From cb8add2e888a56538e9c872f891cc134702fc287 Mon Sep 17 00:00:00 2001 From: Thomas BESSOU Date: Mon, 28 Oct 2019 19:57:36 +0100 Subject: [PATCH] Keep the Channel in clients receivers & senders Channel contains the Arc so that ```rust let streaming_stuff = Client::new(init_channel()).some_call(); ``` does not die due to channel being dropped at the end of the line. Signed-off-by: Thomas Bessou --- src/call/client.rs | 50 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/src/call/client.rs b/src/call/client.rs index e9aec9e86..061e22564 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -129,7 +129,12 @@ impl Call { tag, ) }); - Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de())) + Ok(ClientUnaryReceiver::new( + call, + cq_f, + method.resp_de(), + channel, + )) } pub fn client_streaming( @@ -151,11 +156,12 @@ impl Call { }); let share_call = Arc::new(SpinLock::new(ShareCall::new(call, cq_f))); - let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser()); + let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser(), channel); let recv = ClientCStreamReceiver { call: share_call, resp_de: method.resp_de(), finished: false, + _channel_keepalive: channel.clone(), }; Ok((sink, recv)) } @@ -189,7 +195,12 @@ impl Call { grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag) }); - Ok(ClientSStreamReceiver::new(call, cq_f, method.resp_de())) + Ok(ClientSStreamReceiver::new( + call, + cq_f, + method.resp_de(), + channel, + )) } pub fn duplex_streaming( @@ -216,8 +227,8 @@ impl Call { }); let share_call = Arc::new(SpinLock::new(ShareCall::new(call, cq_f))); - let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser()); - let recv = ClientDuplexReceiver::new(share_call, method.resp_de()); + let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser(), channel); + let recv = ClientDuplexReceiver::new(share_call, method.resp_de(), channel); Ok((sink, recv)) } } @@ -230,14 +241,21 @@ pub struct ClientUnaryReceiver { call: Call, resp_f: BatchFuture, resp_de: DeserializeFn, + _channel_keepalive: Channel, } impl ClientUnaryReceiver { - fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn) -> ClientUnaryReceiver { + fn new( + call: Call, + resp_f: BatchFuture, + resp_de: DeserializeFn, + _channel_keepalive: &Channel, + ) -> ClientUnaryReceiver { ClientUnaryReceiver { call, resp_f, resp_de, + _channel_keepalive: _channel_keepalive.clone(), } } @@ -276,6 +294,7 @@ pub struct ClientCStreamReceiver { call: Arc>, resp_de: DeserializeFn, finished: bool, + _channel_keepalive: Channel, } impl ClientCStreamReceiver { @@ -326,15 +345,21 @@ pub struct StreamingCallSink { sink_base: SinkBase, close_f: Option, req_ser: SerializeFn, + _channel_keepalive: Channel, } impl StreamingCallSink { - fn new(call: Arc>, req_ser: SerializeFn) -> StreamingCallSink { + fn new( + call: Arc>, + req_ser: SerializeFn, + _channel_keepalive: &Channel, + ) -> StreamingCallSink { StreamingCallSink { call, sink_base: SinkBase::new(false), close_f: None, req_ser, + _channel_keepalive: _channel_keepalive.clone(), } } @@ -490,6 +515,7 @@ impl ResponseStreamImpl { #[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"] pub struct ClientSStreamReceiver { imp: ResponseStreamImpl, + _channel_keepalive: Channel, } impl ClientSStreamReceiver { @@ -497,10 +523,12 @@ impl ClientSStreamReceiver { call: Call, finish_f: BatchFuture, de: DeserializeFn, + _channel_keepalive: &Channel, ) -> ClientSStreamReceiver { let share_call = ShareCall::new(call, finish_f); ClientSStreamReceiver { imp: ResponseStreamImpl::new(share_call, de), + _channel_keepalive: _channel_keepalive.clone(), } } @@ -528,12 +556,18 @@ impl Stream for ClientSStreamReceiver { #[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"] pub struct ClientDuplexReceiver { imp: ResponseStreamImpl>, Resp>, + _channel_keepalive: Channel, } impl ClientDuplexReceiver { - fn new(call: Arc>, de: DeserializeFn) -> ClientDuplexReceiver { + fn new( + call: Arc>, + de: DeserializeFn, + _channel_keepalive: &Channel, + ) -> ClientDuplexReceiver { ClientDuplexReceiver { imp: ResponseStreamImpl::new(call, de), + _channel_keepalive: _channel_keepalive.clone(), } }