diff --git a/Cargo.toml b/Cargo.toml index 7e919c71..06385a2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,9 @@ rand = "0.8.4" tide = "0.16" actix-web = "4" tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "signal"] } -hyper = { version = "0.14.16", features = ["server", "http1", "tcp"] } +hyper = { version = "1.3.1", features = ["server", "http1"] } +hyper-util = { version = "0.1.3", features = ["tokio"] } +http-body-util = "0.1.1" [build-dependencies] prost-build = { version = "0.11.0", optional = true } diff --git a/examples/hyper.rs b/examples/hyper.rs index f5a4009d..82ee121b 100644 --- a/examples/hyper.rs +++ b/examples/hyper.rs @@ -1,7 +1,11 @@ +use http_body_util::{combinators, BodyExt, Full}; use hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, + body::{Bytes, Incoming}, + server::conn::http1, + service::service_fn, + Request, Response, }; +use hyper_util::rt::TokioIo; use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry}; use std::{ future::Future, @@ -10,7 +14,11 @@ use std::{ pin::Pin, sync::Arc, }; -use tokio::signal::unix::{signal, SignalKind}; +use tokio::{ + net::TcpListener, + pin, + signal::unix::{signal, SignalKind}, +}; #[tokio::main] async fn main() { @@ -31,39 +39,48 @@ async fn main() { /// Start a HTTP server to report metrics. pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) { - let mut shutdown_stream = signal(SignalKind::terminate()).unwrap(); - eprintln!("Starting metrics server on {metrics_addr}"); let registry = Arc::new(registry); - Server::bind(&metrics_addr) - .serve(make_service_fn(move |_conn| { - let registry = registry.clone(); - async move { - let handler = make_handler(registry); - Ok::<_, io::Error>(service_fn(handler)) + + let tcp_listener = TcpListener::bind(metrics_addr).await.unwrap(); + let server = http1::Builder::new(); + while let Ok((stream, _)) = tcp_listener.accept().await { + let mut shutdown_stream = signal(SignalKind::terminate()).unwrap(); + let io = TokioIo::new(stream); + let server_clone = server.clone(); + let registry_clone = registry.clone(); + tokio::task::spawn(async move { + let conn = server_clone.serve_connection(io, service_fn(make_handler(registry_clone))); + pin!(conn); + tokio::select! { + _ = conn.as_mut() => {} + _ = shutdown_stream.recv() => { + conn.as_mut().graceful_shutdown(); + } } - })) - .with_graceful_shutdown(async move { - shutdown_stream.recv().await; - }) - .await - .unwrap(); + }); + } } +/// Boxed HTTP body for responses +type BoxBody = combinators::BoxBody; + /// This function returns a HTTP handler (i.e. another function) pub fn make_handler( registry: Arc, -) -> impl Fn(Request) -> Pin>> + Send>> { +) -> impl Fn(Request) -> Pin>> + Send>> +{ // This closure accepts a request and responds with the OpenMetrics encoding of our metrics. - move |_req: Request| { + move |_req: Request| { let reg = registry.clone(); + Box::pin(async move { let mut buf = String::new(); encode(&mut buf, ®.clone()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) .map(|_| { - let body = Body::from(buf); + let body = full(Bytes::from(buf)); Response::builder() .header( hyper::header::CONTENT_TYPE, @@ -75,3 +92,8 @@ pub fn make_handler( }) } } + +/// helper function to build a full boxed body +pub fn full(body: Bytes) -> BoxBody { + Full::new(body).map_err(|never| match never {}).boxed() +}