Skip to content

Commit cc77231

Browse files
committed
refactor: remove signal_rx parameter from session.run() and handle migration signals internally
1 parent 3057c06 commit cc77231

File tree

11 files changed

+97
-72
lines changed

11 files changed

+97
-72
lines changed

moq-clock-ietf/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async fn main() -> anyhow::Result<()> {
6464
let clock_publisher = clock::Publisher::new_datagram(track_writer.datagrams()?);
6565

6666
tokio::select! {
67-
res = session.run(None) => res.context("session error")?,
67+
res = session.run() => res.context("session error")?,
6868
res = clock_publisher.run() => res.context("clock error")?,
6969
res = publisher.announce(tracks_reader) => res.context("failed to serve tracks")?,
7070
}
@@ -80,7 +80,7 @@ async fn main() -> anyhow::Result<()> {
8080
let clock_publisher = clock::Publisher::new(track_writer.subgroups()?);
8181

8282
tokio::select! {
83-
res = session.run(None) => res.context("session error")?,
83+
res = session.run() => res.context("session error")?,
8484
res = clock_publisher.run() => res.context("clock error")?,
8585
res = publisher.announce(tracks_reader) => res.context("failed to serve tracks")?,
8686
}
@@ -104,7 +104,7 @@ async fn main() -> anyhow::Result<()> {
104104
let clock_subscriber = clock::Subscriber::new(track_reader);
105105

106106
tokio::select! {
107-
res = session.run(None) => res.context("session error")?,
107+
res = session.run() => res.context("session error")?,
108108
res = clock_subscriber.run() => res.context("clock error")?,
109109
res = subscriber.subscribe(track_writer) => res.context("failed to subscribe to track")?,
110110
}

moq-pub/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
7676
.context("failed to create MoQ Transport publisher")?;
7777

7878
tokio::select! {
79-
res = session.run(None) => res.context("session error")?,
79+
res = session.run() => res.context("session error")?,
8080
res = run_media(media) => {
8181
res.context("media error")?
8282
},

moq-relay-ietf/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ pub struct Cli {
6868
/// Requires --dev to enable the web server. Only serves files by exact CID - no index.
6969
#[arg(long)]
7070
pub mlog_serve: bool,
71+
72+
/// The public URL we advertise to other origins.
73+
/// The provided certificate must be valid for this address.
74+
#[arg(long)]
75+
#[arg(default_value = "https://localhost:4443")]
76+
pub public_url: Option<Url>,
7177
}
7278

7379
#[tokio::main]
@@ -105,6 +111,7 @@ async fn main() -> anyhow::Result<()> {
105111

106112
// Create a QUIC server for media.
107113
let relay = Relay::new(RelayConfig {
114+
public_url: cli.public_url,
108115
tls: tls.clone(),
109116
bind: cli.bind,
110117
qlog_dir: qlog_dir_for_relay,

moq-relay-ietf/src/relay.rs

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@ pub struct RelayConfig {
3333
/// Our hostname which we advertise to other origins.
3434
/// We use QUIC, so the certificate must be valid for this address.
3535
pub node: Option<Url>,
36+
37+
/// The public URL we advertise to other origins.
38+
pub public_url: Option<Url>,
3639
}
3740

3841
/// MoQ Relay server.
3942
pub struct Relay {
43+
public_url: Option<Url>,
4044
quic: quic::Endpoint,
4145
announce_url: Option<Url>,
4246
mlog_dir: Option<PathBuf>,
@@ -85,6 +89,7 @@ impl Relay {
8589
});
8690

8791
Ok(Self {
92+
public_url: config.public_url,
8893
quic,
8994
announce_url: config.announce,
9095
mlog_dir: config.mlog_dir,
@@ -105,7 +110,7 @@ impl Relay {
105110
let mut signal_int =
106111
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;
107112

108-
let (signal_tx, signal_rx) = broadcast::channel::<SessionMigration>(16);
113+
let (signal_tx, mut signal_rx) = broadcast::channel::<SessionMigration>(16);
109114

110115
// Get server address early for the shutdown signal
111116
let server_addr = self
@@ -114,7 +119,12 @@ impl Relay {
114119
.as_ref()
115120
.context("missing TLS certificate")?
116121
.local_addr()?;
117-
let shutdown_uri = format!("https://{}", server_addr);
122+
// FIXME(itzmanish): this gives [::]:4433, which is not a valid URL
123+
let shutdown_uri = if let Some(public_url) = &self.public_url {
124+
public_url.clone().into()
125+
} else {
126+
format!("https://{}", server_addr)
127+
};
118128

119129
// Spawn task to listen for SIGTERM and broadcast shutdown
120130
let signal_tx_clone = signal_tx.clone();
@@ -148,8 +158,7 @@ impl Relay {
148158

149159
// Start the remotes producer task, if any
150160
let remotes = self.remotes.map(|(producer, consumer)| {
151-
let signal_rx = signal_rx.resubscribe();
152-
tasks.push(producer.run(signal_rx).boxed());
161+
tasks.push(producer.run().boxed());
153162
consumer
154163
});
155164

@@ -166,10 +175,13 @@ impl Relay {
166175
.context("failed to establish forward connection")?;
167176

168177
// Create the MoQ session over the connection
169-
let (session, publisher, subscriber) =
170-
moq_transport::session::Session::connect(session, None)
171-
.await
172-
.context("failed to establish forward session")?;
178+
let (session, publisher, subscriber) = moq_transport::session::Session::connect(
179+
session,
180+
None,
181+
Some(signal_tx_clone.subscribe()),
182+
)
183+
.await
184+
.context("failed to establish forward session")?;
173185

174186
// Create a normal looking session, except we never forward or register announces.
175187
let session = Session {
@@ -184,15 +196,7 @@ impl Relay {
184196

185197
let forward_producer = session.producer.clone();
186198

187-
tasks.push(
188-
async move {
189-
session
190-
.run(signal_tx_clone.subscribe())
191-
.await
192-
.context("forwarding failed")
193-
}
194-
.boxed(),
195-
);
199+
tasks.push(async move { session.run().await.context("forwarding failed") }.boxed());
196200

197201
forward_producer
198202
} else {
@@ -202,7 +206,6 @@ impl Relay {
202206
// Start the QUIC server loop
203207
let mut server = self.quic.server.context("missing TLS certificate")?;
204208
log::info!("listening on {}", server.local_addr()?);
205-
let mut cloned_signal_rx = signal_rx.resubscribe();
206209

207210
loop {
208211
tokio::select! {
@@ -218,12 +221,12 @@ impl Relay {
218221
let remotes = remotes.clone();
219222
let forward = forward_producer.clone();
220223
let api = self.api.clone();
221-
let session_signal_rx = signal_rx.resubscribe();
224+
let session_signal_rx = signal_tx_clone.subscribe();
222225

223226
// Spawn a new task to handle the connection
224227
tasks.push(async move {
225228
// Create the MoQ session over the connection (setup handshake etc)
226-
let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path).await {
229+
let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path, Some(session_signal_rx)).await {
227230
Ok(session) => session,
228231
Err(err) => {
229232
log::warn!("failed to accept MoQ session: {}", err);
@@ -238,26 +241,35 @@ impl Relay {
238241
consumer: subscriber.map(|subscriber| Consumer::new(subscriber, locals, api, forward)),
239242
};
240243

241-
if let Err(err) = session.run(session_signal_rx).await {
244+
if let Err(err) = session.run().await {
242245
log::warn!("failed to run MoQ session: {}", err);
243246
}
244247

245248
Ok(())
246249
}.boxed());
247250
},
248251
res = tasks.next(), if !tasks.is_empty() => res.unwrap()?,
249-
_ = cloned_signal_rx.recv() => {
250-
log::info!("received shutdown signal, shutting down. Active tasks: {}", tasks.len());
251-
// set a timeout for waiting for tasks to be empty
252-
// FIXME(itzmanish): make this configurable and revisit
253-
let timeout = tokio::time::timeout(tokio::time::Duration::from_secs(20), async move {
254-
while !tasks.is_empty() {
255-
// sleep 500ms before checking again
256-
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
252+
_ = signal_rx.recv() => {
253+
log::info!("received shutdown signal, waiting for {} active tasks to complete", tasks.len());
254+
255+
// Give sessions a moment to send GOAWAY messages
256+
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
257+
258+
// Stop accepting new connections and wait for existing tasks to complete
259+
log::info!("draining {} remaining tasks...", tasks.len());
260+
let shutdown_timeout = tokio::time::Duration::from_secs(20);
261+
let result = tokio::time::timeout(shutdown_timeout, async {
262+
// Actually poll tasks to completion
263+
while let Some(res) = tasks.next().await {
264+
if let Err(e) = res {
265+
log::warn!("task failed during shutdown: {:?}", e);
266+
}
257267
}
258-
});
259-
if let Err(e) = timeout.await {
260-
log::warn!("timed out waiting for tasks to be empty: {}", e);
268+
}).await;
269+
270+
match result {
271+
Ok(_) => log::info!("all tasks completed successfully"),
272+
Err(_) => log::warn!("timed out waiting for tasks after {}s", shutdown_timeout.as_secs()),
261273
}
262274
break Ok(());
263275
}

moq-relay-ietf/src/remote.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,13 @@ impl RemotesProducer {
7474
}
7575

7676
/// Run the remotes producer to serve remote requests.
77-
pub async fn run(
78-
mut self,
79-
signal_rx: broadcast::Receiver<SessionMigration>,
80-
) -> anyhow::Result<()> {
77+
pub async fn run(mut self) -> anyhow::Result<()> {
8178
let mut tasks = FuturesUnordered::new();
8279

8380
loop {
8481
tokio::select! {
8582
Some(mut remote) = self.next() => {
8683
let url = remote.url.clone();
87-
// Each remote task needs its own receiver
88-
let remote_signal_rx = signal_rx.resubscribe();
8984

9085
// Spawn a task to serve the remote
9186
tasks.push(async move {
@@ -94,7 +89,7 @@ impl RemotesProducer {
9489
log::warn!("serving remote: {:?}", info);
9590

9691
// Run the remote producer
97-
if let Err(err) = remote.run(remote_signal_rx).await {
92+
if let Err(err) = remote.run().await {
9893
log::warn!("failed serving remote: {:?}, error: {}", info, err);
9994
}
10095

@@ -232,16 +227,13 @@ impl RemoteProducer {
232227
Self { info, state }
233228
}
234229

235-
pub async fn run(
236-
&mut self,
237-
signal_rx: broadcast::Receiver<SessionMigration>,
238-
) -> anyhow::Result<()> {
230+
pub async fn run(&mut self) -> anyhow::Result<()> {
239231
// TODO reuse QUIC and MoQ sessions
240232
let (session, _quic_client_initial_cid) = self.quic.connect(&self.url).await?;
241233
let (session, subscriber) = moq_transport::session::Subscriber::connect(session).await?;
242234

243235
// Run the session
244-
let mut session = session.run(Some(signal_rx)).boxed();
236+
let mut session = session.run().boxed();
245237
let mut tasks = FuturesUnordered::new();
246238

247239
let mut done = None;

moq-relay-ietf/src/session.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@ pub struct Session {
1111

1212
impl Session {
1313
/// Run the session, producer, and consumer as necessary.
14-
pub async fn run(
15-
self,
16-
signal_rx: broadcast::Receiver<SessionMigration>,
17-
) -> Result<(), SessionError> {
14+
pub async fn run(self) -> Result<(), SessionError> {
1815
let mut tasks = FuturesUnordered::new();
19-
tasks.push(self.session.run(Some(signal_rx)).boxed());
16+
tasks.push(self.session.run().boxed());
2017

2118
if let Some(producer) = self.producer {
2219
tasks.push(producer.run().boxed());

moq-sub/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
4545
let mut media = Media::new(subscriber, tracks, out, config.catalog).await?;
4646

4747
tokio::select! {
48-
res = session.run(None) => res.context("session error")?,
48+
res = session.run() => res.context("session error")?,
4949
res = media.run() => res.context("media error")?,
5050
}
5151

0 commit comments

Comments
 (0)