Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions moq-relay-ietf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ pub struct Cli {
/// Requires --dev to enable the web server. Only serves files by exact CID - no index.
#[arg(long)]
pub mlog_serve: bool,

/// The public URL we advertise to other origins.
/// The provided certificate must be valid for this address.
#[arg(long)]
#[arg(default_value = "https://localhost:4443")]
pub public_url: Option<Url>,
}

#[tokio::main]
Expand Down Expand Up @@ -105,6 +111,7 @@ async fn main() -> anyhow::Result<()> {

// Create a QUIC server for media.
let relay = Relay::new(RelayConfig {
public_url: cli.public_url,
tls: tls.clone(),
bind: cli.bind,
qlog_dir: qlog_dir_for_relay,
Expand Down
99 changes: 93 additions & 6 deletions moq-relay-ietf/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use anyhow::Context;

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_native_ietf::quic;
use moq_transport::session::SessionMigration;
use tokio::sync::broadcast;
use url::Url;

use crate::{Api, Consumer, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, Session};
Expand Down Expand Up @@ -31,10 +33,14 @@ pub struct RelayConfig {
/// Our hostname which we advertise to other origins.
/// We use QUIC, so the certificate must be valid for this address.
pub node: Option<Url>,

/// The public URL we advertise to other origins.
pub public_url: Option<Url>,
}

/// MoQ Relay server.
pub struct Relay {
public_url: Option<Url>,
quic: quic::Endpoint,
announce_url: Option<Url>,
mlog_dir: Option<PathBuf>,
Expand Down Expand Up @@ -83,6 +89,7 @@ impl Relay {
});

Ok(Self {
public_url: config.public_url,
quic,
announce_url: config.announce,
mlog_dir: config.mlog_dir,
Expand All @@ -96,6 +103,59 @@ impl Relay {
pub async fn run(self) -> anyhow::Result<()> {
let mut tasks = FuturesUnordered::new();

// Setup SIGTERM handler and broadcast channel
#[cfg(unix)]
let mut signal_term =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
let mut signal_int =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;

let (signal_tx, mut signal_rx) = broadcast::channel::<SessionMigration>(16);

// Get server address early for the shutdown signal
let server_addr = self
.quic
.server
.as_ref()
.context("missing TLS certificate")?
.local_addr()?;
// FIXME(itzmanish): this gives [::]:4433, which is not a valid URL
let shutdown_uri = if let Some(public_url) = &self.public_url {
public_url.clone().into()
} else {
format!("https://{}", server_addr)
};

// Spawn task to listen for SIGTERM and broadcast shutdown
let signal_tx_clone = signal_tx.clone();
tasks.push(
async move {
log::info!("Listening for SIGTERM");
#[cfg(unix)]
{
tokio::select! {
_ = signal_term.recv() => {
log::info!("Received SIGTERM");
}
_ = signal_int.recv() => {
log::info!("Received SIGINT");
}
}
log::info!("broadcasting shutdown to all sessions");

if let Err(e) = signal_tx.send(SessionMigration { uri: shutdown_uri }) {
log::error!("failed to broadcast shutdown: {}", e);
}
}
#[cfg(not(unix))]
{
std::future::pending::<()>().await;
}
Ok(())
}
.boxed(),
);

// Start the remotes producer task, if any
let remotes = self.remotes.map(|(producer, consumer)| {
tasks.push(producer.run().boxed());
Expand All @@ -115,10 +175,13 @@ impl Relay {
.context("failed to establish forward connection")?;

// Create the MoQ session over the connection
let (session, publisher, subscriber) =
moq_transport::session::Session::connect(session, None)
.await
.context("failed to establish forward session")?;
let (session, publisher, subscriber) = moq_transport::session::Session::connect(
session,
None,
Some(signal_tx_clone.subscribe()),
)
.await
.context("failed to establish forward session")?;

// Create a normal looking session, except we never forward or register announces.
let session = Session {
Expand Down Expand Up @@ -158,12 +221,12 @@ impl Relay {
let remotes = remotes.clone();
let forward = forward_producer.clone();
let api = self.api.clone();
let session_signal_rx = signal_tx_clone.subscribe();

// Spawn a new task to handle the connection
tasks.push(async move {

// Create the MoQ session over the connection (setup handshake etc)
let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path).await {
let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path, Some(session_signal_rx)).await {
Ok(session) => session,
Err(err) => {
log::warn!("failed to accept MoQ session: {}", err);
Expand All @@ -186,6 +249,30 @@ impl Relay {
}.boxed());
},
res = tasks.next(), if !tasks.is_empty() => res.unwrap()?,
_ = signal_rx.recv() => {
log::info!("received shutdown signal, waiting for {} active tasks to complete", tasks.len());

// Give sessions a moment to send GOAWAY messages
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

// Stop accepting new connections and wait for existing tasks to complete
log::info!("draining {} remaining tasks...", tasks.len());
let shutdown_timeout = tokio::time::Duration::from_secs(20);
let result = tokio::time::timeout(shutdown_timeout, async {
// Actually poll tasks to completion
while let Some(res) = tasks.next().await {
if let Err(e) = res {
log::warn!("task failed during shutdown: {:?}", e);
}
}
}).await;

match result {
Ok(_) => log::info!("all tasks completed successfully"),
Err(_) => log::warn!("timed out waiting for tasks after {}s", shutdown_timeout.as_secs()),
}
break Ok(());
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions moq-relay-ietf/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
use moq_native_ietf::quic;
use moq_transport::coding::TrackNamespace;
use moq_transport::serve::{Track, TrackReader, TrackWriter};
use moq_transport::session::SessionMigration;

Check failure on line 15 in moq-relay-ietf/src/remote.rs

View workflow job for this annotation

GitHub Actions / build

unused import: `moq_transport::session::SessionMigration`
use moq_transport::watch::State;
use tokio::sync::broadcast;

Check failure on line 17 in moq-relay-ietf/src/remote.rs

View workflow job for this annotation

GitHub Actions / build

unused import: `tokio::sync::broadcast`
use url::Url;

use crate::Api;
Expand Down
6 changes: 3 additions & 3 deletions moq-relay-ietf/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_transport::session::SessionError;

use crate::{Consumer, Producer};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_transport::session::{SessionError, SessionMigration};

Check failure on line 3 in moq-relay-ietf/src/session.rs

View workflow job for this annotation

GitHub Actions / build

unused import: `SessionMigration`
use tokio::sync::broadcast;

Check failure on line 4 in moq-relay-ietf/src/session.rs

View workflow job for this annotation

GitHub Actions / build

unused import: `tokio::sync::broadcast`

pub struct Session {
pub session: moq_transport::session::Session,
Expand Down
6 changes: 6 additions & 0 deletions moq-transport/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,9 @@ message_types! {
PublishOk = 0x1e,
PublishError = 0x1f,
}

pub enum MiscMessage {
GoAway,
MaxRequestId,
RequestsBlocked,
}
66 changes: 61 additions & 5 deletions moq-transport/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ mod writer;
pub use announce::*;
pub use announced::*;
pub use error::*;
use log::info;
pub use publisher::*;
pub use subscribe::*;
pub use subscribed::*;
pub use subscriber::*;
use tokio::sync::broadcast;
pub use track_status_requested::*;

use reader::*;
Expand All @@ -24,8 +26,8 @@ use writer::*;
use futures::{stream::FuturesUnordered, StreamExt};
use std::sync::{atomic, Arc, Mutex};

use crate::coding::KeyValuePairs;
use crate::message::Message;
use crate::coding::{KeyValuePairs, SessionUri};
use crate::message::{GoAway, Message};
use crate::mlog;
use crate::watch::Queue;
use crate::{message, setup};
Expand All @@ -49,6 +51,14 @@ pub struct Session {
/// Optional mlog writer for MoQ Transport events
/// Wrapped in Arc<Mutex<>> to share across send/recv tasks when enabled
mlog: Option<Arc<Mutex<mlog::MlogWriter>>>,

/// Optional signal receiver for migration events
signal_rx: Option<broadcast::Receiver<SessionMigration>>,
}

#[derive(Clone, Debug)]
pub struct SessionMigration {
pub uri: String,
}

impl Session {
Expand All @@ -66,6 +76,7 @@ impl Session {
recver: Reader,
first_requestid: u64,
mlog: Option<mlog::MlogWriter>,
signal_rx: Option<broadcast::Receiver<SessionMigration>>,
) -> (Self, Option<Publisher>, Option<Subscriber>) {
let next_requestid = Arc::new(atomic::AtomicU64::new(first_requestid));
let outgoing = Queue::default().split();
Expand Down Expand Up @@ -93,6 +104,7 @@ impl Session {
subscriber: subscriber.clone(),
outgoing: outgoing.1,
mlog: mlog_shared,
signal_rx,
};

(session, publisher, subscriber)
Expand All @@ -103,6 +115,7 @@ impl Session {
pub async fn connect(
mut session: web_transport::Session,
mlog_path: Option<PathBuf>,
signal_rx: Option<broadcast::Receiver<SessionMigration>>,
) -> Result<(Session, Publisher, Subscriber), SessionError> {
let mlog = mlog_path.and_then(|path| {
mlog::MlogWriter::new(path)
Expand Down Expand Up @@ -135,7 +148,7 @@ impl Session {
// TODO: emit server_setup_parsed event

// We are the client, so the first request id is 0
let session = Session::new(session, sender, recver, 0, mlog);
let session = Session::new(session, sender, recver, 0, mlog, signal_rx);
Ok((session.0, session.1.unwrap(), session.2.unwrap()))
}

Expand All @@ -144,6 +157,7 @@ impl Session {
pub async fn accept(
mut session: web_transport::Session,
mlog_path: Option<PathBuf>,
signal_rx: Option<broadcast::Receiver<SessionMigration>>,
) -> Result<(Session, Option<Publisher>, Option<Subscriber>), SessionError> {
let mut mlog = mlog_path.and_then(|path| {
mlog::MlogWriter::new(path)
Expand Down Expand Up @@ -188,7 +202,7 @@ impl Session {
sender.encode(&server).await?;

// We are the server, so the first request id is 1
Ok(Session::new(session, sender, recver, 1, mlog))
Ok(Session::new(session, sender, recver, 1, mlog, signal_rx))
} else {
Err(SessionError::Version(client.versions, server_versions))
}
Expand All @@ -198,9 +212,32 @@ impl Session {
/// inbound control messages, receiving and processing new inbound uni-directional QUIC streams,
/// and receiving and processing QUIC datagrams received
pub async fn run(self) -> Result<(), SessionError> {
let mut cloned_outgoing = self.outgoing.clone();

// Spawn a task that waits for shutdown signal and pushes GOAWAY
// This runs independently and doesn't affect the main session tasks
if let Some(mut signal_rx) = self.signal_rx {
tokio::spawn(async move {
if let Ok(info) = signal_rx.recv().await {
log::info!(
"received terminate/interrupt signal, sending GOAWAY: {:#?}",
info
);
let msg = GoAway {
uri: SessionUri(info.uri),
};
if let Err(e) = cloned_outgoing.push(Message::GoAway(msg)) {
log::error!("failed to push GOAWAY: {:#?}", e);
} else {
log::info!("GOAWAY message queued successfully");
}
}
});
}

tokio::select! {
res = Self::run_recv(self.recver, self.publisher, self.subscriber.clone(), self.mlog.clone()) => res,
res = Self::run_send(self.sender, self.outgoing, self.mlog.clone()) => res,
res = Self::run_send(self.sender, self.outgoing, self.subscriber.clone(), self.mlog.clone()) => res,
res = Self::run_streams(self.webtransport.clone(), self.subscriber.clone()) => res,
res = Self::run_datagrams(self.webtransport, self.subscriber) => res,
}
Expand All @@ -210,6 +247,7 @@ impl Session {
async fn run_send(
mut sender: Writer,
mut outgoing: Queue<message::Message>,
mut subscriber: Option<Subscriber>,
mlog: Option<Arc<Mutex<mlog::MlogWriter>>>,
) -> Result<(), SessionError> {
while let Some(msg) = outgoing.pop().await {
Expand Down Expand Up @@ -256,6 +294,12 @@ impl Session {
}
}

if let Message::GoAway(_m) = &msg {
subscriber
.iter_mut()
.for_each(|s| s.handle_go_away().unwrap_or(()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me exactly what's supposed to be happening here or why this is being handled the point where we typically just log the outgoing message and then encode it for the wire. It looks like this handler is supposed to do some subscription cleanup, but I'm not sure this is where we really want to be doing it.

I'm reading this as saying "whenever we are just about to put a GOAWAY on the wire, first unsubscribe from all tracks we're subscribed to, and then put it on the wire"

I don't read the spec as saying that GOAWAY itself should impact subscription state. I can see a case for wanting to ensure a subscriber client unsubscribes from all active subscriptions prior to sending GOAWAY, but this seems like a very intrusive and awkward way to enforce that. I wonder if it might be better to error if a client tries to send a GOAWAY while it still has open subscriptions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me exactly what's supposed to be happening here or why this is being handled the point where we typically just log the outgoing message and then encode it for the wire. It looks like this handler is supposed to do some subscription cleanup, but I'm not sure this is where we really want to be doing it.

run_send independently runs in seperate thread which means we can know if server is sending a go_away only by either intercepting the message or take signal_rx in the parameters.

I'm reading this as saying "whenever we are just about to put a GOAWAY on the wire, first unsubscribe from all tracks we're subscribed to, and then put it on the wire"

This is correct. To unsubscribe from all tracks the current design have one way, which is dropping the Subscribed optimistically which then sends unsubscribe to the downstream and then send go_away. The client makes sure before dropping the connection that publish_done is responded for all unsubscribe message.

I don't read the spec as saying that GOAWAY itself should impact subscription state. I can see a case for wanting to ensure a subscriber client unsubscribes from all active subscriptions prior to sending GOAWAY, but this seems like a very intrusive and awkward way to enforce that. I wonder if it might be better to error if a client tries to send a GOAWAY while it still has open subscriptions?

Even I want to keep the Subscribed state and just put a flag that it's closed and send unsubscribe and when it gets publish done only then the state gets cleared but this requires changing a lot of code. In fact the client side code always first send unsubscribe and then when gets publish_done it closes the state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think for this initial implementation it would be better to punt on strictly enforcing the unsubscribes at this layer and just make it possible to send GOAWAY without any surprising side effects. It'd be nice to provide guardrails for applications, but it sounds like doing that correctly would be a lot more work right now. I'm OK saying that applications should ultimately be responsible for correctly managing their session state (including unsubscribing from tracks before sending GOAWAY). Given that we're already talking about session teardown in this case the worst that can happen is that an application will do something wrong and end up accidentally ending a session with a PROTOCOL_VIOLATION instead.


sender.encode(&msg).await?;
}

Expand Down Expand Up @@ -340,6 +384,18 @@ impl Session {
Err(msg) => msg,
};

let msg = match msg {
Message::GoAway(goaway) => {
info!("Received GOAWAY: {:?}", goaway);
subscriber
.as_mut()
.ok_or(SessionError::RoleViolation)?
.handle_go_away()?;
continue;
}
_ => msg,
};

// TODO GOAWAY, MAX_REQUEST_ID, REQUESTS_BLOCKED
log::warn!("Unimplemented message type received: {:?}", msg);
return Err(SessionError::unimplemented(&format!(
Expand Down
4 changes: 2 additions & 2 deletions moq-transport/src/session/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ impl Publisher {
pub async fn accept(
session: web_transport::Session,
) -> Result<(Session, Publisher), SessionError> {
let (session, publisher, _) = Session::accept(session, None).await?;
let (session, publisher, _) = Session::accept(session, None, None).await?;
Ok((session, publisher.unwrap()))
}

pub async fn connect(
session: web_transport::Session,
) -> Result<(Session, Publisher), SessionError> {
let (session, publisher, _) = Session::connect(session, None).await?;
let (session, publisher, _) = Session::connect(session, None, None).await?;
Ok((session, publisher))
}

Expand Down
Loading
Loading