1+ use http_body_util:: { combinators, BodyExt , Full } ;
12use hyper:: {
2- service:: { make_service_fn, service_fn} ,
3- Body , Request , Response , Server ,
3+ body:: { Bytes , Incoming } ,
4+ server:: conn:: http1,
5+ service:: service_fn,
6+ Request , Response ,
47} ;
8+ use hyper_util:: rt:: TokioIo ;
59use prometheus_client:: { encoding:: text:: encode, metrics:: counter:: Counter , registry:: Registry } ;
610use std:: {
711 future:: Future ,
@@ -10,7 +14,11 @@ use std::{
1014 pin:: Pin ,
1115 sync:: Arc ,
1216} ;
13- use tokio:: signal:: unix:: { signal, SignalKind } ;
17+ use tokio:: {
18+ net:: TcpListener ,
19+ pin,
20+ signal:: unix:: { signal, SignalKind } ,
21+ } ;
1422
1523#[ tokio:: main]
1624async fn main ( ) {
@@ -31,39 +39,48 @@ async fn main() {
3139
3240/// Start a HTTP server to report metrics.
3341pub async fn start_metrics_server ( metrics_addr : SocketAddr , registry : Registry ) {
34- let mut shutdown_stream = signal ( SignalKind :: terminate ( ) ) . unwrap ( ) ;
35-
3642 eprintln ! ( "Starting metrics server on {metrics_addr}" ) ;
3743
3844 let registry = Arc :: new ( registry) ;
39- Server :: bind ( & metrics_addr)
40- . serve ( make_service_fn ( move |_conn| {
41- let registry = registry. clone ( ) ;
42- async move {
43- let handler = make_handler ( registry) ;
44- Ok :: < _ , io:: Error > ( service_fn ( handler) )
45+
46+ let tcp_listener = TcpListener :: bind ( metrics_addr) . await . unwrap ( ) ;
47+ let server = http1:: Builder :: new ( ) ;
48+ while let Ok ( ( stream, _) ) = tcp_listener. accept ( ) . await {
49+ let mut shutdown_stream = signal ( SignalKind :: terminate ( ) ) . unwrap ( ) ;
50+ let io = TokioIo :: new ( stream) ;
51+ let server_clone = server. clone ( ) ;
52+ let registry_clone = registry. clone ( ) ;
53+ tokio:: task:: spawn ( async move {
54+ let conn = server_clone. serve_connection ( io, service_fn ( make_handler ( registry_clone) ) ) ;
55+ pin ! ( conn) ;
56+ tokio:: select! {
57+ _ = conn. as_mut( ) => { }
58+ _ = shutdown_stream. recv( ) => {
59+ conn. as_mut( ) . graceful_shutdown( ) ;
60+ }
4561 }
46- } ) )
47- . with_graceful_shutdown ( async move {
48- shutdown_stream. recv ( ) . await ;
49- } )
50- . await
51- . unwrap ( ) ;
62+ } ) ;
63+ }
5264}
5365
66+ /// Boxed HTTP body for responses
67+ type BoxBody = combinators:: BoxBody < Bytes , hyper:: Error > ;
68+
5469/// This function returns a HTTP handler (i.e. another function)
5570pub fn make_handler (
5671 registry : Arc < Registry > ,
57- ) -> impl Fn ( Request < Body > ) -> Pin < Box < dyn Future < Output = io:: Result < Response < Body > > > + Send > > {
72+ ) -> impl Fn ( Request < Incoming > ) -> Pin < Box < dyn Future < Output = io:: Result < Response < BoxBody > > > + Send > >
73+ {
5874 // This closure accepts a request and responds with the OpenMetrics encoding of our metrics.
59- move |_req : Request < Body > | {
75+ move |_req : Request < Incoming > | {
6076 let reg = registry. clone ( ) ;
77+
6178 Box :: pin ( async move {
6279 let mut buf = String :: new ( ) ;
6380 encode ( & mut buf, & reg. clone ( ) )
6481 . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e) )
6582 . map ( |_| {
66- let body = Body :: from ( buf) ;
83+ let body = full ( Bytes :: from ( buf) ) ;
6784 Response :: builder ( )
6885 . header (
6986 hyper:: header:: CONTENT_TYPE ,
@@ -75,3 +92,8 @@ pub fn make_handler(
7592 } )
7693 }
7794}
95+
96+ /// helper function to build a full boxed body
97+ pub fn full ( body : Bytes ) -> BoxBody {
98+ Full :: new ( body) . map_err ( |never| match never { } ) . boxed ( )
99+ }
0 commit comments