Skip to content

Commit 485d25d

Browse files
committed
feat(mux): reusable streams
1 parent 049a6e4 commit 485d25d

File tree

9 files changed

+502
-548
lines changed

9 files changed

+502
-548
lines changed

mux/SPEC.md

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ MUX is a symmetric stream multiplexing protocol designed to run over reliable, o
88

99
- Lightweight framing with minimal overhead (14-byte headers)
1010
- Per-stream flow control with automatic window tuning
11-
- Graceful and abrupt stream termination
11+
- Fast stream open/close with ID reuse
1212
- Connection-level resource limits
1313
- Symmetric operation (no client/server distinction)
1414
- Deterministic stream IDs derived from user-defined identifiers
@@ -98,15 +98,14 @@ Flags modify frame behavior. Multiple flags MAY be set simultaneously by combini
9898

9999
| Flag | Value | Applicable Types | Description |
100100
|------|-------|------------------|-------------|
101-
| FIN | `0x01` | Data, Window Update | Half-closes the stream in the sender's direction. |
102-
| RST | `0x02` | Data, Window Update | Immediately resets (terminates) the stream. |
101+
| FIN | `0x01` | Data | Signals end-of-stream marker (see Section 5.4). |
103102
| SYN | `0x04` | Ping | Ping request. |
104103
| ACK | `0x08` | Ping | Ping response. |
105104

106105
### 4.1 Flag Constraints
107106

108-
- FIN and RST MUST NOT be set together. If both are set, RST takes precedence.
109107
- SYN and ACK are only valid on Ping frames.
108+
- FIN is only valid on Data frames.
110109

111110
## 5. Stream Management
112111

@@ -141,46 +140,45 @@ Streams are created implicitly when the first frame for a Stream ID is sent or r
141140

142141
### 5.3 Stream Lifecycle
143142

144-
```
145-
+-------+
146-
| Open |
147-
+-------+
148-
/ | \
149-
FIN← / | \ →FIN
150-
/ | \
151-
+----------+ | +----------+
152-
|RecvClosed| | |SendClosed|
153-
+----------+ | +----------+
154-
\ | /
155-
FIN→ \ | / ←FIN
156-
\ | /
157-
+-------+
158-
|Closed |
159-
+-------+
160-
161-
RST (any state)
162-
```
143+
Streams have a simplified lifecycle optimized for fast open/close with ID reuse:
144+
145+
**Local stream states:**
146+
- **Active**: Application holds a handle to the stream. Can read/write.
147+
- **Buffering**: No handle, but buffered data exists. Data continues to be received.
148+
- **Removed**: No handle and no buffered data. Stream ID available for reuse.
149+
150+
**Opening a stream:**
151+
1. Compute Stream ID from user identifier.
152+
2. If stream exists in Buffering state, transition to Active (access buffered data).
153+
3. If stream does not exist, create new stream in Active state.
163154

164-
| State | Description |
165-
|-------|-------------|
166-
| Open | Bidirectional data flow. |
167-
| SendClosed | Local side sent FIN. Can still receive data. |
168-
| RecvClosed | Remote side sent FIN. Can still send data. |
169-
| Closed | Both directions closed. Stream resources may be released. |
155+
**Closing a stream:**
156+
1. Application drops the stream handle.
157+
2. If buffer is empty, remove stream (ID available for reuse).
158+
3. If buffer is non-empty, transition to Buffering state.
170159

171-
### 5.4 Half-Close (FIN)
160+
**Key properties:**
161+
- Stream IDs can be reused after the stream is removed.
162+
- Dropping a handle does NOT send any protocol message.
163+
- Remote peer is not notified when local handle is dropped.
164+
- Flow control remains active for Buffering streams.
172165

173-
Sending FIN indicates the sender will transmit no more data on this stream. The stream transitions:
174-
- `Open``SendClosed`
175-
- `RecvClosed``Closed`
166+
### 5.4 End-of-Stream Marker (FIN)
176167

177-
Receiving FIN transitions:
178-
- `Open``RecvClosed`
179-
- `SendClosed``Closed`
168+
FIN is an in-band marker signaling the end of a logical message or stream segment. Unlike traditional half-close:
180169

181-
### 5.5 Reset (RST)
170+
- FIN is buffered in-order with data frames.
171+
- Reading FIN returns EOF (0 bytes).
172+
- **Data MAY be sent after FIN.** FIN does not prevent further transmission.
173+
- Applications use FIN for framing; the protocol does not enforce termination.
174+
175+
**Example usage:**
176+
```
177+
Sender: Data("hello") → FIN → Data("world") → FIN
178+
Reader: reads "hello" → reads EOF → reads "world" → reads EOF
179+
```
182180

183-
RST immediately terminates a stream from any state. Both sides SHOULD release stream resources upon sending or receiving RST. No further frames SHOULD be sent on a reset stream.
181+
This allows FIN to delimit messages within a long-lived stream.
184182

185183
## 6. Flow Control
186184

@@ -195,6 +193,7 @@ Each stream maintains an independent receive window representing the number of b
195193
**Behavior:**
196194
- Senders MUST NOT send more data than the receiver's advertised window.
197195
- Each byte of Data payload consumes one byte of window.
196+
- Each FIN marker consumes 32 bytes of window (to prevent FIN spam attacks).
198197
- Window Update frames replenish the window.
199198

200199
### 6.2 Window Updates
@@ -267,7 +266,7 @@ Upon detecting a protocol violation, implementations MUST:
267266

268267
### 8.2 Stream Errors vs Connection Errors
269268

270-
- **Stream errors** (e.g., application-level errors) SHOULD be handled with RST on that stream.
269+
- **Stream errors** (e.g., application-level errors) are handled by the application dropping the stream handle. The remote peer is not explicitly notified.
271270
- **Connection errors** (e.g., protocol violations) MUST be handled with GoAway and connection closure.
272271

273272
## 9. Constants Summary

mux/mux/src/chunks.rs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,21 @@
1212

1313
use std::{collections::VecDeque, io};
1414

15-
/// A sequence of [`Chunk`] values.
15+
/// An element in the buffer - either data or a FIN marker.
16+
#[derive(Debug)]
17+
pub(crate) enum ChunkOrFin {
18+
Chunk(Chunk),
19+
Fin,
20+
}
21+
22+
/// A sequence of [`ChunkOrFin`] values.
1623
///
1724
/// [`Chunks::len`] considers all [`Chunk`] elements and computes the total
1825
/// result, i.e. the length of all bytes, by summing up the lengths of all
19-
/// [`Chunk`] elements.
26+
/// [`Chunk`] elements. FIN markers don't contribute to length.
2027
#[derive(Debug)]
2128
pub(crate) struct Chunks {
22-
seq: VecDeque<Chunk>,
29+
seq: VecDeque<ChunkOrFin>,
2330
len: usize,
2431
}
2532

@@ -34,29 +41,57 @@ impl Chunks {
3441

3542
/// The total length of bytes yet-to-be-read in all `Chunk`s.
3643
pub(crate) fn len(&self) -> usize {
37-
self.len - self.seq.front().map(|c| c.offset()).unwrap_or(0)
44+
let front_offset = self.seq.front().and_then(|e| match e {
45+
ChunkOrFin::Chunk(c) => Some(c.offset()),
46+
ChunkOrFin::Fin => None,
47+
}).unwrap_or(0);
48+
self.len - front_offset
49+
}
50+
51+
/// Returns true if there is no data in the buffer.
52+
///
53+
/// Note: A buffer with only FIN markers is considered empty.
54+
pub(crate) fn is_empty(&self) -> bool {
55+
self.len() == 0
3856
}
3957

4058
/// Add another chunk of bytes to the end.
4159
pub(crate) fn push(&mut self, x: Vec<u8>) {
4260
self.len += x.len();
4361
if !x.is_empty() {
44-
self.seq.push_back(Chunk {
62+
self.seq.push_back(ChunkOrFin::Chunk(Chunk {
4563
cursor: io::Cursor::new(x),
46-
})
64+
}))
4765
}
4866
}
4967

50-
/// Remove and return the first chunk.
51-
pub(crate) fn pop(&mut self) -> Option<Chunk> {
52-
let chunk = self.seq.pop_front();
53-
self.len -= chunk.as_ref().map(|c| c.len() + c.offset()).unwrap_or(0);
54-
chunk
68+
/// Add a FIN marker to the end.
69+
pub(crate) fn push_fin(&mut self) {
70+
self.seq.push_back(ChunkOrFin::Fin);
5571
}
5672

57-
/// Get a mutable reference to the first chunk.
58-
pub(crate) fn front_mut(&mut self) -> Option<&mut Chunk> {
59-
self.seq.front_mut()
73+
/// Remove and return the first element.
74+
pub(crate) fn pop(&mut self) -> Option<ChunkOrFin> {
75+
let elem = self.seq.pop_front();
76+
if let Some(ChunkOrFin::Chunk(ref c)) = elem {
77+
self.len -= c.len() + c.offset();
78+
}
79+
elem
80+
}
81+
82+
/// Get a reference to the first element.
83+
pub(crate) fn front(&self) -> Option<&ChunkOrFin> {
84+
self.seq.front()
85+
}
86+
87+
/// Get a mutable reference to the first chunk, if it is a chunk.
88+
///
89+
/// Returns None if buffer is empty or front is a FIN marker.
90+
pub(crate) fn front_chunk_mut(&mut self) -> Option<&mut Chunk> {
91+
match self.seq.front_mut() {
92+
Some(ChunkOrFin::Chunk(c)) => Some(c),
93+
_ => None,
94+
}
6095
}
6196
}
6297

mux/mux/src/connection.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
8484
matches!(self.inner, ConnectionState::Closed(_))
8585
}
8686

87-
/// Get a handle for creating streams concurrently.
87+
/// Get a handle for obtaining streams concurrently.
8888
///
8989
/// The handle can be cloned and used from multiple tasks while the
9090
/// Connection is being polled.
@@ -95,17 +95,20 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
9595
}
9696
}
9797

98-
/// Create a new stream with the given user ID.
98+
/// Get a stream handle for the given user ID.
99+
///
100+
/// Streams are implicit - if data has already been received for this ID,
101+
/// returns a handle to the existing stream with buffered data. Otherwise,
102+
/// creates a new stream.
99103
///
100104
/// The stream ID is computed from the user ID using BLAKE3.
101-
/// Either side can create streams with the same user ID - they will
102-
/// automatically merge into the same stream.
105+
/// Either side can get streams with the same user ID - they will
106+
/// automatically refer to the same stream.
103107
///
104-
/// The `user_id` parameter is a required user-defined stream identifier
105-
/// (1-256 bytes). User IDs must be unique within the session.
106-
pub fn new_stream(&mut self, user_id: &[u8]) -> Result<Stream> {
108+
/// The `user_id` parameter is a user-defined stream identifier (1-256 bytes).
109+
pub fn get_stream(&mut self, user_id: &[u8]) -> Result<Stream> {
107110
match &mut self.inner {
108-
ConnectionState::Active(active) => active.new_stream(user_id),
111+
ConnectionState::Active(active) => active.get_stream(user_id),
109112
_ => Err(ConnectionError::Closed),
110113
}
111114
}

0 commit comments

Comments
 (0)