Skip to content

Commit ce39622

Browse files
committed
Add WriteTo trait and implement it for Message struct
1 parent 442fa84 commit ce39622

File tree

8 files changed

+128
-4
lines changed

8 files changed

+128
-4
lines changed

sdk/src/binary/binary_client.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::client::Client;
22
use crate::error::IggyError;
3+
use crate::write_to::WriteTo;
34
use async_trait::async_trait;
45
use bytes::Bytes;
56

@@ -16,11 +17,25 @@ pub enum ClientState {
1617

1718
/// A client that can send and receive binary messages.
1819
#[async_trait]
19-
pub trait BinaryClient: Client {
20+
pub(crate) trait BinaryClient: Client {
2021
/// Gets the state of the client.
2122
async fn get_state(&self) -> ClientState;
2223
/// Sets the state of the client.
2324
async fn set_state(&self, state: ClientState);
2425
/// Sends a command and returns the response.
25-
async fn send_with_response(&self, command: u32, payload: Bytes) -> Result<Bytes, IggyError>;
26+
async fn send_with_response(
27+
&self,
28+
command_code: u32,
29+
payload: Bytes,
30+
) -> Result<Bytes, IggyError>;
31+
/// Sends a command and returns the response.
32+
///
33+
/// No additional memory allocation is required, and the Command is serialized directly into the transmission stream.
34+
async fn send_with_response_v2(
35+
&self,
36+
_command_code: u32,
37+
_command: &(dyn WriteTo + Send + Sync),
38+
) -> Result<Bytes, IggyError> {
39+
unimplemented!("send_with_response_v2 is not implemented!")
40+
}
2641
}

sdk/src/binary/messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ impl<B: BinaryClient> MessageClient for B {
2020

2121
async fn send_messages(&self, command: &mut SendMessages) -> Result<(), IggyError> {
2222
fail_if_not_authenticated(self).await?;
23-
self.send_with_response(SEND_MESSAGES_CODE, command.as_bytes())
23+
self.send_with_response_v2(SEND_MESSAGES_CODE, command)
2424
.await?;
2525
Ok(())
2626
}

sdk/src/bytes_serializable.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,9 @@ pub trait BytesSerializable {
1111
fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
1212
where
1313
Self: Sized;
14+
15+
/// Computes the size of the struct in bytes.
16+
fn size(&self) -> usize {
17+
unimplemented!("size")
18+
}
1419
}

sdk/src/identifier.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ impl Identifier {
169169
}
170170

171171
impl BytesSerializable for Identifier {
172+
fn size(&self) -> usize {
173+
2 + self.length as usize
174+
}
175+
172176
fn as_bytes(&self) -> Bytes {
173177
let mut bytes = BytesMut::with_capacity(2 + self.length as usize);
174178
bytes.put_u8(self.kind.as_code());

sdk/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ pub mod topics;
3131
pub mod users;
3232
pub mod utils;
3333
pub mod validatable;
34+
pub mod write_to;

sdk/src/messages/send_messages.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use crate::identifier::Identifier;
55
use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE};
66
use crate::models::header;
77
use crate::models::header::{HeaderKey, HeaderValue};
8+
use crate::tcp::client::ConnectionStream;
89
use crate::validatable::Validatable;
10+
use crate::write_to::WriteTo;
11+
use async_trait::async_trait;
912
use bytes::{BufMut, Bytes, BytesMut};
1013
use serde::{Deserialize, Serialize};
1114
use serde_with::base64::Base64;
@@ -293,6 +296,10 @@ impl Display for Message {
293296
}
294297

295298
impl BytesSerializable for Partitioning {
299+
fn size(&self) -> usize {
300+
2 + self.length as usize
301+
}
302+
296303
fn as_bytes(&self) -> Bytes {
297304
let mut bytes = BytesMut::with_capacity(2 + self.length as usize);
298305
bytes.put_u8(self.kind.as_code());
@@ -324,6 +331,25 @@ impl BytesSerializable for Partitioning {
324331
}
325332
}
326333

334+
#[async_trait]
335+
impl WriteTo for Message {
336+
async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError> {
337+
stream.write(self.id.to_le_bytes().as_ref()).await?;
338+
if let Some(headers) = &self.headers {
339+
let headers_bytes = headers.as_bytes();
340+
stream
341+
.write((headers_bytes.len() as u32).to_le_bytes().as_ref())
342+
.await?;
343+
stream.write(&headers_bytes).await?;
344+
} else {
345+
stream.write(0u32.to_le_bytes().as_ref()).await?;
346+
}
347+
stream.write(self.length.to_le_bytes().as_ref()).await?;
348+
stream.write(self.payload.as_ref()).await?;
349+
Ok(())
350+
}
351+
}
352+
327353
impl BytesSerializable for Message {
328354
fn as_bytes(&self) -> Bytes {
329355
let mut bytes = BytesMut::with_capacity(self.get_size_bytes() as usize);
@@ -397,7 +423,32 @@ impl FromStr for Message {
397423
}
398424
}
399425

426+
#[async_trait]
427+
impl WriteTo for SendMessages {
428+
async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError> {
429+
stream.write(self.stream_id.as_bytes().as_ref()).await?;
430+
stream.write(self.topic_id.as_bytes().as_ref()).await?;
431+
stream.write(self.partitioning.as_bytes().as_ref()).await?;
432+
for message in &self.messages {
433+
message.write_to(stream).await?;
434+
}
435+
Ok(())
436+
}
437+
}
438+
400439
impl BytesSerializable for SendMessages {
440+
fn size(&self) -> usize {
441+
let messages_size = self
442+
.messages
443+
.iter()
444+
.map(Message::get_size_bytes)
445+
.sum::<u32>();
446+
messages_size as usize
447+
+ self.partitioning.size()
448+
+ self.stream_id.size()
449+
+ self.topic_id.size()
450+
}
451+
401452
fn as_bytes(&self) -> Bytes {
402453
let messages_size = self
403454
.messages

sdk/src/tcp/client.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::binary::binary_client::{BinaryClient, ClientState};
22
use crate::client::Client;
33
use crate::error::{IggyError, IggyErrorDiscriminants};
44
use crate::tcp::config::TcpClientConfig;
5+
use crate::write_to::WriteTo;
56
use async_trait::async_trait;
67
use bytes::{BufMut, Bytes, BytesMut};
78
use std::fmt::Debug;
@@ -36,7 +37,7 @@ unsafe impl Send for TcpClient {}
3637
unsafe impl Sync for TcpClient {}
3738

3839
#[async_trait]
39-
pub(crate) trait ConnectionStream: Debug + Sync + Send {
40+
pub(crate) trait ConnectionStream: Debug + Sync + Send + Unpin {
4041
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, IggyError>;
4142
async fn write(&mut self, buf: &[u8]) -> Result<(), IggyError>;
4243
async fn flush(&mut self) -> Result<(), IggyError>;
@@ -214,6 +215,41 @@ impl BinaryClient for TcpClient {
214215
*self.state.lock().await = state;
215216
}
216217

218+
async fn send_with_response_v2(
219+
&self,
220+
command_code: u32,
221+
command: &(dyn WriteTo + Send + Sync),
222+
) -> Result<Bytes, IggyError> {
223+
if self.get_state().await == ClientState::Disconnected {
224+
return Err(IggyError::NotConnected);
225+
}
226+
227+
let mut stream = self.stream.lock().await;
228+
if let Some(stream) = stream.as_mut() {
229+
let payload_length = command.size() + REQUEST_INITIAL_BYTES_LENGTH;
230+
trace!("Sending a TCP request...");
231+
stream.write(&(payload_length as u32).to_le_bytes()).await?;
232+
stream.write(&command_code.to_le_bytes()).await?;
233+
command.write_to(stream.as_mut()).await?;
234+
stream.flush().await?;
235+
trace!("Sent a TCP request, waiting for a response...");
236+
237+
let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH];
238+
let read_bytes = stream.read(&mut response_buffer).await?;
239+
if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH {
240+
error!("Received an invalid or empty response.");
241+
return Err(IggyError::EmptyResponse);
242+
}
243+
244+
let status = u32::from_le_bytes(response_buffer[..4].try_into().unwrap());
245+
let length = u32::from_le_bytes(response_buffer[4..].try_into().unwrap());
246+
return self.handle_response(status, length, stream.as_mut()).await;
247+
}
248+
249+
error!("Cannot send data. Client is not connected.");
250+
Err(IggyError::NotConnected)
251+
}
252+
217253
async fn send_with_response(&self, command: u32, payload: Bytes) -> Result<Bytes, IggyError> {
218254
if self.get_state().await == ClientState::Disconnected {
219255
return Err(IggyError::NotConnected);

sdk/src/write_to.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
use async_trait::async_trait;
2+
3+
use crate::{
4+
bytes_serializable::BytesSerializable, error::IggyError, tcp::client::ConnectionStream,
5+
};
6+
7+
/// The trait for serializing a struct into a stream.
8+
#[async_trait]
9+
pub(crate) trait WriteTo: BytesSerializable {
10+
/// Serialize the struct into the stream.
11+
async fn write_to(&self, stream: &mut dyn ConnectionStream) -> Result<(), IggyError>;
12+
}

0 commit comments

Comments
 (0)