From 84aaeebcbe53686650eac231daaab0b7b9c3d649 Mon Sep 17 00:00:00 2001 From: RandyLambert Date: Fri, 8 Jul 2022 18:27:29 +0800 Subject: [PATCH] [Feat] Change to unix domain socket * Use use tokio::net::UnixListener instead of tokio::net::Unix for unix socket communication Signed-off-by: shouxunsun Co-authored-by: Ti Chi Robot Co-authored-by: Yang Keao Co-authored-by: xixi --- .../src/cmd/interactive/handler.rs | 28 ++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/chaos-tproxy-controller/src/cmd/interactive/handler.rs b/chaos-tproxy-controller/src/cmd/interactive/handler.rs index 202ede2..9c9c527 100644 --- a/chaos-tproxy-controller/src/cmd/interactive/handler.rs +++ b/chaos-tproxy-controller/src/cmd/interactive/handler.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use anyhow::Error; use futures::TryStreamExt; use http::{Method, Request, Response, StatusCode}; -use hyper::server::conn::{Connection, Http}; +use hyper::server::conn::{Http}; use hyper::service::Service; use hyper::Body; use tokio::select; @@ -16,7 +16,9 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::instrument; -use crate::cmd::interactive::stdio::StdStream; +use tokio::net::{UnixListener}; +#[cfg(unix)] +use std::os::unix::io::{FromRawFd}; use crate::proxy::config::Config; use crate::proxy::exec::Proxy; use crate::raw_config::RawConfig; @@ -43,21 +45,27 @@ impl ConfigServer { pub fn serve_interactive(&mut self) { let mut rx = self.rx.take().unwrap(); let mut service = ConfigService(self.proxy.clone()); + self.task = Some(tokio::spawn(async move { - let rx_mut = &mut rx; + let rx_mut = &mut rx; + let unix_listener = UnixListener::from_std(unsafe {std::os::unix::net::UnixListener::from_raw_fd(3)}).unwrap(); + loop { - let stream = StdStream::default(); - let mut conn = Http::new().serve_connection(stream, &mut service); - let conn_mut = &mut conn; select! { _ = &mut *rx_mut => { tracing::trace!("catch signal in config server."); - Connection::graceful_shutdown(Pin::new(conn_mut)); return Ok(()); }, - ret = &mut *conn_mut => if let Err(e) = ret { - tracing::error!("{}",e); - } + stream = unix_listener.accept() => { + let (stream, _) = stream.unwrap(); + + let http = Http::new(); + let conn = http.serve_connection(stream, &mut service); + if let Err(e) = conn.await { + tracing::error!("{}",e); + return Err(anyhow::anyhow!("{}",e)); + } + }, }; } }));