-
Notifications
You must be signed in to change notification settings - Fork 73
Ignore SIGPIPE #195
Comments
That PR to ignore SIGPIPE is to disable the sending of the SIGPIPE signal to the process, the EPIPE is still returned either way (signals just usually cause the whole process to abort). What are you seeing the error from? From a specific RPC future? Or from the background connection future? |
@seanmonstar The error, pub fn dummy_server<D>(db: D) {
// See https://github.com/tower-rs/tower-grpc/issues/195
unsafe {
signal(Signal::SIGPIPE, SigHandler::SigIgn).unwrap();
}
let service = PipelineReadServiceServer::new(DbServer::new(db));
let mut server = Server::new(service);
let http = Http::new().http2_only(true).clone();
let addr = "[::1]:50051".parse().unwrap();
let bind = TcpListener::bind(&addr).expect(&format!("Bind failure at {:?}", addr));
let serve = bind
.incoming()
.take(1)
.for_each(move |sock| {
if let Err(e) = sock.set_nodelay(true) {
return Err(e);
}
let serve = server.serve_with(sock, http.clone());
tokio::spawn(serve.map_err(|e| match e {
Error::Protocol(e) => {
// Ignore SIGPIPE?
if e.kind() != hyper::error::Kind::Io {
eprintln!("Protocol error: {}", e)
}
}
_ => eprintln!("Serve error: {}", e),
}));
Ok(())
})
.map_err(|e| eprintln!("Accept error: {}", e));
tokio::run(serve);
} The client, in turn, looks like this pub fn dummy_client() {
let uri: http::Uri = format!("http://[::1]:50051").parse().unwrap();
let dst = Destination::try_from_uri(uri.clone()).unwrap();
let connector = util::Connector::new(HttpConnector::new(4));
let settings = client::Builder::new().http2_only(true).clone();
let mut make_client = client::Connect::with_builder(connector, settings);
let reader = make_client
.make_service(dst)
.map_err(|e| panic!("Connection error: {:?}", e))
.and_then(move |conn| {
let conn = tower_request_modifier::Builder::new()
.set_origin(uri)
.build(conn)
.unwrap();
PipelineReadService::new(conn).ready()
})
.and_then(|mut client| {
// Some unrelated code here
client.range_read(Request::new(...))
})
.and_then(|_response| Ok(()))
.map_err(|e| eprintln!("Error: {:?}", e));
tokio::run(reader);
} |
I'll add that the actual test procedure is this: #[test]
fn basic_server_client_memdb() {
let db = db::dummy_memdb();
let server = std::thread::Builder::new()
.name("dummy_grpc_server".to_owned())
.spawn(|| server::dummy_server(db))
.unwrap();
let client = std::thread::Builder::new()
.name("dummy_grpc_client".to_owned())
.spawn(|| client::dummy_client())
.unwrap();
client.join().unwrap();
server.join().unwrap();
} |
So it appears that the server connection future itself is reporting the error. It could be trying to reply to the GOAWAY from the client. (Note, you don't need to do anything about SIGPIPE signals, Rust disables them automatically in its prelude before |
I see, so I can just ignore the error (i.e. not print it out) and it's working as expected? |
I've been using tower-grpc at work, and after writing a test for a server/client comm I found that it would fail sporadically with a broken pipe error:
Now, the test would work, the data seemed to go through, and everything was fine if not for that annoying broken pipe. I used strace to see what was going on, and found the problem area:
I will also note the flags set when opening the socket:
I think this happens because, on the client side, the HTTP client is moved into the future that is passed onto the Tokio executor, which then drops the connection once that is completed. The canonical GRPC implementation circumvents this issue, I think, by ignoring SIGPIPE which I believe we should so as well.
Have I identified the issue correctly?
The text was updated successfully, but these errors were encountered: