diff --git a/sdk/src/binary/binary_client.rs b/sdk/src/binary/binary_client.rs index 8204ce679..1a8b5a014 100644 --- a/sdk/src/binary/binary_client.rs +++ b/sdk/src/binary/binary_client.rs @@ -1,5 +1,6 @@ use crate::client::Client; use crate::error::IggyError; +use crate::write_to::WriteTo; use async_trait::async_trait; use bytes::Bytes; @@ -16,11 +17,25 @@ pub enum ClientState { /// A client that can send and receive binary messages. #[async_trait] -pub trait BinaryClient: Client { +pub(crate) trait BinaryClient: Client { /// Gets the state of the client. async fn get_state(&self) -> ClientState; /// Sets the state of the client. async fn set_state(&self, state: ClientState); /// Sends a command and returns the response. - async fn send_with_response(&self, command: u32, payload: Bytes) -> Result; + async fn send_with_response( + &self, + command_code: u32, + payload: Bytes, + ) -> Result; + /// Sends a command and returns the response. + /// + /// No additional memory allocation is required, and the Command is serialized directly into the transmission stream. + async fn send_with_response_v2( + &self, + _command_code: u32, + _command: &(dyn WriteTo + Send + Sync), + ) -> Result { + unimplemented!("send_with_response_v2 is not implemented!") + } } diff --git a/sdk/src/binary/messages.rs b/sdk/src/binary/messages.rs index 890835e53..28c4a366c 100644 --- a/sdk/src/binary/messages.rs +++ b/sdk/src/binary/messages.rs @@ -20,7 +20,7 @@ impl MessageClient for B { async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> { fail_if_not_authenticated(self).await?; - self.send_with_response(SEND_MESSAGES_CODE, command.as_bytes()) + self.send_with_response_v2(SEND_MESSAGES_CODE, command) .await?; Ok(()) } diff --git a/sdk/src/bytes_serializable.rs b/sdk/src/bytes_serializable.rs index 2b10a6df8..fb52c3605 100644 --- a/sdk/src/bytes_serializable.rs +++ b/sdk/src/bytes_serializable.rs @@ -11,4 +11,9 @@ pub trait BytesSerializable { fn from_bytes(bytes: Bytes) -> Result where Self: Sized; + + /// Computes the size of the struct in bytes. + fn size(&self) -> usize { + unimplemented!("size") + } } diff --git a/sdk/src/identifier.rs b/sdk/src/identifier.rs index 380a6083e..954bc3b33 100644 --- a/sdk/src/identifier.rs +++ b/sdk/src/identifier.rs @@ -169,6 +169,10 @@ impl Identifier { } impl BytesSerializable for Identifier { + fn size(&self) -> usize { + 2 + self.length as usize + } + fn as_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(2 + self.length as usize); bytes.put_u8(self.kind.as_code()); diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 2347f76dd..420fc2d4f 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -31,3 +31,4 @@ pub mod topics; pub mod users; pub mod utils; pub mod validatable; +pub mod write_to; diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index e5de46d7b..5249c6e51 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -5,7 +5,10 @@ use crate::identifier::Identifier; use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE}; use crate::models::header; use crate::models::header::{HeaderKey, HeaderValue}; +use crate::tcp::client::ConnectionStream; use crate::validatable::Validatable; +use crate::write_to::WriteTo; +use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use serde_with::base64::Base64; @@ -293,6 +296,10 @@ impl Display for Message { } impl BytesSerializable for Partitioning { + fn size(&self) -> usize { + 2 + self.length as usize + } + fn as_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(2 + self.length as usize); bytes.put_u8(self.kind.as_code()); @@ -324,6 +331,25 @@ impl BytesSerializable for Partitioning { } } +#[async_trait] +impl WriteTo for Message { + async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError> { + stream.write(self.id.to_le_bytes().as_ref()).await?; + if let Some(headers) = &self.headers { + let headers_bytes = headers.as_bytes(); + stream + .write((headers_bytes.len() as u32).to_le_bytes().as_ref()) + .await?; + stream.write(&headers_bytes).await?; + } else { + stream.write(0u32.to_le_bytes().as_ref()).await?; + } + stream.write(self.length.to_le_bytes().as_ref()).await?; + stream.write(self.payload.as_ref()).await?; + Ok(()) + } +} + impl BytesSerializable for Message { fn as_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(self.get_size_bytes() as usize); @@ -397,7 +423,32 @@ impl FromStr for Message { } } +#[async_trait] +impl WriteTo for SendMessages { + async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError> { + stream.write(self.stream_id.as_bytes().as_ref()).await?; + stream.write(self.topic_id.as_bytes().as_ref()).await?; + stream.write(self.partitioning.as_bytes().as_ref()).await?; + for message in &self.messages { + message.write_to(stream).await?; + } + Ok(()) + } +} + impl BytesSerializable for SendMessages { + fn size(&self) -> usize { + let messages_size = self + .messages + .iter() + .map(Message::get_size_bytes) + .sum::(); + messages_size as usize + + self.partitioning.size() + + self.stream_id.size() + + self.topic_id.size() + } + fn as_bytes(&self) -> Bytes { let messages_size = self .messages diff --git a/sdk/src/tcp/client.rs b/sdk/src/tcp/client.rs index a82f2b123..43983a2d7 100644 --- a/sdk/src/tcp/client.rs +++ b/sdk/src/tcp/client.rs @@ -2,6 +2,7 @@ use crate::binary::binary_client::{BinaryClient, ClientState}; use crate::client::Client; use crate::error::{IggyError, IggyErrorDiscriminants}; use crate::tcp::config::TcpClientConfig; +use crate::write_to::WriteTo; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use std::fmt::Debug; @@ -36,7 +37,7 @@ unsafe impl Send for TcpClient {} unsafe impl Sync for TcpClient {} #[async_trait] -pub(crate) trait ConnectionStream: Debug + Sync + Send { +pub(crate) trait ConnectionStream: Debug + Sync + Send + Unpin { async fn read(&mut self, buf: &mut [u8]) -> Result; async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>; async fn flush(&mut self) -> Result<(), IggyError>; @@ -214,6 +215,41 @@ impl BinaryClient for TcpClient { *self.state.lock().await = state; } + async fn send_with_response_v2( + &self, + command_code: u32, + command: &(dyn WriteTo + Send + Sync), + ) -> Result { + if self.get_state().await == ClientState::Disconnected { + return Err(IggyError::NotConnected); + } + + let mut stream = self.stream.lock().await; + if let Some(stream) = stream.as_mut() { + let payload_length = command.size() + REQUEST_INITIAL_BYTES_LENGTH; + trace!("Sending a TCP request..."); + stream.write(&(payload_length as u32).to_le_bytes()).await?; + stream.write(&command_code.to_le_bytes()).await?; + command.write_to(stream.as_mut()).await?; + stream.flush().await?; + trace!("Sent a TCP request, waiting for a response..."); + + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await?; + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } + + let status = u32::from_le_bytes(response_buffer[..4].try_into().unwrap()); + let length = u32::from_le_bytes(response_buffer[4..].try_into().unwrap()); + return self.handle_response(status, length, stream.as_mut()).await; + } + + error!("Cannot send data. Client is not connected."); + Err(IggyError::NotConnected) + } + async fn send_with_response(&self, command: u32, payload: Bytes) -> Result { if self.get_state().await == ClientState::Disconnected { return Err(IggyError::NotConnected); diff --git a/sdk/src/write_to.rs b/sdk/src/write_to.rs new file mode 100644 index 000000000..8965b5210 --- /dev/null +++ b/sdk/src/write_to.rs @@ -0,0 +1,12 @@ +use async_trait::async_trait; + +use crate::{ + bytes_serializable::BytesSerializable, error::IggyError, tcp::client::ConnectionStream, +}; + +/// The trait for serializing a struct into a stream. +#[async_trait] +pub(crate) trait WriteTo: BytesSerializable { + /// Serialize the struct into the stream. + async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError>; +}