Skip to content

Commit eab5778

Browse files
authored
fix: fix potential UB caused by unsafe and future movement (#154)
* fix: fix potential UB caused by unsafe and future movement * fix: fix clippy
1 parent 4f1483b commit eab5778

File tree

3 files changed

+140
-85
lines changed

3 files changed

+140
-85
lines changed

monolake-services/src/common/cancel/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl Waiter {
8686
pub fn cancelled(&self) -> bool {
8787
self.handler
8888
.upgrade()
89-
.map_or(true, |handler| unsafe { &*handler.get() }.cancelled)
89+
.is_none_or(|handler| unsafe { &*handler.get() }.cancelled)
9090
}
9191
}
9292

monolake-services/src/http/core.rs

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
//! - Implements connection keep-alive for HTTP/1.1 to reduce connection overhead
6464
//! - Supports HTTP/2 multiplexing for efficient handling of concurrent requests
6565
//! - Automatic protocol detection allows for optimized handling based on the client's capabilities
66-
use std::{convert::Infallible, fmt::Debug, pin::Pin, time::Duration};
66+
use std::{convert::Infallible, fmt::Debug, time::Duration};
6767

6868
use bytes::Bytes;
6969
use certain_map::{Attach, Fork};
@@ -92,7 +92,7 @@ use service_async::{
9292
};
9393
use tracing::{error, info, warn};
9494

95-
use super::{generate_response, util::AccompanyPair};
95+
use super::{generate_response, util::AccompanyPairBase};
9696

9797
/// Core HTTP service handler supporting both HTTP/1.1 and HTTP/2 protocols.
9898
///
@@ -172,51 +172,44 @@ impl<H> HttpCoreService<H> {
172172
// fork ctx
173173
let (mut store, state) = ctx.fork();
174174
let forked_ctx = unsafe { state.attach(&mut store) };
175+
let mut fut_base = std::pin::pin!(AccompanyPairBase::new(decoder.fill_payload()));
175176

176177
// handle request and reply response
177178
// 1. do these things simultaneously: read body and send + handle request
178-
let mut acc_fut = AccompanyPair::new(
179-
self.handler_chain.handle(req, forked_ctx),
180-
decoder.fill_payload(),
181-
);
182-
let res = unsafe { Pin::new_unchecked(&mut acc_fut) }.await;
183-
match res {
179+
let s1 = fut_base
180+
.as_mut()
181+
.stage1(self.handler_chain.handle(req, forked_ctx));
182+
match s1.await {
184183
Ok((resp, should_cont)) => {
185184
// 2. do these things simultaneously: read body and send + handle response
186-
let mut f = acc_fut.replace(encoder.send_and_flush(resp));
185+
let s2 = fut_base.as_mut().stage2(encoder.send_and_flush(resp));
187186
match self.http_timeout.read_body_timeout {
188187
None => {
189-
if let Err(e) = unsafe { Pin::new_unchecked(&mut f) }.await {
188+
if let Err(e) = s2.await {
190189
warn!("error when encode and write response: {e}");
191190
break;
192191
}
193192
}
194-
Some(body_timeout) => {
195-
match monoio::time::timeout(body_timeout, unsafe {
196-
Pin::new_unchecked(&mut f)
197-
})
198-
.await
199-
{
200-
Err(_) => {
201-
info!(
202-
"Connection {:?} write timed out",
203-
ParamRef::<PeerAddr>::param_ref(&ctx),
204-
);
205-
break;
206-
}
207-
Ok(Err(e)) => {
208-
warn!("error when encode and write response: {e}");
209-
break;
210-
}
211-
_ => (),
193+
Some(body_timeout) => match monoio::time::timeout(body_timeout, s2).await {
194+
Err(_) => {
195+
info!(
196+
"Connection {:?} write timed out",
197+
ParamRef::<PeerAddr>::param_ref(&ctx),
198+
);
199+
break;
212200
}
213-
}
201+
Ok(Err(e)) => {
202+
warn!("error when encode and write response: {e}");
203+
break;
204+
}
205+
_ => (),
206+
},
214207
}
215208

216209
if !should_cont {
217210
break;
218211
}
219-
if let Err(e) = f.into_accompany().await {
212+
if let Err(e) = fut_base.as_mut().stage3().await {
220213
warn!("error when decode request body: {e}");
221214
break;
222215
}

monolake-services/src/http/util.rs

Lines changed: 116 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,152 @@
1-
use std::{future::Future, task::Poll};
1+
use std::{
2+
future::Future,
3+
mem::{ManuallyDrop, MaybeUninit},
4+
ops::DerefMut,
5+
pin::Pin,
6+
task::Poll,
7+
};
28

39
use http::{HeaderValue, Request, Response, StatusCode};
410
use monoio_http::common::body::FixedBody;
511
use monolake_core::http::{HttpError, HttpHandler, ResponseWithContinue};
612
use service_async::Service;
713

8-
pin_project_lite::pin_project! {
9-
/// AccompanyPair for http decoder and processor.
10-
/// We have to fill payload when process request
11-
/// since inner logic may read chunked body; also
12-
/// fill payload when process response since we
13-
/// may use the request body stream in response
14-
/// body stream.
15-
pub(crate) struct AccompanyPair<FMAIN, FACC, T> {
16-
#[pin]
17-
main: FMAIN,
18-
#[pin]
19-
accompany: FACC,
20-
accompany_slot: Option<T>
14+
union Union<A, B> {
15+
a: ManuallyDrop<A>,
16+
b: ManuallyDrop<B>,
17+
}
18+
19+
/// AccompanyPair for http decoder and processor.
20+
/// We have to fill payload when process request
21+
/// since inner logic may read chunked body; also
22+
/// fill payload when process response since we
23+
/// may use the request body stream in response
24+
/// body stream.
25+
pub(crate) struct AccompanyPairBase<F1, F2, FACC, T> {
26+
main: MaybeUninit<Union<F1, F2>>,
27+
accompany: FACC,
28+
accompany_slot: Option<T>,
29+
}
30+
31+
impl<F1, F2, FACC, T> AccompanyPairBase<F1, F2, FACC, T> {
32+
pub(crate) const fn new(accompany: FACC) -> Self {
33+
Self {
34+
main: MaybeUninit::uninit(),
35+
accompany,
36+
accompany_slot: None,
37+
}
38+
}
39+
40+
pub(crate) fn stage1(
41+
mut self: Pin<&mut Self>,
42+
future1: F1,
43+
) -> AccompanyPairS1<F1, F2, FACC, T> {
44+
unsafe {
45+
self.as_mut().get_unchecked_mut().main.assume_init_mut().a = ManuallyDrop::new(future1);
46+
}
47+
AccompanyPairS1(self)
48+
}
49+
50+
pub(crate) fn stage2(
51+
mut self: Pin<&mut Self>,
52+
future2: F2,
53+
) -> AccompanyPairS2<F1, F2, FACC, T> {
54+
unsafe {
55+
self.as_mut().get_unchecked_mut().main.assume_init_mut().b = ManuallyDrop::new(future2);
56+
}
57+
AccompanyPairS2(self)
58+
}
59+
60+
pub(crate) const fn stage3(self: Pin<&mut Self>) -> AccompanyPairS3<F1, F2, FACC, T> {
61+
AccompanyPairS3(self)
2162
}
2263
}
2364

24-
pin_project_lite::pin_project! {
25-
/// Accompany for http decoder and processor.
26-
pub(crate) struct Accompany<FACC, T> {
27-
#[pin]
28-
accompany: FACC,
29-
accompany_slot: Option<T>
65+
#[repr(transparent)]
66+
pub(crate) struct AccompanyPairS1<'a, F1, F2, FACC, T>(
67+
Pin<&'a mut AccompanyPairBase<F1, F2, FACC, T>>,
68+
);
69+
70+
impl<F1, F2, FACC, T> Drop for AccompanyPairS1<'_, F1, F2, FACC, T> {
71+
fn drop(&mut self) {
72+
unsafe {
73+
ManuallyDrop::drop(&mut self.0.as_mut().get_unchecked_mut().main.assume_init_mut().a);
74+
}
3075
}
3176
}
3277

33-
impl<FMAIN, FACC, T> Future for AccompanyPair<FMAIN, FACC, T>
78+
impl<F1, F2, FACC, T> Future for AccompanyPairS1<'_, F1, F2, FACC, T>
3479
where
35-
FMAIN: Future,
80+
F1: Future,
3681
FACC: Future<Output = T>,
3782
{
38-
type Output = FMAIN::Output;
83+
type Output = F1::Output;
3984

40-
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
41-
let this = self.project();
85+
fn poll(
86+
mut self: std::pin::Pin<&mut Self>,
87+
cx: &mut std::task::Context<'_>,
88+
) -> Poll<Self::Output> {
89+
let this = unsafe { self.0.as_mut().get_unchecked_mut() };
4290
if this.accompany_slot.is_none()
43-
&& let Poll::Ready(t) = this.accompany.poll(cx)
91+
&& let Poll::Ready(t) = unsafe { Pin::new_unchecked(&mut this.accompany) }.poll(cx)
4492
{
45-
*this.accompany_slot = Some(t);
93+
this.accompany_slot = Some(t);
4694
}
47-
this.main.poll(cx)
95+
unsafe { Pin::new_unchecked(this.main.assume_init_mut().a.deref_mut()).poll(cx) }
4896
}
4997
}
5098

51-
impl<FACC, T> Future for Accompany<FACC, T>
99+
#[repr(transparent)]
100+
pub(crate) struct AccompanyPairS2<'a, F1, F2, FACC, T>(
101+
Pin<&'a mut AccompanyPairBase<F1, F2, FACC, T>>,
102+
);
103+
104+
impl<F1, F2, FACC, T> Drop for AccompanyPairS2<'_, F1, F2, FACC, T> {
105+
fn drop(&mut self) {
106+
unsafe {
107+
ManuallyDrop::drop(&mut self.0.as_mut().get_unchecked_mut().main.assume_init_mut().b);
108+
}
109+
}
110+
}
111+
112+
impl<F1, F2, FACC, T> Future for AccompanyPairS2<'_, F1, F2, FACC, T>
52113
where
114+
F2: Future,
53115
FACC: Future<Output = T>,
54116
{
55-
type Output = T;
117+
type Output = F2::Output;
56118

57-
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
58-
let this = self.project();
59-
if let Some(t) = this.accompany_slot.take() {
60-
return Poll::Ready(t);
119+
fn poll(
120+
mut self: std::pin::Pin<&mut Self>,
121+
cx: &mut std::task::Context<'_>,
122+
) -> Poll<Self::Output> {
123+
let this = unsafe { self.0.as_mut().get_unchecked_mut() };
124+
if this.accompany_slot.is_none()
125+
&& let Poll::Ready(t) = unsafe { Pin::new_unchecked(&mut this.accompany) }.poll(cx)
126+
{
127+
this.accompany_slot = Some(t);
61128
}
62-
this.accompany.poll(cx)
129+
unsafe { Pin::new_unchecked(this.main.assume_init_mut().b.deref_mut()).poll(cx) }
63130
}
64131
}
65132

66-
impl<FMAIN, FACC, T> AccompanyPair<FMAIN, FACC, T> {
67-
pub(crate) fn new(main: FMAIN, accompany: FACC) -> Self {
68-
Self {
69-
main,
70-
accompany,
71-
accompany_slot: None,
72-
}
73-
}
133+
#[repr(transparent)]
134+
pub(crate) struct AccompanyPairS3<'a, F1, F2, FACC, T>(
135+
Pin<&'a mut AccompanyPairBase<F1, F2, FACC, T>>,
136+
);
74137

75-
pub(crate) fn replace<FMAIN2>(self, main: FMAIN2) -> AccompanyPair<FMAIN2, FACC, T> {
76-
AccompanyPair {
77-
main,
78-
accompany: self.accompany,
79-
accompany_slot: self.accompany_slot,
80-
}
81-
}
138+
impl<F1, F2, FACC: Future<Output = T>, T> Future for AccompanyPairS3<'_, F1, F2, FACC, T> {
139+
type Output = FACC::Output;
82140

83-
pub(crate) fn into_accompany(self) -> Accompany<FACC, T> {
84-
Accompany {
85-
accompany: self.accompany,
86-
accompany_slot: self.accompany_slot,
141+
fn poll(
142+
mut self: std::pin::Pin<&mut Self>,
143+
cx: &mut std::task::Context<'_>,
144+
) -> Poll<Self::Output> {
145+
let this = unsafe { self.0.as_mut().get_unchecked_mut() };
146+
if let Some(t) = this.accompany_slot.take() {
147+
return Poll::Ready(t);
87148
}
149+
unsafe { Pin::new_unchecked(&mut this.accompany) }.poll(cx)
88150
}
89151
}
90152

0 commit comments

Comments
 (0)