Skip to content

Commit

Permalink
Use iggy byte size where appropriate (#1334)
Browse files Browse the repository at this point in the history
- introduce `IggyByteSize` wherever a number of bytes is represented
- move `Sizeable`, along with its implementations for a few types, to the SDK
- unfortunately, I had to keep a separate sizeable trait (`LocalSizeable`) in the server because of the generic instance for `Deref<Target = RetainedMessage>` for `SmartCache`
  • Loading branch information
BenFradet authored Nov 14, 2024
1 parent 6aa4335 commit eebc6a4
Show file tree
Hide file tree
Showing 72 changed files with 509 additions and 322 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions bench/src/benchmark_result.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::args::simple::BenchmarkKind;
use colored::Colorize;
use iggy::utils::byte_size::IggyByteSize;
use std::collections::HashSet;
use std::{
fmt::{Display, Formatter},
Expand All @@ -14,7 +15,7 @@ pub struct BenchmarkResult {
pub end_timestamp: Instant,
pub average_latency: Duration,
pub latency_percentiles: LatencyPercentiles,
pub total_size_bytes: u64,
pub total_size_bytes: IggyByteSize,
pub total_messages: u64,
}

Expand Down Expand Up @@ -80,7 +81,7 @@ impl BenchmarkResults {
.iter()
.filter(&mut predicate)
.map(|r| r.total_size_bytes)
.sum::<u64>();
.sum::<IggyByteSize>();
let total_duration = (self
.results
.iter()
Expand Down Expand Up @@ -149,9 +150,11 @@ impl BenchmarkResults {
/ self.results.len() as u32)
.as_secs_f64()
* 1000.0;
let average_throughput =
total_size_bytes as f64 / total_duration / 1e6 / self.results.len() as f64;
let total_throughput = total_size_bytes as f64 / total_duration / 1e6;
let average_throughput = total_size_bytes.as_bytes_u64() as f64
/ total_duration
/ 1e6
/ self.results.len() as f64;
let total_throughput = total_size_bytes.as_bytes_u64() as f64 / total_duration / 1e6;
let messages_per_second = total_messages as f64 / total_duration;

BenchmarkStatistics {
Expand Down
39 changes: 21 additions & 18 deletions bench/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use iggy::clients::client::IggyClient;
use iggy::consumer::Consumer as IggyConsumer;
use iggy::error::IggyError;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
use iggy::utils::sizeable::Sizeable;
use integration::test_server::{login_root, ClientFactory};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -73,7 +75,7 @@ impl Consumer {
};

let mut latencies: Vec<Duration> = Vec::with_capacity(self.message_batches as usize);
let mut total_size_bytes = 0;
let mut total_size_bytes = IggyByteSize::default();
let mut current_iteration: u64 = 0;
let mut received_messages = 0;
let mut topic_not_found_counter = 0;
Expand Down Expand Up @@ -187,7 +189,7 @@ impl Consumer {
latencies.push(latency_end);
received_messages += polled_messages.messages.len() as u64;
for message in polled_messages.messages {
total_size_bytes += message.get_size_bytes() as u64;
total_size_bytes += message.get_size_bytes();
}
current_iteration += 1;
}
Expand All @@ -210,24 +212,25 @@ impl Consumer {

let duration = end_timestamp - start_timestamp;
let average_latency: Duration = latencies.iter().sum::<Duration>() / latencies.len() as u32;
let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6;
let average_throughput =
total_size_bytes.as_bytes_u64() as f64 / duration.as_secs_f64() / 1e6;

info!(
"Consumer #{} → polled {} messages {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
self.consumer_id,
total_messages,
self.message_batches,
self.messages_per_batch,
duration.as_secs_f64(),
total_size_bytes,
average_throughput,
p50.as_secs_f64() * 1000.0,
p90.as_secs_f64() * 1000.0,
p95.as_secs_f64() * 1000.0,
p99.as_secs_f64() * 1000.0,
p999.as_secs_f64() * 1000.0,
average_latency.as_secs_f64() * 1000.0
);
"Consumer #{} → polled {} messages {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
self.consumer_id,
total_messages,
self.message_batches,
self.messages_per_batch,
duration.as_secs_f64(),
total_size_bytes.as_human_string(),
average_throughput,
p50.as_secs_f64() * 1000.0,
p90.as_secs_f64() * 1000.0,
p95.as_secs_f64() * 1000.0,
p99.as_secs_f64() * 1000.0,
p999.as_secs_f64() * 1000.0,
average_latency.as_secs_f64() * 1000.0
);

Ok(BenchmarkResult {
kind: BenchmarkKind::Poll,
Expand Down
36 changes: 19 additions & 17 deletions bench/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use iggy::client::MessageClient;
use iggy::clients::client::IggyClient;
use iggy::error::IggyError;
use iggy::messages::send_messages::{Message, Partitioning};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::duration::IggyDuration;
use integration::test_server::{login_root, ClientFactory};
use std::str::FromStr;
Expand Down Expand Up @@ -117,25 +118,26 @@ impl Producer {

let duration = end_timestamp - start_timestamp;
let average_latency: Duration = latencies.iter().sum::<Duration>() / latencies.len() as u32;
let total_size_bytes = total_messages * self.message_size as u64;
let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6;
let total_size_bytes = IggyByteSize::from(total_messages * self.message_size as u64);
let average_throughput =
total_size_bytes.as_bytes_u64() as f64 / duration.as_secs_f64() / 1e6;

info!(
"Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
self.producer_id,
total_messages,
self.message_batches,
self.messages_per_batch,
duration.as_secs_f64(),
total_size_bytes,
average_throughput,
p50.as_secs_f64() * 1000.0,
p90.as_secs_f64() * 1000.0,
p95.as_secs_f64() * 1000.0,
p99.as_secs_f64() * 1000.0,
p999.as_secs_f64() * 1000.0,
average_latency.as_secs_f64() * 1000.0
);
"Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
self.producer_id,
total_messages,
self.message_batches,
self.messages_per_batch,
duration.as_secs_f64(),
total_size_bytes.as_human_string(),
average_throughput,
p50.as_secs_f64() * 1000.0,
p90.as_secs_f64() * 1000.0,
p95.as_secs_f64() * 1000.0,
p99.as_secs_f64() * 1000.0,
p999.as_secs_f64() * 1000.0,
average_latency.as_secs_f64() * 1000.0
);

Ok(BenchmarkResult {
kind: BenchmarkKind::Send,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl IggyCmdTestCase for TestMessagePollToFileCmd<'_> {
self.topic_name, self.stream_name, self.topic_name, self.stream_name);
let message_file = format!("Storing messages to {} binary file", self.output_file);
let message_count = format!(
"Stored {} of total size [0-9]+ B to {} binary file",
"Stored {} of total size [0-9.]+ K?B to {} binary file",
match self.message_count {
1 => "1 message".into(),
_ => format!("{} messages", self.message_count),
Expand Down
16 changes: 12 additions & 4 deletions integration/tests/streaming/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::messages::send_messages::Message;
use iggy::models::header::{HeaderKey, HeaderValue};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::sizeable::Sizeable;
use iggy::utils::timestamp::IggyTimestamp;
use server::configs::system::{PartitionConfig, SystemConfig};
use server::state::system::PartitionState;
Expand Down Expand Up @@ -103,14 +105,17 @@ async fn should_persist_messages_and_then_load_them_by_timestamp() {
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
let appendable_batch_info = AppendableBatchInfo::new(
messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(),
messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>(),
partition.partition_id,
);
let appendable_batch_info_two = AppendableBatchInfo::new(
messages_two
.iter()
.map(|msg| msg.get_size_bytes() as u64)
.sum(),
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>(),
partition.partition_id,
);
partition
Expand Down Expand Up @@ -218,7 +223,10 @@ async fn should_persist_messages_and_then_load_them_from_disk() {
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
let appendable_batch_info = AppendableBatchInfo::new(
messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(),
messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>(),
partition.partition_id,
);
partition
Expand Down
7 changes: 6 additions & 1 deletion integration/tests/streaming/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::streaming::common::test_setup::TestSetup;
use crate::streaming::create_messages;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::sizeable::Sizeable;
use iggy::utils::timestamp::IggyTimestamp;
use server::state::system::PartitionState;
use server::streaming::batching::appendable_batch_info::AppendableBatchInfo;
Expand Down Expand Up @@ -176,7 +178,10 @@ async fn should_purge_existing_partition_on_disk() {
let messages = create_messages();
let messages_count = messages.len();
let appendable_batch_info = AppendableBatchInfo::new(
messages.iter().map(|msg| msg.get_size_bytes() as u64).sum(),
messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>(),
partition.partition_id,
);
partition
Expand Down
15 changes: 8 additions & 7 deletions integration/tests/streaming/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::models::messages::{MessageState, PolledMessage};
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::{checksum, timestamp::IggyTimestamp};
use server::streaming::local_sizeable::LocalSizeable;
use server::streaming::models::messages::RetainedMessage;
use server::streaming::segments::segment;
use server::streaming::segments::segment::{INDEX_EXTENSION, LOG_EXTENSION};
use server::streaming::sizeable::Sizeable;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::fs;
Expand Down Expand Up @@ -151,7 +152,7 @@ async fn should_persist_and_load_segment_with_messages() {
.await;
let messages_count = 10;
let mut messages = Vec::new();
let mut batch_size = 0u64;
let mut batch_size = IggyByteSize::default();
for i in 0..messages_count {
let message = create_message(i, "test", IggyTimestamp::now());

Expand All @@ -164,7 +165,7 @@ async fn should_persist_and_load_segment_with_messages() {
headers: message.headers.map(|headers| headers.to_bytes()),
payload: message.payload.clone(),
});
batch_size += retained_message.get_size_bytes() as u64;
batch_size += retained_message.get_size_bytes();
messages.push(retained_message);
}

Expand Down Expand Up @@ -235,7 +236,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
let messages_count = 10;
let now = IggyTimestamp::now();
let mut expired_timestamp = (now.as_micros() - 2 * message_expiry_ms).into();
let mut batch_size = 0u64;
let mut batch_size = IggyByteSize::default();
let mut messages = Vec::new();
for i in 0..messages_count {
let message = create_message(i, "test", expired_timestamp);
Expand All @@ -250,7 +251,7 @@ async fn given_all_expired_messages_segment_should_be_expired() {
headers: message.headers.map(|headers| headers.to_bytes()),
payload: message.payload.clone(),
});
batch_size += retained_message.get_size_bytes() as u64;
batch_size += retained_message.get_size_bytes();
messages.push(retained_message);
}
segment
Expand Down Expand Up @@ -316,7 +317,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
payload: expired_message.payload.clone(),
});
let mut expired_messages = Vec::new();
let expired_message_size = expired_retained_message.get_size_bytes() as u64;
let expired_message_size = expired_retained_message.get_size_bytes();
expired_messages.push(expired_retained_message);

let mut not_expired_messages = Vec::new();
Expand All @@ -331,7 +332,7 @@ async fn given_at_least_one_not_expired_message_segment_should_not_be_expired()
.map(|headers| headers.to_bytes()),
payload: not_expired_message.payload.clone(),
});
let not_expired_message_size = not_expired_retained_message.get_size_bytes() as u64;
let not_expired_message_size = not_expired_retained_message.get_size_bytes();
not_expired_messages.push(not_expired_retained_message);

segment
Expand Down
7 changes: 6 additions & 1 deletion integration/tests/streaming/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::streaming::create_messages;
use iggy::identifier::Identifier;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::send_messages::Partitioning;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::sizeable::Sizeable;
use iggy::utils::timestamp::IggyTimestamp;
use iggy::utils::topic_size::MaxTopicSize;
use server::state::system::StreamState;
Expand Down Expand Up @@ -127,7 +129,10 @@ async fn should_purge_existing_stream_on_disk() {
let topic = stream
.get_topic(&Identifier::numeric(topic_id).unwrap())
.unwrap();
let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum();
let batch_size = messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>();
topic
.append_messages(batch_size, Partitioning::partition_id(1), messages)
.await
Expand Down
7 changes: 6 additions & 1 deletion integration/tests/streaming/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use crate::streaming::create_messages;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::messages::poll_messages::PollingStrategy;
use iggy::messages::send_messages::Partitioning;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::expiry::IggyExpiry;
use iggy::utils::sizeable::Sizeable;
use iggy::utils::timestamp::IggyTimestamp;
use iggy::utils::topic_size::MaxTopicSize;
use server::state::system::{PartitionState, TopicState};
Expand Down Expand Up @@ -203,7 +205,10 @@ async fn should_purge_existing_topic_on_disk() {

let messages = create_messages();
let messages_count = messages.len();
let batch_size = messages.iter().map(|msg| msg.get_size_bytes() as u64).sum();
let batch_size = messages
.iter()
.map(|msg| msg.get_size_bytes())
.sum::<IggyByteSize>();
topic
.append_messages(batch_size, Partitioning::partition_id(1), messages)
.await
Expand Down
Loading

0 comments on commit eebc6a4

Please sign in to comment.