Skip to content

Commit

Permalink
[Feat] Change to unix domain socket
Browse files Browse the repository at this point in the history
* Use use tokio::net::UnixListener instead of tokio::net::Unix for unix socket communication

Signed-off-by: shouxunsun <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
Co-authored-by: Yang Keao <[email protected]>
Co-authored-by: xixi <[email protected]>
  • Loading branch information
4 people committed Jul 8, 2022
1 parent a7c97cd commit 84aaeeb
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions chaos-tproxy-controller/src/cmd/interactive/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use anyhow::Error;
use futures::TryStreamExt;
use http::{Method, Request, Response, StatusCode};
use hyper::server::conn::{Connection, Http};
use hyper::server::conn::{Http};
use hyper::service::Service;
use hyper::Body;
use tokio::select;
Expand All @@ -16,7 +16,9 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::instrument;

use crate::cmd::interactive::stdio::StdStream;
use tokio::net::{UnixListener};
#[cfg(unix)]
use std::os::unix::io::{FromRawFd};
use crate::proxy::config::Config;
use crate::proxy::exec::Proxy;
use crate::raw_config::RawConfig;
Expand All @@ -43,21 +45,27 @@ impl ConfigServer {
pub fn serve_interactive(&mut self) {
let mut rx = self.rx.take().unwrap();
let mut service = ConfigService(self.proxy.clone());

self.task = Some(tokio::spawn(async move {
let rx_mut = &mut rx;
let rx_mut = &mut rx;
let unix_listener = UnixListener::from_std(unsafe {std::os::unix::net::UnixListener::from_raw_fd(3)}).unwrap();

loop {
let stream = StdStream::default();
let mut conn = Http::new().serve_connection(stream, &mut service);
let conn_mut = &mut conn;
select! {
_ = &mut *rx_mut => {
tracing::trace!("catch signal in config server.");
Connection::graceful_shutdown(Pin::new(conn_mut));
return Ok(());
},
ret = &mut *conn_mut => if let Err(e) = ret {
tracing::error!("{}",e);
}
stream = unix_listener.accept() => {
let (stream, _) = stream.unwrap();

let http = Http::new();
let conn = http.serve_connection(stream, &mut service);
if let Err(e) = conn.await {
tracing::error!("{}",e);
return Err(anyhow::anyhow!("{}",e));
}
},
};
}
}));
Expand Down

0 comments on commit 84aaeeb

Please sign in to comment.