Skip to content

Commit 660aa4c

Browse files
authored
fix!: Rework embedded io implementations. (#86)
1 parent b91fe0c commit 660aa4c

File tree

6 files changed

+158
-77
lines changed

6 files changed

+158
-77
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ embedded-io-async = "0.7.0"
3232
[[example]]
3333
name = "async"
3434
path = "examples/local_async.rs"
35-
required-features = ["async"]
35+
required-features = ["embedded_io_async"]
3636
test = false
3737

3838

examples/local_async.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::process::exit;
44
use std::time::Duration;
55

66
use tokio_serial::SerialPortBuilderExt;
7-
use uf_crsf::CrsfParser;
7+
use uf_crsf::async_io::AsyncCrsfReader;
88

99
#[tokio::main]
1010
async fn main() {
@@ -33,12 +33,12 @@ async fn main() {
3333
exit(1);
3434
});
3535

36-
let mut adapted_port = FromTokio::new(&mut port);
37-
let mut parser = CrsfParser::new();
36+
let adapted_port = FromTokio::new(&mut port);
37+
let mut reader = AsyncCrsfReader::new(adapted_port);
3838
println!("Reading from serial port '{}'...", path);
3939

4040
loop {
41-
match parser.read_packet(&mut adapted_port).await {
41+
match reader.read_packet().await {
4242
Ok(packet) => {
4343
println!("{:?}", packet);
4444
}

src/async_io.rs

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,52 @@
11
use crate::error::CrsfStreamError;
22
use crate::packets::{write_packet_to_buffer, CrsfPacket, Packet, PacketAddress};
33
use crate::parser::CrsfParser;
4-
use embedded_io_async::{Error, Read, Write};
5-
6-
impl CrsfParser {
7-
/// Asynchronously reads a complete CRSF packet from an `embedded_io_async::Read` stream.
8-
///
9-
/// This function reads bytes in chunks from the provided `reader` and pushes them
10-
/// into the parser one byte at time.
11-
pub async fn read_packet<R: Read>(
12-
&mut self,
13-
reader: &mut R,
14-
) -> Result<Packet, CrsfStreamError> {
15-
let mut buf = [0; 64]; // 64 is max packet size for CRSF
4+
use embedded_io_async::{Error, Write};
5+
use heapless::Deque;
6+
7+
const ASYNC_IO_BUFFER_SIZE: usize = crate::constants::CRSF_MAX_PACKET_SIZE * 2;
8+
9+
pub struct AsyncCrsfReader<R> {
10+
parser: CrsfParser,
11+
reader: R,
12+
input_buffer: Deque<u8, ASYNC_IO_BUFFER_SIZE>,
13+
}
14+
15+
/// Asynchronously writes a CRSF packet to an `embedded_io_async::Write` stream.
16+
impl<R: embedded_io_async::Read> AsyncCrsfReader<R> {
17+
pub fn new(reader: R) -> Self {
18+
Self {
19+
parser: CrsfParser::new(),
20+
reader,
21+
input_buffer: Deque::new(),
22+
}
23+
}
24+
25+
pub async fn read_packet(&mut self) -> Result<Packet, CrsfStreamError> {
26+
let mut temp_read_buf = [0; crate::constants::CRSF_MAX_PACKET_SIZE];
27+
1628
loop {
17-
let n = reader
18-
.read(&mut buf)
29+
while let Some(byte) = self.input_buffer.pop_front() {
30+
match self.parser.push_byte(byte) {
31+
Ok(Some(packet)) => return Ok(packet),
32+
Ok(None) => (),
33+
Err(e) => return Err(e),
34+
}
35+
}
36+
let bytes_read = self
37+
.reader
38+
.read(&mut temp_read_buf)
1939
.await
2040
.map_err(|e| CrsfStreamError::Io(e.kind()))?;
21-
if n == 0 {
22-
// This indicates a stream has closed.
41+
42+
if bytes_read == 0 {
2343
return Err(CrsfStreamError::UnexpectedEof);
2444
}
2545

26-
for b in &buf[0..n] {
27-
match self.push_byte(*b) {
28-
Ok(Some(packet)) => return Ok(packet),
29-
Ok(None) => continue,
30-
Err(e) => return Err(e),
31-
}
46+
for byte in &temp_read_buf[..bytes_read] {
47+
self.input_buffer
48+
.push_back(*byte)
49+
.map_err(|_| CrsfStreamError::InputBufferTooSmall)?;
3250
}
3351
}
3452
}

src/blocking_io.rs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,49 @@ use crate::error::CrsfStreamError;
22
use crate::packets::{write_packet_to_buffer, CrsfPacket, Packet, PacketAddress};
33
use crate::parser::CrsfParser;
44
use embedded_io::{Error, Read, Write};
5+
use heapless::Deque;
6+
7+
const BLOCKING_IO_BUFFER_SIZE: usize = crate::constants::CRSF_MAX_PACKET_SIZE * 2;
8+
9+
pub struct BlockingCrsfReader<R> {
10+
parser: CrsfParser,
11+
reader: R,
12+
input_buffer: Deque<u8, BLOCKING_IO_BUFFER_SIZE>,
13+
}
14+
15+
impl<R: Read> BlockingCrsfReader<R> {
16+
pub fn new(reader: R) -> Self {
17+
Self {
18+
parser: CrsfParser::new(),
19+
reader,
20+
input_buffer: Deque::new(),
21+
}
22+
}
23+
24+
pub fn read_packet(&mut self) -> Result<Packet, CrsfStreamError> {
25+
let mut temp_read_buf = [0; crate::constants::CRSF_MAX_PACKET_SIZE];
526

6-
impl CrsfParser {
7-
/// Synchronously reads a complete CRSF packet from an `embedded_io::Read` stream.
8-
///
9-
/// This function reads bytes in chunks from the provided `reader` and pushes them
10-
/// into the parser one byte at time.
11-
pub fn read_packet_blocking<R: Read>(
12-
&mut self,
13-
reader: &mut R,
14-
) -> Result<Packet, CrsfStreamError> {
15-
let mut buf = [0; 64]; // 64 is max packet size for CRSF
1627
loop {
17-
let n = reader
18-
.read(&mut buf)
28+
while let Some(byte) = self.input_buffer.pop_front() {
29+
match self.parser.push_byte(byte) {
30+
Ok(Some(packet)) => return Ok(packet),
31+
Ok(None) => (),
32+
Err(e) => return Err(e),
33+
}
34+
}
35+
let bytes_read = self
36+
.reader
37+
.read(&mut temp_read_buf)
1938
.map_err(|e| CrsfStreamError::Io(e.kind()))?;
20-
if n == 0 {
21-
// This indicates a stream has closed.
39+
40+
if bytes_read == 0 {
2241
return Err(CrsfStreamError::UnexpectedEof);
2342
}
2443

25-
for b in &buf[0..n] {
26-
match self.push_byte(*b) {
27-
Ok(Some(packet)) => return Ok(packet),
28-
Ok(None) => continue,
29-
Err(e) => return Err(e),
30-
}
44+
for byte in &temp_read_buf[..bytes_read] {
45+
self.input_buffer
46+
.push_back(*byte)
47+
.map_err(|_| CrsfStreamError::InputBufferTooSmall)?;
3148
}
3249
}
3350
}
@@ -37,7 +54,7 @@ impl CrsfParser {
3754
///
3855
/// This function serializes the given `packet` into a temporary buffer and then
3956
/// writes the entire buffer to the `writer` synchronously.
40-
pub fn write_packet_blocking<W: Write, P: CrsfPacket>(
57+
pub fn write_packet<W: Write, P: CrsfPacket>(
4158
writer: &mut W,
4259
dest: PacketAddress,
4360
packet: &P,

tests/async_io_test.rs

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
#![cfg(test)]
33
extern crate std;
44

5-
use uf_crsf::async_io::write_packet;
5+
use uf_crsf::async_io::{write_packet, AsyncCrsfReader};
66
use uf_crsf::packets::{LinkStatistics, Packet, PacketAddress};
7-
use uf_crsf::parser::CrsfParser;
87
use uf_crsf::CrsfStreamError;
98

10-
async fn build_link_statistics_packet_bytes() -> std::vec::Vec<u8> {
9+
async fn build_link_statistics_packet_bytes(uplink_rssi_1: u8) -> std::vec::Vec<u8> {
1110
let packet = LinkStatistics {
12-
uplink_rssi_1: 10,
11+
uplink_rssi_1: uplink_rssi_1,
1312
uplink_rssi_2: 20,
1413
uplink_link_quality: 95,
1514
uplink_snr: -80,
@@ -29,14 +28,14 @@ async fn build_link_statistics_packet_bytes() -> std::vec::Vec<u8> {
2928

3029
#[tokio::test]
3130
async fn test_write_and_read_packet_async() {
32-
let packet_bytes = build_link_statistics_packet_bytes().await;
31+
let packet_bytes = build_link_statistics_packet_bytes(10).await;
3332

3433
// Mock reader
35-
let mut reader = &packet_bytes[..];
34+
let reader = &packet_bytes[..];
3635

3736
// Parser
38-
let mut parser = CrsfParser::new();
39-
let result = parser.read_packet(&mut reader).await;
37+
let mut reader = AsyncCrsfReader::new(reader);
38+
let result = reader.read_packet().await;
4039

4140
assert!(result.is_ok());
4241
let parsed_packet = result.unwrap();
@@ -58,27 +57,75 @@ async fn test_write_and_read_packet_async() {
5857

5958
#[tokio::test]
6059
async fn test_read_packet_async_with_no_data() {
61-
let mut reader = &[][..];
62-
let mut parser = CrsfParser::new();
63-
let result = parser.read_packet(&mut reader).await;
60+
let mut reader = AsyncCrsfReader::new(&[][..]);
61+
let result = reader.read_packet().await;
6462
assert!(matches!(result, Err(CrsfStreamError::UnexpectedEof)));
6563
}
6664

6765
#[tokio::test]
6866
async fn test_read_packet_async_with_incomplete_data() {
69-
let packet_bytes = build_link_statistics_packet_bytes().await;
70-
let mut reader = &packet_bytes[..packet_bytes.len() - 1];
71-
let mut parser = CrsfParser::new();
72-
let result = parser.read_packet(&mut reader).await;
67+
let packet_bytes = build_link_statistics_packet_bytes(10).await;
68+
let mut reader = AsyncCrsfReader::new(&packet_bytes[..packet_bytes.len() - 1]);
69+
let result = reader.read_packet().await;
7370
assert!(matches!(result, Err(CrsfStreamError::UnexpectedEof)));
7471
}
7572

7673
#[tokio::test]
7774
async fn test_read_packet_async_with_garbage() {
7875
let garbage = [0x01, 0x02, 0x03];
79-
let mut reader = &garbage[..];
80-
let mut parser = CrsfParser::new();
81-
let result = parser.read_packet(&mut reader).await;
76+
let mut reader = AsyncCrsfReader::new(&garbage[..]);
77+
let result = reader.read_packet().await;
8278
// We expect an InvalidSync error because the first byte is not a valid sync byte.
8379
assert!(matches!(result, Err(CrsfStreamError::InvalidSync(_))));
8480
}
81+
82+
#[tokio::test]
83+
async fn test_read_packet_async_chunked_stream() {
84+
let packet1_bytes = build_link_statistics_packet_bytes(10).await;
85+
let packet2_bytes = build_link_statistics_packet_bytes(50).await;
86+
87+
let mut combined_bytes = std::vec::Vec::new();
88+
combined_bytes.extend_from_slice(&packet1_bytes);
89+
combined_bytes.extend_from_slice(&packet2_bytes);
90+
91+
let mut stream_reader = AsyncCrsfReader::new(&combined_bytes[..]);
92+
93+
let result1 = stream_reader.read_packet().await;
94+
let result2 = stream_reader.read_packet().await;
95+
96+
assert!(result1.is_ok());
97+
let parsed_packet1 = result1.unwrap();
98+
let expected_packet1 = LinkStatistics {
99+
uplink_rssi_1: 10,
100+
uplink_rssi_2: 20,
101+
uplink_link_quality: 95,
102+
uplink_snr: -80,
103+
active_antenna: 1,
104+
rf_mode: 2,
105+
uplink_tx_power: 3,
106+
downlink_rssi: 30,
107+
downlink_link_quality: 98,
108+
downlink_snr: -75,
109+
};
110+
assert!(matches!(&parsed_packet1, Packet::LinkStatistics(p) if p == &expected_packet1));
111+
112+
assert!(
113+
result2.is_ok(),
114+
"Second packet parsing failed: {:?}",
115+
result2.err()
116+
);
117+
let parsed_packet2 = result2.unwrap();
118+
let expected_packet2 = LinkStatistics {
119+
uplink_rssi_1: 50,
120+
uplink_rssi_2: 20,
121+
uplink_link_quality: 95,
122+
uplink_snr: -80,
123+
active_antenna: 1,
124+
rf_mode: 2,
125+
uplink_tx_power: 3,
126+
downlink_rssi: 30,
127+
downlink_link_quality: 98,
128+
downlink_snr: -75,
129+
};
130+
assert!(matches!(parsed_packet2, Packet::LinkStatistics(p) if p == expected_packet2));
131+
}

tests/blocking_io_test.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
#![cfg(test)]
33
extern crate std;
44

5-
use uf_crsf::blocking_io::write_packet_blocking;
5+
use uf_crsf::blocking_io::{write_packet, BlockingCrsfReader};
66
use uf_crsf::packets::{LinkStatistics, Packet, PacketAddress};
7-
use uf_crsf::parser::CrsfParser;
87
use uf_crsf::CrsfStreamError;
98

109
fn build_link_statistics_packet_bytes() -> std::vec::Vec<u8> {
@@ -21,7 +20,7 @@ fn build_link_statistics_packet_bytes() -> std::vec::Vec<u8> {
2120
downlink_snr: -75,
2221
};
2322
let mut buffer = std::vec::Vec::new();
24-
write_packet_blocking(&mut buffer, PacketAddress::FlightController, &packet).unwrap();
23+
write_packet(&mut buffer, PacketAddress::FlightController, &packet).unwrap();
2524
buffer
2625
}
2726

@@ -32,8 +31,8 @@ fn test_write_and_read_packet_blocking() {
3231
// Mock reader
3332
let mut reader = &packet_bytes[..];
3433

35-
let mut parser = CrsfParser::new();
36-
let result = parser.read_packet_blocking(&mut reader);
34+
let mut crsf_reader = BlockingCrsfReader::new(&mut reader);
35+
let result = crsf_reader.read_packet();
3736

3837
let parsed_packet = result.unwrap();
3938

@@ -55,26 +54,26 @@ fn test_write_and_read_packet_blocking() {
5554
#[test]
5655
fn test_read_packet_blocking_with_no_data() {
5756
let mut reader = &[][..];
58-
let mut parser = CrsfParser::new();
59-
let result = parser.read_packet_blocking(&mut reader);
57+
let mut crsf_reader = BlockingCrsfReader::new(&mut reader);
58+
let result = crsf_reader.read_packet();
6059
assert!(matches!(result, Err(CrsfStreamError::UnexpectedEof)));
6160
}
6261

6362
#[test]
6463
fn test_read_packet_blocking_with_incomplete_data() {
6564
let packet_bytes = build_link_statistics_packet_bytes();
6665
let mut reader = &packet_bytes[..packet_bytes.len() - 1];
67-
let mut parser = CrsfParser::new();
68-
let result = parser.read_packet_blocking(&mut reader);
66+
let mut crsf_reader = BlockingCrsfReader::new(&mut reader);
67+
let result = crsf_reader.read_packet();
6968
assert!(matches!(result, Err(CrsfStreamError::UnexpectedEof)));
7069
}
7170

7271
#[test]
7372
fn test_read_packet_blocking_with_garbage() {
7473
let garbage = [0x01, 0x02, 0x03];
7574
let mut reader = &garbage[..];
76-
let mut parser = CrsfParser::new();
77-
let result = parser.read_packet_blocking(&mut reader);
75+
let mut crsf_reader = BlockingCrsfReader::new(&mut reader);
76+
let result = crsf_reader.read_packet();
7877
// We expect an InvalidSync error because the first byte is not a valid sync byte.
7978
assert!(matches!(result, Err(CrsfStreamError::InvalidSync(_))));
8079
}

0 commit comments

Comments
 (0)