@@ -17,10 +17,20 @@ use pin_project_lite::pin_project;
1717use tokio:: sync:: watch;
1818
1919/// A graceful shutdown utility
20+ // Purposefully not `Clone`, see `watcher()` method for why.
2021pub struct GracefulShutdown {
2122 tx : watch:: Sender < ( ) > ,
2223}
2324
25+ /// A watcher side of the graceful shutdown.
26+ ///
27+ /// This type can only watch a connection, it cannot trigger a shutdown.
28+ ///
29+ /// Call [`GracefulShutdown::watcher()`] to construct one of these.
30+ pub struct Watcher {
31+ rx : watch:: Receiver < ( ) > ,
32+ }
33+
2434impl GracefulShutdown {
2535 /// Create a new graceful shutdown helper.
2636 pub fn new ( ) -> Self {
@@ -30,12 +40,20 @@ impl GracefulShutdown {
3040
3141 /// Wrap a future for graceful shutdown watching.
3242 pub fn watch < C : GracefulConnection > ( & self , conn : C ) -> impl Future < Output = C :: Output > {
33- let mut rx = self . tx . subscribe ( ) ;
34- GracefulConnectionFuture :: new ( conn, async move {
35- let _ = rx. changed ( ) . await ;
36- // hold onto the rx until the watched future is completed
37- rx
38- } )
43+ self . watcher ( ) . watch ( conn)
44+ }
45+
46+ /// Create an owned type that can watch a connection.
47+ ///
48+ /// This method allows created an owned type that can be sent onto another
49+ /// task before calling [`Watcher::watch()`].
50+ // Internal: this function exists because `Clone` allows footguns.
51+ // If the `tx` were cloned (or the `rx`), race conditions can happens where
52+ // one task starting a shutdown is scheduled and interwined with a task
53+ // starting to watch a connection, and the "watch version" is one behind.
54+ pub fn watcher ( & self ) -> Watcher {
55+ let rx = self . tx . subscribe ( ) ;
56+ Watcher { rx }
3957 }
4058
4159 /// Signal shutdown for all watched connections.
@@ -64,6 +82,24 @@ impl Default for GracefulShutdown {
6482 }
6583}
6684
85+ impl Watcher {
86+ /// Wrap a future for graceful shutdown watching.
87+ pub fn watch < C : GracefulConnection > ( self , conn : C ) -> impl Future < Output = C :: Output > {
88+ let Watcher { mut rx } = self ;
89+ GracefulConnectionFuture :: new ( conn, async move {
90+ let _ = rx. changed ( ) . await ;
91+ // hold onto the rx until the watched future is completed
92+ rx
93+ } )
94+ }
95+ }
96+
97+ impl Debug for Watcher {
98+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
99+ f. debug_struct ( "GracefulWatcher" ) . finish ( )
100+ }
101+ }
102+
67103pin_project ! {
68104 struct GracefulConnectionFuture <C , F : Future > {
69105 #[ pin]
0 commit comments