Skip to content

Commit 6a3b790

Browse files
committed
Message length predicting parser
1 parent acd0a71 commit 6a3b790

File tree

5 files changed

+437
-1
lines changed

5 files changed

+437
-1
lines changed

rmp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "rmp"
33
version = "0.8.14"
4-
authors = ["Evgeny Safronov <[email protected]>"]
4+
authors = ["Evgeny Safronov <[email protected]>", "Kornel <[email protected]>"]
55
license = "MIT"
66
description = "Pure Rust MessagePack serialization implementation"
77
repository = "https://github.com/3Hren/msgpack-rust"

rmp/src/decode/est.rs

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
use std::num::{NonZeroU32, NonZeroUsize};
2+
use crate::Marker;
3+
4+
/// Incremental MessagePack parser that can parse incomplete messages,
5+
/// and report their estimated total length.
///
/// The parser keeps just enough state between calls to resume where the
/// previous fragment ended; it never stores the message bytes themselves.
6+
pub struct MessageLen {
7+
/// The operation that was interrupted and must be resumed by the next call.
/// `None` once a complete message has been parsed.
8+
wip: Option<WIP>,
9+
/// Largest lower bound found so far for the total message length in bytes.
/// This is the value reported in `LenError::Truncated`.
10+
max_position: NonZeroUsize,
11+
/// Bytes consumed so far, counted across all fragments fed to the parser
12+
position: usize,
13+
/// Stack of open arrays and maps
14+
/// It is not a complete stack. Used only when resumption is needed.
15+
sequences_wip: Vec<Seq>,
16+
/// Current nesting level of arrays and maps
17+
current_depth: u16,
18+
/// Configured limit for `current_depth`
19+
max_depth: u16,
20+
/// Configured limit for any single length field read from the message
21+
max_len: u32,
22+
}
23+
24+
/// [`MessageLen`] result
///
/// Implements [`std::error::Error`], so it can be propagated with `?` into
/// `Box<dyn Error>` and similar error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LenError {
    /// The message is truncated, and needs at least this many bytes to parse
    Truncated(NonZeroUsize),
    /// The message is invalid or exceeded size limits
    ParseError,
}

impl LenError {
    /// Get expected min length or 0 on error
    ///
    /// Returns the lower bound carried by [`LenError::Truncated`], and `0`
    /// for [`LenError::ParseError`], where no length estimate is available.
    pub fn len(&self) -> usize {
        match *self {
            Self::ParseError => 0,
            Self::Truncated(l) => l.get(),
        }
    }
}

impl std::fmt::Display for LenError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Truncated(needed) => {
                write!(f, "message is truncated; at least {} bytes are needed", needed)
            }
            Self::ParseError => f.write_str("message is invalid or exceeds size limits"),
        }
    }
}

impl std::error::Error for LenError {}
42+
43+
impl MessageLen {
44+
/// New parser with default limits
45+
pub fn new() -> Self {
46+
Self::with_limits(1024, (u32::MAX as usize).min(isize::MAX as usize / 2))
47+
}
48+
49+
/// * `max_depth` limits nesting of arrays and maps
50+
///
51+
/// * `max_len` is maximum size of any string, byte string, map, or array.
52+
/// For maps and arrays this is the number of items, not bytes.
53+
///
54+
/// Messages can be both deep and wide, being `max_depth` * `max_len` in size.
55+
/// You should also limit the maximum byte size of the message (outside of this parser).
56+
pub fn with_limits(max_depth: usize, max_len: usize) -> Self {
57+
Self {
58+
max_position: NonZeroUsize::new(1).unwrap(),
59+
position: 0,
60+
current_depth: 0,
61+
max_depth: max_depth.min(u16::MAX as _) as u16,
62+
max_len: max_len.min(u32::MAX as _) as u32,
63+
sequences_wip: Vec::new(),
64+
wip: Some(WIP::NextMarker),
65+
}
66+
}
67+
68+
/// Parse the entire message to find if it's complete, and what is its serialized size in bytes.
69+
///
70+
/// If it returns `Ok(len)`, then the first `len` bytes of the given slice
71+
/// parse as a single MessagePack object.
72+
/// The length may be shorter than the slice given (extra data is gracefully ignored).
73+
///
74+
/// `Err(LenError::Truncated(len))` means that the the object is incomplete, the slice is truncated,
75+
/// and it would need *at least* this many bytes to parse.
76+
/// The `len` is always the lower bound, and never exceeds actual message length.
77+
///
78+
/// `Err(LenError::ParseError)` — the end of the message is unknown.
79+
///
80+
/// Don't call this function in a loop. Use [`MessageLen::incremental_len`] instead.
81+
pub fn len_of(complete_message: &[u8]) -> Result<usize, LenError> {
82+
Self::with_limits(1024, 1<<30).incremental_len(&mut complete_message.as_ref())
83+
}
84+
85+
/// Parse more bytes, and re-evaluate required message length.
86+
///
87+
/// This function is stateful and keeps "appending" the data to its evaluation.
88+
///
89+
/// * `Ok(len)` — this many bytes in total (not just this slice)
90+
/// have been consumed, and contain exactly one MessagePack object. The length
91+
/// may be shorter than the slice (extra data is gracefully ignored).
92+
/// * `Err(LenError::Truncated(len))` — all bytes of this slice have been consumed,
93+
/// and that is still not enough. The object needs at least `len` in total.
94+
/// The `len` is always the lower bound, and never exceeds actual message length.
95+
///
96+
/// * `Err(LenError::ParseError)` — the end of the message is unknown. Any further
97+
/// calls to this function may return nonsense.
98+
pub fn incremental_len(&mut self, mut next_message_fragment: &[u8]) -> Result<usize, LenError> {
99+
let data = &mut next_message_fragment;
100+
let wip = match self.wip.take() {
101+
Some(wip) => wip,
102+
None => return Ok(self.position), // must have succeded already
103+
};
104+
match wip {
105+
WIP::Data(Data { bytes_left }) => self.skip_data(data, bytes_left.get()),
106+
WIP::MarkerLen(wip) => self.read_marker_with_len(data, wip),
107+
WIP::NextMarker => self.read_one_item(data),
108+
WIP::LimitExceeded => {
109+
self.wip = Some(WIP::LimitExceeded); // put it back!
110+
return Err(LenError::ParseError);
111+
},
112+
}.ok_or(LenError::Truncated(self.max_position))?;
113+
114+
while let Some(seq) = self.sequences_wip.pop() {
115+
self.current_depth = seq.depth;
116+
debug_assert!(self.wip.is_none());
117+
self.read_sequence(data, seq.items_left.get() - 1).ok_or(LenError::Truncated(self.max_position))?;
118+
}
119+
debug_assert!(self.wip.is_none());
120+
debug_assert!(self.max_position.get() <= self.position);
121+
Ok(self.position)
122+
}
123+
124+
/// Forget all the state. The next call to `incremental_len` will assume it's the start of a new message.
125+
pub fn reset(&mut self) {
126+
self.max_position = NonZeroUsize::new(1).unwrap();
127+
self.position = 0;
128+
self.current_depth = 0;
129+
self.sequences_wip.clear();
130+
self.wip = Some(WIP::NextMarker);
131+
}
132+
133+
fn read_one_item(&mut self, data: &mut &[u8]) -> Option<()> {
134+
debug_assert!(self.wip.is_none());
135+
let marker = self.read_marker(data)?;
136+
match marker {
137+
Marker::FixPos(_) => Some(()),
138+
Marker::FixMap(len) => self.read_sequence(data, u32::from(len) * 2),
139+
Marker::FixArray(len) => self.read_sequence(data, u32::from(len)),
140+
Marker::FixStr(len) => self.skip_data(data, len.into()),
141+
Marker::Null |
142+
Marker::Reserved |
143+
Marker::False |
144+
Marker::True => Some(()),
145+
Marker::Str8 |
146+
Marker::Str16 |
147+
Marker::Str32 |
148+
Marker::Bin8 |
149+
Marker::Bin16 |
150+
Marker::Bin32 |
151+
Marker::Array16 |
152+
Marker::Array32 |
153+
Marker::Map16 |
154+
Marker::Map32 => self.read_marker_with_len(data, MarkerLen { marker, buf: [0; 4], has: 0 }),
155+
Marker::Ext8 |
156+
Marker::Ext16 |
157+
Marker::Ext32 => todo!(),
158+
Marker::F32 => self.skip_data(data, 4),
159+
Marker::F64 => self.skip_data(data, 8),
160+
Marker::U8 => self.skip_data(data, 1),
161+
Marker::U16 => self.skip_data(data, 2),
162+
Marker::U32 => self.skip_data(data, 4),
163+
Marker::U64 => self.skip_data(data, 8),
164+
Marker::I8 => self.skip_data(data, 1),
165+
Marker::I16 => self.skip_data(data, 2),
166+
Marker::I32 => self.skip_data(data, 4),
167+
Marker::I64 => self.skip_data(data, 8),
168+
Marker::FixExt1 |
169+
Marker::FixExt2 |
170+
Marker::FixExt4 |
171+
Marker::FixExt8 |
172+
Marker::FixExt16 => todo!(),
173+
Marker::FixNeg(_) => Some(()),
174+
}
175+
}
176+
177+
fn read_marker_with_len(&mut self, data: &mut &[u8], mut wip: MarkerLen) -> Option<()> {
178+
let size = wip.size();
179+
debug_assert!(wip.has < size && size > 0 && size <= 4);
180+
let dest = &mut wip.buf[0..size as usize];
181+
let wanted = dest.len().checked_sub(wip.has as _)?;
182+
183+
let taken = self.take_bytes(data, wanted as u32);
184+
dest[wip.has as usize..][..taken.len()].copy_from_slice(taken);
185+
wip.has += taken.len() as u8;
186+
if wip.has < size {
187+
return self.fail(WIP::MarkerLen(wip));
188+
}
189+
let len = match dest.len() {
190+
1 => dest[0].into(),
191+
2 => u16::from_be_bytes(dest.try_into().unwrap()).into(),
192+
4 => u32::from_be_bytes(dest.try_into().unwrap()),
193+
_ => {
194+
debug_assert!(false);
195+
return None
196+
},
197+
};
198+
if len >= self.max_len {
199+
return self.fail(WIP::LimitExceeded);
200+
}
201+
match wip.marker {
202+
Marker::Bin8 |
203+
Marker::Bin16 |
204+
Marker::Bin32 |
205+
Marker::Str8 |
206+
Marker::Str16 |
207+
Marker::Str32 => self.skip_data(data, len),
208+
Marker::Ext8 |
209+
Marker::Ext16 |
210+
Marker::Ext32 => todo!(),
211+
Marker::Array16 |
212+
Marker::Array32 => self.read_sequence(data, len),
213+
Marker::Map16 |
214+
Marker::Map32 => {
215+
if let Some(len) = len.checked_mul(2).filter(|&l| l < self.max_len) {
216+
self.read_sequence(data, len)
217+
} else {
218+
self.fail(WIP::LimitExceeded)
219+
}
220+
},
221+
_ => {
222+
debug_assert!(false);
223+
None
224+
}
225+
}
226+
}
227+
228+
fn read_sequence(&mut self, data: &mut &[u8], mut items_left: u32) -> Option<()> {
229+
self.current_depth += 1;
230+
if self.current_depth > self.max_depth {
231+
return self.fail(WIP::LimitExceeded);
232+
}
233+
while let Some(non_zero) = NonZeroU32::new(items_left) {
234+
let position_before_item = self.position;
235+
self.read_one_item(data).or_else(|| {
236+
self.set_max_position(position_before_item + items_left as usize);
237+
// -1, because it will increase depth again when resumed
238+
self.sequences_wip.push(Seq { items_left: non_zero, depth: self.current_depth-1 });
239+
None
240+
})?;
241+
items_left -= 1;
242+
}
243+
debug_assert!(self.current_depth > 0);
244+
self.current_depth -= 1;
245+
Some(())
246+
}
247+
248+
fn skip_data(&mut self, data: &mut &[u8], wanted: u32) -> Option<()> {
249+
let taken = self.take_bytes(data, wanted);
250+
if let Some(bytes_left) = NonZeroU32::new(wanted - taken.len() as u32) {
251+
debug_assert!(data.is_empty());
252+
self.fail(WIP::Data(Data { bytes_left }))
253+
} else {
254+
Some(())
255+
}
256+
}
257+
258+
fn read_marker(&mut self, data: &mut &[u8]) -> Option<Marker> {
259+
let Some((&b, rest)) = data.split_first() else {
260+
debug_assert!(data.is_empty());
261+
return self.fail(WIP::NextMarker);
262+
};
263+
self.position += 1;
264+
*data = rest;
265+
Some(Marker::from_u8(b))
266+
}
267+
268+
fn set_max_position(&mut self, position: usize) {
269+
self.max_position = NonZeroUsize::new(self.max_position.get().max(position)).unwrap();
270+
}
271+
272+
/// May return less than requested
273+
fn take_bytes<'data>(&mut self, data: &mut &'data [u8], wanted: u32) -> &'data [u8] {
274+
let (taken, rest) = data.split_at(data.len().min(wanted as usize));
275+
self.position += taken.len();
276+
*data = rest;
277+
taken
278+
}
279+
280+
#[inline(always)]
281+
fn fail<T>(&mut self, wip: WIP) -> Option<T> {
282+
debug_assert!(self.wip.is_none());
283+
let pos = match self.wip.insert(wip) {
284+
WIP::NextMarker => self.position + 1,
285+
WIP::Data(Data { bytes_left }) => self.position + bytes_left.get() as usize,
286+
WIP::MarkerLen(m) => self.position + (m.size() - m.has) as usize,
287+
WIP::LimitExceeded => 0,
288+
};
289+
self.set_max_position(pos);
290+
None
291+
}
292+
}
293+
294+
/// The operation that was in progress when parsing stopped.
/// Stored in `MessageLen::wip` and resumed by the next `incremental_len` call.
enum WIP {
295+
/// The input ended where the next marker byte was expected
NextMarker,
296+
/// The input ended in the middle of a payload that is being skipped
Data(Data),
297+
/// The input ended in the middle of a marker's length prefix
MarkerLen(MarkerLen),
298+
/// A depth or length limit was exceeded; later calls return `LenError::ParseError`
LimitExceeded,
299+
}
300+
301+
/// An interrupted array or map: `items_left` counts the items not yet fully read
/// (including the interrupted one), `depth` is the nesting depth to restore on resumption.
struct Seq { items_left: NonZeroU32, depth: u16 }
302+
/// Number of payload bytes that still have to be skipped.
struct Data { bytes_left: NonZeroU32 }
303+
/// A partially read length prefix: `buf` collects the big-endian length bytes
/// that follow `marker`, and `has` is how many of them have been read so far.
struct MarkerLen { marker: Marker, buf: [u8; 4], has: u8 }
304+
305+
impl MarkerLen {
306+
fn size(&self) -> u8 {
307+
match self.marker {
308+
Marker::Bin8 => 1,
309+
Marker::Bin16 => 2,
310+
Marker::Bin32 => 4,
311+
Marker::Ext8 => 1,
312+
Marker::Ext16 => 2,
313+
Marker::Ext32 => 4,
314+
Marker::Str8 => 1,
315+
Marker::Str16 => 2,
316+
Marker::Str32 => 4,
317+
Marker::Array16 => 2,
318+
Marker::Array32 => 4,
319+
Marker::Map16 => 2,
320+
Marker::Map32 => 4,
321+
_ => unimplemented!(),
322+
}
323+
}
324+
}

rmp/src/decode/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ mod sint;
1515
mod str;
1616
mod uint;
1717

18+
#[cfg(feature = "std")]
19+
mod est;
20+
#[cfg(feature = "std")]
21+
pub use est::{MessageLen, LenError};
22+
1823
pub use self::dec::{read_f32, read_f64};
1924
pub use self::ext::{
2025
read_ext_meta, read_fixext1, read_fixext16, read_fixext2, read_fixext4, read_fixext8, ExtMeta,

0 commit comments

Comments
 (0)