Skip to content

Commit

Permalink
reduce total count of write states
Browse files Browse the repository at this point in the history
  • Loading branch information
Xaeroxe committed Feb 7, 2023
1 parent 14a1e7c commit 6b522bd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 103 deletions.
4 changes: 1 addition & 3 deletions src/duplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ impl<RW: AsyncRead + AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin
}
}
match self.write_state {
AsyncWriteState::WritingLen { .. } | AsyncWriteState::WritingValue { .. } => {
self.write_buffer.shrink_to_fit()
}
AsyncWriteState::WritingValue { .. } => self.write_buffer.shrink_to_fit(),
_ => {
self.write_buffer = Vec::new();
}
Expand Down
131 changes: 31 additions & 100 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,10 @@ pub struct AsyncWriteTyped<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwne

#[derive(Debug)]
pub(crate) enum AsyncWriteState {
WritingVersion {
version: [u8; 8],
len_sent: usize,
},
WritingVersion { version: [u8; 8], len_sent: usize },
WritingChecksumEnabled,
Idle,
WritingLen {
current_len: [u8; 9],
len_to_be_sent: usize,
len_sent: usize,
},
WritingValue {
bytes_sent: usize,
},
WritingChecksum {
checksum: [u8; 8],
len_sent: usize,
},
WritingValue { bytes_sent: usize },
Closing,
Closed,
}
Expand Down Expand Up @@ -163,55 +149,38 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
AsyncWriteState::Idle => {
if let Some(item) = primed_values.pop_back() {
write_buffer.clear();
crate::bincode_options(size_limit)
.serialize_into(&mut *write_buffer, &item)
let length = crate::bincode_options(size_limit)
.serialized_size(&item)
.map_err(Error::Bincode)?;
if write_buffer.len() as u64 > size_limit {
if length > size_limit {
return Poll::Ready(Err(Error::SentMessageTooLarge));
}
let (new_current_len, to_be_sent) = if write_buffer.is_empty() {
([ZST_MARKER, 0, 0, 0, 0, 0, 0, 0, 0], 1)
} else if write_buffer.len() < U16_MARKER as usize {
let bytes = (write_buffer.len() as u8).to_le_bytes();
([bytes[0], 0, 0, 0, 0, 0, 0, 0, 0], 1)
} else if (write_buffer.len() as u64) < 2_u64.pow(16) {
let bytes = (write_buffer.len() as u16).to_le_bytes();
([U16_MARKER, bytes[0], bytes[1], 0, 0, 0, 0, 0, 0], 3)
} else if (write_buffer.len() as u64) < 2_u64.pow(32) {
let bytes = (write_buffer.len() as u32).to_le_bytes();
(
[
U32_MARKER, bytes[0], bytes[1], bytes[2], bytes[3], 0, 0, 0, 0,
],
5,
)
if length == 0 {
write_buffer.push(ZST_MARKER);
} else if length < U16_MARKER as u64 {
write_buffer.extend((length as u8).to_le_bytes());
} else if length < 2_u64.pow(16) {
write_buffer.push(U16_MARKER);
write_buffer.extend((length as u16).to_le_bytes());
} else if length < 2_u64.pow(32) {
write_buffer.push(U32_MARKER);
write_buffer.extend((length as u32).to_le_bytes());
} else {
let bytes = (write_buffer.len() as u64).to_le_bytes();
(
[
U64_MARKER, bytes[0], bytes[1], bytes[2], bytes[3], bytes[4],
bytes[5], bytes[6], bytes[7],
],
9,
)
};
*state = AsyncWriteState::WritingLen {
current_len: new_current_len,
len_to_be_sent: to_be_sent,
len_sent: 0,
};
let len = futures_core::ready!(
Pin::new(&mut *raw).poll_write(cx, &new_current_len[0..to_be_sent])
)?;
*state = if len == to_be_sent {
AsyncWriteState::WritingValue { bytes_sent: 0 }
} else {
AsyncWriteState::WritingLen {
current_len: new_current_len,
len_to_be_sent: to_be_sent,
len_sent: len,
}
};
write_buffer.push(U64_MARKER);
write_buffer.extend(length.to_le_bytes());
}
// Save the length... of the length value.
let length_length = write_buffer.len();
crate::bincode_options(size_limit)
.serialize_into(&mut *write_buffer, &item)
.map_err(Error::Bincode)?;
if checksum_enabled {
let mut hasher = SipHasher::new();
hasher.write(&write_buffer[length_length..]);
let checksum = hasher.finish();
write_buffer.extend(checksum.to_le_bytes());
}
*state = AsyncWriteState::WritingValue { bytes_sent: 0 };
continue;
} else if closing {
*state = AsyncWriteState::Closing;
Expand All @@ -220,49 +189,13 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
Poll::Ready(Ok(Some(())))
}
}
AsyncWriteState::WritingLen {
ref current_len,
ref len_to_be_sent,
ref mut len_sent,
} => {
while *len_sent < *len_to_be_sent {
let len = futures_core::ready!(Pin::new(&mut *raw)
.poll_write(cx, &current_len[(*len_sent)..(*len_to_be_sent)]))?;
*len_sent += len;
}
*state = AsyncWriteState::WritingValue { bytes_sent: 0 };
continue;
}
AsyncWriteState::WritingValue { bytes_sent } => {
while *bytes_sent < write_buffer.len() {
let len = futures_core::ready!(
Pin::new(&mut *raw).poll_write(cx, &write_buffer[*bytes_sent..])
)?;
*bytes_sent += len;
}
if checksum_enabled {
let mut hasher = SipHasher::new();
hasher.write(write_buffer);
let checksum = hasher.finish();
*state = AsyncWriteState::WritingChecksum {
checksum: checksum.to_le_bytes(),
len_sent: 0,
};
} else {
*state = AsyncWriteState::Idle;
if primed_values.is_empty() {
return Poll::Ready(Ok(Some(())));
}
}
continue;
}
AsyncWriteState::WritingChecksum { checksum, len_sent } => {
while *len_sent < size_of::<u64>() {
let len = futures_core::ready!(
Pin::new(&mut *raw).poll_write(cx, &checksum[*len_sent..])
)?;
*len_sent += len;
}
*state = AsyncWriteState::Idle;
if primed_values.is_empty() {
return Poll::Ready(Ok(Some(())));
Expand Down Expand Up @@ -330,9 +263,7 @@ impl<W: AsyncWrite + Unpin, T: Serialize + DeserializeOwned + Unpin> AsyncWriteT
/// needs to grow.
pub fn optimize_memory_usage(&mut self) {
match self.state {
AsyncWriteState::WritingLen { .. } | AsyncWriteState::WritingValue { .. } => {
self.write_buffer.shrink_to_fit()
}
AsyncWriteState::WritingValue { .. } => self.write_buffer.shrink_to_fit(),
_ => {
self.write_buffer = Vec::new();
}
Expand Down

0 comments on commit 6b522bd

Please sign in to comment.