From 02b232add9673c9fd327872b084ebe90135c24d5 Mon Sep 17 00:00:00 2001 From: Manish Date: Wed, 26 Nov 2025 16:00:11 +0530 Subject: [PATCH 01/10] refactor: restructure relay into lib/bin and add coordinator interface --- Cargo.lock | 1 + moq-relay-ietf/Cargo.toml | 13 +- moq-relay-ietf/src/{ => bin}/main.rs | 30 +- moq-relay-ietf/src/consumer.rs | 24 +- moq-relay-ietf/src/coordinator.rs | 166 +++++++++ moq-relay-ietf/src/coordinator_local.rs | 435 ++++++++++++++++++++++++ moq-relay-ietf/src/lib.rs | 51 +++ moq-relay-ietf/src/relay.rs | 23 +- 8 files changed, 705 insertions(+), 38 deletions(-) rename moq-relay-ietf/src/{ => bin}/main.rs (91%) create mode 100644 moq-relay-ietf/src/coordinator.rs create mode 100644 moq-relay-ietf/src/coordinator_local.rs create mode 100644 moq-relay-ietf/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4949f366..bd73b8ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1282,6 +1282,7 @@ name = "moq-relay-ietf" version = "0.7.5" dependencies = [ "anyhow", + "async-trait", "axum", "clap", "env_logger", diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index cd3e4d82..ea86845f 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "moq-relay-ietf" description = "Media over QUIC" -authors = ["Luke Curley"] -repository = "https://github.com/englishm/moq-rs" +authors = ["Luke Curley", "Manish Kumar Pandit"] +repository = "https://github.com/cloudflare/moq-rs" license = "MIT OR Apache-2.0" version = "0.7.5" @@ -11,6 +11,14 @@ edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] +[lib] +name = "moq_relay_ietf" +path = "src/lib.rs" + +[[bin]] +name = "moq-relay-ietf" +path = "src/bin/main.rs" + [dependencies] moq-transport = { path = "../moq-transport", version = "0.11" } moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } @@ -22,6 +30,7 @@ url = "2" # Async stuff tokio 
= { version = "1", features = ["full"] } futures = "0.3" +async-trait = "0.1" # Web server to serve the fingerprint axum = { version = "0.7", features = ["tokio"] } diff --git a/moq-relay-ietf/src/main.rs b/moq-relay-ietf/src/bin/main.rs similarity index 91% rename from moq-relay-ietf/src/main.rs rename to moq-relay-ietf/src/bin/main.rs index 995b28ca..aaa2dab3 100644 --- a/moq-relay-ietf/src/main.rs +++ b/moq-relay-ietf/src/bin/main.rs @@ -1,26 +1,10 @@ -use clap::Parser; - -mod api; -mod consumer; -mod local; -mod producer; -mod relay; -mod remote; -mod session; -mod web; - -pub use api::*; -pub use consumer::*; -pub use local::*; -pub use producer::*; -pub use relay::*; -pub use remote::*; -pub use session::*; -pub use web::*; - use std::{net, path::PathBuf}; + +use clap::Parser; use url::Url; +use moq_relay_ietf::{LocalCoordinator, Relay, RelayConfig, Web, WebConfig}; + #[derive(Parser, Clone)] pub struct Cli { /// Listen on this address @@ -103,6 +87,11 @@ async fn main() -> anyhow::Result<()> { None }; + // Create the coordinator + // For now we always use LocalCoordinator. Later we can add HttpCoordinator + // based on --api and --node CLI args. + let coordinator = LocalCoordinator::new(); + // Create a QUIC server for media. 
let relay = Relay::new(RelayConfig { tls: tls.clone(), @@ -112,6 +101,7 @@ async fn main() -> anyhow::Result<()> { node: cli.node, api: cli.api, announce: cli.announce, + coordinator, })?; if cli.dev { diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 85242d09..b79531b1 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::Context; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_transport::{ @@ -5,14 +7,14 @@ use moq_transport::{ session::{Announced, SessionError, Subscriber}, }; -use crate::{Api, Locals, Producer}; +use crate::{Coordinator, Locals, Producer}; /// Consumer of tracks from a remote Publisher #[derive(Clone)] pub struct Consumer { remote: Subscriber, locals: Locals, - api: Option, + coordinator: Arc, forward: Option, // Forward all announcements to this subscriber } @@ -20,13 +22,13 @@ impl Consumer { pub fn new( remote: Subscriber, locals: Locals, - api: Option, + coordinator: Arc, forward: Option, ) -> Self { Self { remote, locals, - api, + coordinator, forward, } } @@ -64,13 +66,13 @@ impl Consumer { // Produce the tracks for this announce and return the reader let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce(); - // Start refreshing the API origin, if any - if let Some(api) = self.api.as_ref() { - let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?; - tasks.push( - async move { refresh.run().await.context("failed refreshing origin") }.boxed(), - ); - } + // Register namespace with the coordinator + let namespace_path = reader.namespace.to_utf8_path(); + let _namespace_registration = self + .coordinator + .register_namespace(&namespace_path) + .await + .context("failed to register namespace with coordinator")?; // Register the local tracks, unregister on drop let _register = self.locals.register(reader.clone()).await?; diff --git a/moq-relay-ietf/src/coordinator.rs 
b/moq-relay-ietf/src/coordinator.rs new file mode 100644 index 00000000..18da56e1 --- /dev/null +++ b/moq-relay-ietf/src/coordinator.rs @@ -0,0 +1,166 @@ +use anyhow::Result; +use async_trait::async_trait; +use url::Url; + +/// Handle returned when a namespace is registered with the coordinator. +/// +/// Dropping this handle automatically unregisters the namespace. +/// This provides RAII-based cleanup - when the publisher disconnects +/// or the namespace is no longer served, cleanup happens automatically. +pub struct NamespaceRegistration { + _inner: Box, +} + +impl NamespaceRegistration { + /// Create a new registration handle wrapping any Send + Sync type. + /// + /// The wrapped value's `Drop` implementation will be called when + /// this registration is dropped. + pub fn new(inner: T) -> Self { + Self { + _inner: Box::new(inner), + } + } +} + +/// Handle returned when a track is registered under a namespace. +/// +/// Dropping this handle automatically unregisters the track. +/// The namespace remains registered even after all tracks are dropped. +pub struct TrackRegistration { + _inner: Box, +} + +impl TrackRegistration { + /// Create a new track registration handle. + pub fn new(inner: T) -> Self { + Self { + _inner: Box::new(inner), + } + } +} + +/// Result of a namespace lookup. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NamespaceOrigin { + /// Namespace is served locally by this relay. + Local, + + /// Namespace is served by a remote relay at the given URL. + Remote(Url), +} + +/// Information about a track within a namespace. +#[derive(Debug, Clone)] +pub struct TrackInfo { + /// The track namespace + pub namespace: String, + + /// The track name within the namespace + pub track_name: String, + + /// Track alias for quick lookup + pub track_alias: u64, +} + +/// Coordinator handles namespace and track registration/discovery across relays. 
+/// +/// Implementations are responsible for: +/// - Tracking which namespaces are served locally +/// - Tracking which tracks are available under each namespace +/// - Caching remote namespace lookups +/// - Communicating with external registries (HTTP API, Redis, etc.) +/// - Periodic refresh/heartbeat of registrations +/// - Cleanup when registrations are dropped +/// +/// # Thread Safety +/// +/// All methods take `&self` and implementations must be thread-safe. +/// Multiple tasks will call these methods concurrently. +#[async_trait] +pub trait Coordinator: Send + Sync { + /// Register a namespace as locally available on this relay. + /// + /// Called when a publisher sends PUBLISH_NAMESPACE. + /// The coordinator should: + /// 1. Record the namespace as locally available + /// 2. Advertise to external registry if configured + /// 3. Start any refresh/heartbeat tasks + /// 4. Return a handle that unregisters on drop + /// + /// # Arguments + /// + /// * `namespace` - The namespace being registered + /// + /// # Returns + /// + /// A `NamespaceRegistration` handle. The namespace remains registered + /// as long as this handle is held. Dropping it unregisters the namespace. + async fn register_namespace(&self, namespace: &str) -> Result; + + /// Unregister a namespace. + /// + /// Called when a publisher sends PUBLISH_NAMESPACE_DONE. + /// This is an explicit unregistration - the registration handle may still exist + /// but the namespace should be removed from the registry. + /// + /// # Arguments + /// + /// * `namespace` - The namespace to unregister + async fn unregister_namespace(&self, namespace: &str) -> Result<()>; + + /// Register a track as available under a namespace. + /// + /// Called when a publisher sends PUBLISH for a track. + /// The namespace must already be registered. + /// + /// # Arguments + /// + /// * `track_info` - Information about the track being registered + /// + /// # Returns + /// + /// A `TrackRegistration` handle. 
The track remains registered + /// as long as this handle is held. + async fn register_track(&self, track_info: TrackInfo) -> Result; + + /// Unregister a track. + /// + /// Called when a publisher sends PUBLISH_DONE. + /// Only the track is removed, not the namespace. + /// + /// # Arguments + /// + /// * `namespace` - The namespace containing the track + /// * `track_name` - The track name to unregister + async fn unregister_track(&self, namespace: &str, track_name: &str) -> Result<()>; + + /// Lookup where a namespace is served from. + /// + /// Called when a subscriber requests a namespace. + /// The coordinator should check in order: + /// 1. Local registrations (return `Local`) + /// 2. Cached remote lookups (return `Remote(url)` if not expired) + /// 3. External registry (cache and return result) + /// + /// # Arguments + /// + /// * `namespace` - The namespace to look up + /// + /// # Returns + /// + /// - `Ok(Some(NamespaceOrigin::Local))` - Served by this relay + /// - `Ok(Some(NamespaceOrigin::Remote(url)))` - Served by remote relay + /// - `Ok(None)` - Namespace not found anywhere + async fn lookup(&self, namespace: &str) -> Result>; + + /// Graceful shutdown of the coordinator. + /// + /// Called when the relay is shutting down. 
Implementations should: + /// - Unregister all local namespaces and tracks + /// - Cancel refresh tasks + /// - Close connections to external registries + async fn shutdown(&self) -> Result<()> { + Ok(()) + } +} diff --git a/moq-relay-ietf/src/coordinator_local.rs b/moq-relay-ietf/src/coordinator_local.rs new file mode 100644 index 00000000..839e053f --- /dev/null +++ b/moq-relay-ietf/src/coordinator_local.rs @@ -0,0 +1,435 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, RwLock, Weak}; + +use anyhow::Result; +use async_trait::async_trait; + +use crate::coordinator::{ + Coordinator, NamespaceOrigin, NamespaceRegistration, TrackInfo, TrackRegistration, +}; + +/// Internal state shared between LocalCoordinator and its drop guards. +struct LocalCoordinatorState { + /// Local namespaces currently registered + namespaces: RwLock>, + + /// Tracks registered under each namespace + /// Maps namespace -> set of track names + tracks: RwLock>>, +} + +/// Drop guard for namespace registration. +/// When dropped, removes the namespace from the coordinator. +struct NamespaceDropGuard { + namespace: String, + state: Weak, +} + +impl Drop for NamespaceDropGuard { + fn drop(&mut self) { + if let Some(state) = self.state.upgrade() { + log::debug!( + "local coordinator: auto-unregistering namespace {} (drop guard)", + self.namespace + ); + state.namespaces.write().unwrap().remove(&self.namespace); + state.tracks.write().unwrap().remove(&self.namespace); + } + } +} + +/// Drop guard for track registration. +/// When dropped, removes the track from the coordinator. 
+struct TrackDropGuard { + namespace: String, + track_name: String, + state: Weak, +} + +impl Drop for TrackDropGuard { + fn drop(&mut self) { + if let Some(state) = self.state.upgrade() { + log::debug!( + "local coordinator: auto-unregistering track {}/{} (drop guard)", + self.namespace, + self.track_name + ); + if let Some(tracks) = state.tracks.write().unwrap().get_mut(&self.namespace) { + tracks.remove(&self.track_name); + } + } + } +} + +/// Local coordinator for single-relay deployments. +/// +/// This coordinator does not communicate with any external service. +/// It tracks local namespaces and tracks in memory but does not +/// advertise them to other relays. +/// +/// # Use Cases +/// +/// - Development and testing +/// - Single-relay deployments +/// - Environments without external registry +/// +/// # Example +/// +/// ```rust,ignore +/// use moq_relay_ietf::{Relay, RelayConfig, LocalCoordinator}; +/// +/// let config = RelayConfig { +/// coordinator: LocalCoordinator::new(), +/// // ... +/// }; +/// let relay = Relay::new(config)?; +/// ``` +pub struct LocalCoordinator { + state: Arc, +} + +impl LocalCoordinator { + /// Create a new local coordinator. + pub fn new() -> Arc { + Arc::new(Self { + state: Arc::new(LocalCoordinatorState { + namespaces: RwLock::new(HashSet::new()), + tracks: RwLock::new(HashMap::new()), + }), + }) + } + + /// Get the number of registered namespaces (for testing). + #[cfg(test)] + pub fn namespace_count(&self) -> usize { + self.state.namespaces.read().unwrap().len() + } + + /// Get the number of tracks under a namespace (for testing). + #[cfg(test)] + pub fn track_count(&self, namespace: &str) -> usize { + self.state + .tracks + .read() + .unwrap() + .get(namespace) + .map(|t| t.len()) + .unwrap_or(0) + } + + /// Check if a namespace is registered (for testing). 
+ #[cfg(test)] + pub fn has_namespace(&self, namespace: &str) -> bool { + self.state.namespaces.read().unwrap().contains(namespace) + } + + /// Check if a track is registered (for testing). + #[cfg(test)] + pub fn has_track(&self, namespace: &str, track_name: &str) -> bool { + self.state + .tracks + .read() + .unwrap() + .get(namespace) + .map(|t| t.contains(track_name)) + .unwrap_or(false) + } +} + +impl Default for LocalCoordinator { + fn default() -> Self { + Self { + state: Arc::new(LocalCoordinatorState { + namespaces: RwLock::new(HashSet::new()), + tracks: RwLock::new(HashMap::new()), + }), + } + } +} + +#[async_trait] +impl Coordinator for LocalCoordinator { + async fn register_namespace(&self, namespace: &str) -> Result { + log::debug!("local coordinator: registering namespace {}", namespace); + + self.state + .namespaces + .write() + .unwrap() + .insert(namespace.to_string()); + + // Initialize empty track set for this namespace + self.state + .tracks + .write() + .unwrap() + .entry(namespace.to_string()) + .or_default(); + + // Return a drop guard that will unregister the namespace when dropped + let guard = NamespaceDropGuard { + namespace: namespace.to_string(), + state: Arc::downgrade(&self.state), + }; + Ok(NamespaceRegistration::new(guard)) + } + + async fn unregister_namespace(&self, namespace: &str) -> Result<()> { + log::debug!("local coordinator: unregistering namespace {}", namespace); + + self.state.namespaces.write().unwrap().remove(namespace); + self.state.tracks.write().unwrap().remove(namespace); + + Ok(()) + } + + async fn register_track(&self, track_info: TrackInfo) -> Result { + log::debug!( + "local coordinator: registering track {}/{} (alias={})", + track_info.namespace, + track_info.track_name, + track_info.track_alias + ); + + self.state + .tracks + .write() + .unwrap() + .entry(track_info.namespace.clone()) + .or_default() + .insert(track_info.track_name.clone()); + + // Return a drop guard that will unregister the track when 
dropped + let guard = TrackDropGuard { + namespace: track_info.namespace, + track_name: track_info.track_name, + state: Arc::downgrade(&self.state), + }; + Ok(TrackRegistration::new(guard)) + } + + async fn unregister_track(&self, namespace: &str, track_name: &str) -> Result<()> { + log::debug!( + "local coordinator: unregistering track {}/{}", + namespace, + track_name + ); + + if let Some(tracks) = self.state.tracks.write().unwrap().get_mut(namespace) { + tracks.remove(track_name); + } + + Ok(()) + } + + async fn lookup(&self, namespace: &str) -> Result> { + // Check if we have this namespace locally + if self.state.namespaces.read().unwrap().contains(namespace) { + log::debug!("local coordinator: lookup {} -> Local", namespace); + return Ok(Some(NamespaceOrigin::Local)); + } + + // Single node - if not local, it doesn't exist + log::debug!("local coordinator: lookup {} -> not found", namespace); + Ok(None) + } + + async fn shutdown(&self) -> Result<()> { + log::debug!("local coordinator: shutdown"); + + self.state.namespaces.write().unwrap().clear(); + self.state.tracks.write().unwrap().clear(); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_coordinator() -> Arc { + Arc::new(LocalCoordinator::default()) + } + + #[tokio::test] + async fn test_register_namespace() { + let coord = create_coordinator(); + + let _reg = coord.register_namespace("live/stream").await.unwrap(); + + assert!(coord.has_namespace("live/stream")); + assert_eq!(coord.namespace_count(), 1); + } + + #[tokio::test] + async fn test_unregister_namespace() { + let coord = create_coordinator(); + + let _reg = coord.register_namespace("live/stream").await.unwrap(); + assert!(coord.has_namespace("live/stream")); + + coord.unregister_namespace("live/stream").await.unwrap(); + assert!(!coord.has_namespace("live/stream")); + assert_eq!(coord.namespace_count(), 0); + } + + #[tokio::test] + async fn test_register_track() { + let coord = create_coordinator(); + + let _ns_reg = 
coord.register_namespace("live/stream").await.unwrap(); + + let track_info = TrackInfo { + namespace: "live/stream".to_string(), + track_name: "audio".to_string(), + track_alias: 1, + }; + let _track_reg = coord.register_track(track_info).await.unwrap(); + + assert!(coord.has_track("live/stream", "audio")); + assert_eq!(coord.track_count("live/stream"), 1); + } + + #[tokio::test] + async fn test_unregister_track_keeps_namespace() { + let coord = create_coordinator(); + + let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); + + let track_info = TrackInfo { + namespace: "live/stream".to_string(), + track_name: "audio".to_string(), + track_alias: 1, + }; + let _track_reg = coord.register_track(track_info).await.unwrap(); + + // Unregister track explicitly + coord + .unregister_track("live/stream", "audio") + .await + .unwrap(); + + // Track should be gone but namespace should remain + assert!(!coord.has_track("live/stream", "audio")); + assert!(coord.has_namespace("live/stream")); + } + + #[tokio::test] + async fn test_unregister_namespace_removes_tracks() { + let coord = create_coordinator(); + + let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); + + let track_info = TrackInfo { + namespace: "live/stream".to_string(), + track_name: "audio".to_string(), + track_alias: 1, + }; + let _track_reg = coord.register_track(track_info).await.unwrap(); + + // Unregister namespace explicitly + coord.unregister_namespace("live/stream").await.unwrap(); + + // Both namespace and tracks should be gone + assert!(!coord.has_namespace("live/stream")); + assert_eq!(coord.track_count("live/stream"), 0); + } + + #[tokio::test] + async fn test_lookup_local() { + let coord = create_coordinator(); + + let _reg = coord.register_namespace("live/stream").await.unwrap(); + + let result = coord.lookup("live/stream").await.unwrap(); + assert_eq!(result, Some(NamespaceOrigin::Local)); + } + + #[tokio::test] + async fn test_lookup_not_found() { + let coord = 
create_coordinator(); + + let result = coord.lookup("unknown/stream").await.unwrap(); + assert_eq!(result, None); + } + + #[tokio::test] + async fn test_shutdown_clears_all() { + let coord = create_coordinator(); + + let _ns1_reg = coord.register_namespace("ns1").await.unwrap(); + let _ns2_reg = coord.register_namespace("ns2").await.unwrap(); + + let track_info = TrackInfo { + namespace: "ns1".to_string(), + track_name: "track1".to_string(), + track_alias: 1, + }; + let _track_reg = coord.register_track(track_info).await.unwrap(); + + coord.shutdown().await.unwrap(); + + assert_eq!(coord.namespace_count(), 0); + assert_eq!(coord.track_count("ns1"), 0); + } + + #[tokio::test] + async fn test_multiple_tracks_per_namespace() { + let coord = create_coordinator(); + + let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); + + let mut _track_regs = Vec::new(); + for (i, name) in ["audio", "video", "data"].iter().enumerate() { + let track_info = TrackInfo { + namespace: "live/stream".to_string(), + track_name: name.to_string(), + track_alias: i as u64, + }; + _track_regs.push(coord.register_track(track_info).await.unwrap()); + } + + assert_eq!(coord.track_count("live/stream"), 3); + assert!(coord.has_track("live/stream", "audio")); + assert!(coord.has_track("live/stream", "video")); + assert!(coord.has_track("live/stream", "data")); + } + + #[tokio::test] + async fn test_drop_unregisters_namespace() { + let coord = create_coordinator(); + + { + let _reg = coord.register_namespace("live/stream").await.unwrap(); + assert!(coord.has_namespace("live/stream")); + // _reg dropped here + } + + // Namespace should be auto-unregistered + assert!(!coord.has_namespace("live/stream")); + assert_eq!(coord.namespace_count(), 0); + } + + #[tokio::test] + async fn test_drop_unregisters_track() { + let coord = create_coordinator(); + + let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); + + { + let track_info = TrackInfo { + namespace: 
"live/stream".to_string(), + track_name: "audio".to_string(), + track_alias: 1, + }; + let _track_reg = coord.register_track(track_info).await.unwrap(); + assert!(coord.has_track("live/stream", "audio")); + // _track_reg dropped here + } + + // Track should be auto-unregistered, but namespace remains + assert!(!coord.has_track("live/stream", "audio")); + assert!(coord.has_namespace("live/stream")); + } +} diff --git a/moq-relay-ietf/src/lib.rs b/moq-relay-ietf/src/lib.rs new file mode 100644 index 00000000..0b258514 --- /dev/null +++ b/moq-relay-ietf/src/lib.rs @@ -0,0 +1,51 @@ +//! MoQ Relay library for building Media over QUIC relay servers. +//! +//! This crate provides the core relay functionality that can be embedded +//! into other applications. The relay handles: +//! +//! - Accepting QUIC connections from publishers and subscribers +//! - Routing media between local and remote endpoints +//! - Coordinating namespace/track registration across relay clusters +//! +//! # Example +//! +//! ```rust,ignore +//! use std::sync::Arc; +//! use moq_relay_ietf::{Relay, RelayConfig, LocalCoordinator}; +//! +//! // Create a coordinator (LocalCoordinator for single-relay deployments) +//! let coordinator = LocalCoordinator::new(); +//! +//! // Configure and create the relay +//! let relay = Relay::new(RelayConfig { +//! bind: "[::]:443".parse().unwrap(), +//! tls: tls_config, +//! coordinator, +//! // ... other options +//! })?; +//! +//! // Run the relay +//! relay.run().await?; +//! 
``` + +mod api; +mod consumer; +mod coordinator; +mod coordinator_local; +mod local; +mod producer; +mod relay; +mod remote; +mod session; +mod web; + +pub use api::*; +pub use consumer::*; +pub use coordinator::*; +pub use coordinator_local::*; +pub use local::*; +pub use producer::*; +pub use relay::*; +pub use remote::*; +pub use session::*; +pub use web::*; diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index 41a68237..bc78658d 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -1,4 +1,4 @@ -use std::{net, path::PathBuf}; +use std::{net, path::PathBuf, sync::Arc}; use anyhow::Context; @@ -6,7 +6,10 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_native_ietf::quic; use url::Url; -use crate::{Api, Consumer, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, Session}; +use crate::{ + Api, Consumer, Coordinator, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, + Session, +}; /// Configuration for the relay. pub struct RelayConfig { @@ -31,6 +34,9 @@ pub struct RelayConfig { /// Our hostname which we advertise to other origins. /// We use QUIC, so the certificate must be valid for this address. pub node: Option, + + /// The coordinator for namespace/track registration and discovery. + pub coordinator: Arc, } /// MoQ Relay server. 
@@ -41,6 +47,7 @@ pub struct Relay { locals: Locals, api: Option, remotes: Option<(RemotesProducer, RemotesConsumer)>, + coordinator: Arc, } impl Relay { @@ -89,6 +96,7 @@ impl Relay { api, locals, remotes, + coordinator: config.coordinator, }) } @@ -128,7 +136,12 @@ impl Relay { self.locals.clone(), remotes.clone(), )), - consumer: Some(Consumer::new(subscriber, self.locals.clone(), None, None)), + consumer: Some(Consumer::new( + subscriber, + self.locals.clone(), + self.coordinator.clone(), + None, + )), }; let forward_producer = session.producer.clone(); @@ -157,7 +170,7 @@ impl Relay { let locals = self.locals.clone(); let remotes = remotes.clone(); let forward = forward_producer.clone(); - let api = self.api.clone(); + let coordinator = self.coordinator.clone(); // Spawn a new task to handle the connection tasks.push(async move { @@ -175,7 +188,7 @@ impl Relay { let session = Session { session, producer: publisher.map(|publisher| Producer::new(publisher, locals.clone(), remotes)), - consumer: subscriber.map(|subscriber| Consumer::new(subscriber, locals, api, forward)), + consumer: subscriber.map(|subscriber| Consumer::new(subscriber, locals, coordinator, forward)), }; if let Err(err) = session.run().await { From 492309ada080d166e353dd345b6d375fc250f69a Mon Sep 17 00:00:00 2001 From: Manish Date: Fri, 28 Nov 2025 17:48:51 +0530 Subject: [PATCH 02/10] feat: add file-based coordinator and rewrote remote for handling remote streams --- Cargo.lock | 13 + moq-native-ietf/src/quic.rs | 182 ++++++- moq-relay-ietf/Cargo.toml | 9 +- .../bin/moq-relay-ietf/file_coordinator.rs | 341 ++++++++++++ .../src/bin/{ => moq-relay-ietf}/main.rs | 26 +- moq-relay-ietf/src/consumer.rs | 15 +- moq-relay-ietf/src/coordinator.rs | 47 +- moq-relay-ietf/src/coordinator_local.rs | 435 --------------- moq-relay-ietf/src/lib.rs | 2 - moq-relay-ietf/src/local.rs | 4 +- moq-relay-ietf/src/producer.rs | 69 ++- moq-relay-ietf/src/relay.rs | 55 +- moq-relay-ietf/src/remote.rs | 503 
+++++------------- 13 files changed, 782 insertions(+), 919 deletions(-) create mode 100644 moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs rename moq-relay-ietf/src/bin/{ => moq-relay-ietf}/main.rs (79%) delete mode 100644 moq-relay-ietf/src/coordinator_local.rs diff --git a/Cargo.lock b/Cargo.lock index bd73b8ae..2635604d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -620,6 +620,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "795cbfc56d419a7ce47ccbb7504dd9a5b7c484c083c356e797de08bd988d9629" +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -1286,6 +1296,7 @@ dependencies = [ "axum", "clap", "env_logger", + "fs2", "futures", "hex", "hyper-serve", @@ -1293,6 +1304,8 @@ dependencies = [ "moq-api", "moq-native-ietf", "moq-transport", + "serde", + "serde_json", "tokio", "tower-http", "tracing", diff --git a/moq-native-ietf/src/quic.rs b/moq-native-ietf/src/quic.rs index 0919fc20..e00711a5 100644 --- a/moq-native-ietf/src/quic.rs +++ b/moq-native-ietf/src/quic.rs @@ -1,4 +1,5 @@ use std::{ + fmt, fs::File, io::BufWriter, net, @@ -17,6 +18,25 @@ use futures::future::BoxFuture; use futures::stream::{FuturesUnordered, StreamExt}; use futures::FutureExt; +/// Represents the address family of the local QUIC socket. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AddressFamily { + Ipv4, + Ipv6, + /// IPv6 with dual-stack support (Linux) + Ipv6DualStack, +} + +impl fmt::Display for AddressFamily { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AddressFamily::Ipv4 => write!(f, "IPv4"), + AddressFamily::Ipv6 => write!(f, "IPv6"), + AddressFamily::Ipv6DualStack => write!(f, "IPv6 (dual stack)"), + } + } +} + /// Build a TransportConfig with our standard settings /// /// This is used both for the base endpoint config and when creating @@ -57,20 +77,43 @@ impl Default for Args { impl Args { pub fn load(&self) -> anyhow::Result { let tls = self.tls.load()?; - Ok(Config { - bind: self.bind, - qlog_dir: self.qlog_dir.clone(), - tls, - }) + Ok(Config::new(self.bind, self.qlog_dir.clone(), tls)) } } pub struct Config { - pub bind: net::SocketAddr, + pub bind: Option, + pub socket: net::UdpSocket, pub qlog_dir: Option, pub tls: tls::Config, } +impl Config { + pub fn new(bind: net::SocketAddr, qlog_dir: Option, tls: tls::Config) -> Self { + Self { + bind: Some(bind), + socket: net::UdpSocket::bind(bind) + .context("failed to bind socket") + .unwrap(), + qlog_dir, + tls, + } + } + + pub fn with_socket( + socket: net::UdpSocket, + qlog_dir: Option, + tls: tls::Config, + ) -> Self { + Self { + bind: None, + socket, + qlog_dir, + tls, + } + } +} + pub struct Endpoint { pub client: Client, pub server: Option, @@ -111,13 +154,13 @@ impl Endpoint { // There's a bit more boilerplate to make a generic endpoint. let runtime = quinn::default_runtime().context("no async runtime")?; let endpoint_config = quinn::EndpointConfig::default(); - let socket = std::net::UdpSocket::bind(config.bind).context("failed to bind UDP socket")?; + let socket = config.socket; // Create the generic QUIC endpoint. 
let quic = quinn::Endpoint::new(endpoint_config, server_config.clone(), socket, runtime) .context("failed to create QUIC endpoint")?; - let server = server_config.clone().map(|base_server_config| Server { + let server = server_config.map(|base_server_config| Server { quic: quic.clone(), accept: Default::default(), qlog_dir: config.qlog_dir.map(Arc::new), @@ -270,7 +313,34 @@ pub struct Client { } impl Client { - pub async fn connect(&self, url: &Url) -> anyhow::Result<(web_transport::Session, String)> { + /// Returns the local address of the QUIC socket. + pub fn local_addr(&self) -> anyhow::Result { + self.quic + .local_addr() + .context("failed to get local address") + } + + /// Returns the address family of the local QUIC socket. + pub fn address_family(&self) -> anyhow::Result { + let local_addr = self + .quic + .local_addr() + .context("failed to get local socket address")?; + + if local_addr.is_ipv4() { + Ok(AddressFamily::Ipv4) + } else if cfg!(target_os = "linux") { + Ok(AddressFamily::Ipv6DualStack) + } else { + Ok(AddressFamily::Ipv6) + } + } + + pub async fn connect( + &self, + url: &Url, + socket_addr: Option, + ) -> anyhow::Result<(web_transport::Session, String)> { let mut config = self.config.clone(); // TODO support connecting to both ALPNs at the same time @@ -303,12 +373,15 @@ impl Client { let host = url.host().context("invalid DNS name")?.to_string(); let port = url.port().unwrap_or(443); - // Look up the DNS entry. - let addr = tokio::net::lookup_host((host.clone(), port)) - .await - .context("failed DNS lookup")? - .next() - .context("no DNS entries")?; + // Look up the DNS entry and filter by socket address family. + let addr = match socket_addr { + Some(addr) => addr, + None => { + // Default DNS resolution logic + self.resolve_dns(&host, port, self.address_family()?) + .await? 
+ } + }; let connection = self.quic.connect_with(config, addr, &host)?.await?; @@ -328,4 +401,83 @@ impl Client { Ok((session.into(), connection_id_hex)) } + + /// Default DNS resolution logic that filters results by address family. + async fn resolve_dns( + &self, + host: &str, + port: u16, + address_family: AddressFamily, + ) -> anyhow::Result { + let local_addr = self.local_addr()?; + + // Collect all DNS results + let addrs: Vec = tokio::net::lookup_host((host, port)) + .await + .context("failed DNS lookup")? + .collect(); + + if addrs.is_empty() { + anyhow::bail!("DNS lookup for host '{}' returned no addresses", host); + } + + // Log all DNS results for debugging + log::debug!( + "DNS lookup for {}, family {:?}: found {} results", + host, + address_family, + addrs.len() + ); + for (i, addr) in addrs.iter().enumerate() { + log::debug!( + " DNS[{}]: {} ({})", + i, + addr, + if addr.is_ipv4() { "IPv4" } else { "IPv6" } + ); + } + + // Filter DNS results to match our local socket's address family + let compatible_addr = match address_family { + AddressFamily::Ipv4 => { + // IPv4 socket: filter to IPv4 addresses + addrs + .iter() + .find(|a| a.is_ipv4()) + .cloned() + .context(format!( + "No IPv4 address found for host '{}' (local socket is IPv4: {})", + host, local_addr + ))? + } + AddressFamily::Ipv6DualStack => { + // IPv6 socket on Linux: dual-stack, use first result + log::debug!( + "Using first DNS result (Linux IPv6 dual-stack): {}", + addrs[0] + ); + addrs[0] + } + AddressFamily::Ipv6 => { + // IPv6 socket non-Linux: filter to IPv6 addresses + addrs + .iter() + .find(|a| a.is_ipv6()) + .cloned() + .context(format!( + "No IPv6 address found for host '{}' (local socket is IPv6: {})", + host, local_addr + ))? 
+ } + }; + + log::debug!( + "Connecting from {} to {} (selected from {} DNS results)", + local_addr, + compatible_addr, + addrs.len() + ); + + Ok(compatible_addr) + } } diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index ea86845f..2fd82a63 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" [[bin]] name = "moq-relay-ietf" -path = "src/bin/main.rs" +path = "src/bin/moq-relay-ietf/main.rs" [dependencies] moq-transport = { path = "../moq-transport", version = "0.11" } @@ -40,6 +40,13 @@ hyper-serve = { version = "0.6", features = [ tower-http = { version = "0.5", features = ["cors"] } hex = "0.4" +# Serialization +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +# File locking +fs2 = "0.4" + # Error handling anyhow = { version = "1", features = ["backtrace"] } diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs new file mode 100644 index 00000000..7f1e00cd --- /dev/null +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -0,0 +1,341 @@ +//! File-based coordinator for multi-relay deployments. +//! +//! This coordinator uses a shared JSON file with file locking to coordinate +//! namespace/track registration across multiple relay instances. No separate +//! server process is required. 
+ +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use fs2::FileExt; +use moq_transport::coding::TrackNamespace; +use serde::{Deserialize, Serialize}; +use url::Url; + +use moq_relay_ietf::{ + Coordinator, NamespaceOrigin, NamespaceRegistration, TrackInfo, TrackRegistration, +}; + +/// Data stored in the shared file +#[derive(Debug, Default, Serialize, Deserialize)] +struct CoordinatorData { + /// Maps namespace path (e.g., "/foo/bar") to relay URL + namespaces: HashMap, + /// Maps "namespace_path:track_name" to track_alias + tracks: HashMap, +} + +impl CoordinatorData { + fn namespace_key(namespace: &TrackNamespace) -> String { + namespace.to_utf8_path() + } + + fn track_key(namespace: &TrackNamespace, track_name: &str) -> String { + format!("{}:{}", Self::namespace_key(namespace), track_name) + } +} + +/// Handle that unregisters a namespace when dropped +struct NamespaceUnregisterHandle { + namespace: TrackNamespace, + file_path: PathBuf, +} + +impl Drop for NamespaceUnregisterHandle { + fn drop(&mut self) { + if let Err(err) = unregister_namespace_sync(&self.file_path, &self.namespace) { + log::warn!("failed to unregister namespace on drop: {}", err); + } + } +} + +/// Handle that unregisters a track when dropped +struct TrackUnregisterHandle { + namespace: TrackNamespace, + track_name: String, + file_path: PathBuf, +} + +impl Drop for TrackUnregisterHandle { + fn drop(&mut self) { + if let Err(err) = unregister_track_sync(&self.file_path, &self.namespace, &self.track_name) + { + log::warn!("failed to unregister track on drop: {}", err); + } + } +} + +/// Synchronous helper for unregistering namespace (used in Drop) +fn unregister_namespace_sync(file_path: &Path, namespace: &TrackNamespace) -> Result<()> { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(file_path)?; 
+ + file.lock_exclusive()?; + + let mut data = read_data(&file)?; + let key = CoordinatorData::namespace_key(namespace); + + log::debug!("unregistering namespace: {}", key); + data.namespaces.remove(&key); + + // Remove all tracks under this namespace + let prefix = format!("{}:", key); + data.tracks.retain(|k, _| !k.starts_with(&prefix)); + + write_data(&file, &data)?; + file.unlock()?; + + Ok(()) +} + +/// Synchronous helper for unregistering track (used in Drop) +fn unregister_track_sync( + file_path: &Path, + namespace: &TrackNamespace, + track_name: &str, +) -> Result<()> { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(file_path)?; + + file.lock_exclusive()?; + + let mut data = read_data(&file)?; + let key = CoordinatorData::track_key(namespace, track_name); + + log::debug!("unregistering track: {}", key); + data.tracks.remove(&key); + + write_data(&file, &data)?; + file.unlock()?; + + Ok(()) +} + +/// Read coordinator data from file +fn read_data(file: &File) -> Result { + let mut file = file; + file.seek(SeekFrom::Start(0))?; + + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + if contents.is_empty() { + return Ok(CoordinatorData::default()); + } + + serde_json::from_str(&contents).context("failed to parse coordinator data") +} + +/// Write coordinator data to file +fn write_data(file: &File, data: &CoordinatorData) -> Result<()> { + let mut file = file; + file.seek(SeekFrom::Start(0))?; + file.set_len(0)?; + + let json = serde_json::to_string_pretty(data)?; + file.write_all(json.as_bytes())?; + file.flush()?; + + Ok(()) +} + +/// A coordinator that uses a shared file for state storage. +/// +/// Multiple relay instances can use the same file to share namespace/track +/// registration data. File locking ensures safe concurrent access. 
+pub struct FileCoordinator { + /// Path to the shared coordination file + file_path: PathBuf, + /// URL of this relay (used when registering namespaces) + relay_url: Url, +} + +impl FileCoordinator { + /// Create a new file-based coordinator. + /// + /// # Arguments + /// * `file_path` - Path to the shared coordination file + /// * `relay_url` - URL of this relay instance (advertised to other relays) + pub fn new(file_path: impl AsRef, relay_url: Url) -> Self { + Self { + file_path: file_path.as_ref().to_path_buf(), + relay_url, + } + } +} + +#[async_trait] +impl Coordinator for FileCoordinator { + async fn register_namespace( + &self, + namespace: &TrackNamespace, + ) -> Result { + let namespace = namespace.clone(); + let relay_url = self.relay_url.to_string(); + let file_path = self.file_path.clone(); + + // Run blocking file I/O in a separate thread + let ns_clone = namespace.clone(); + tokio::task::spawn_blocking(move || { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&file_path)?; + + file.lock_exclusive()?; + + let mut data = read_data(&file)?; + let key = CoordinatorData::namespace_key(&ns_clone); + + log::info!("registering namespace: {} -> {}", key, relay_url); + data.namespaces.insert(key, relay_url); + + write_data(&file, &data)?; + file.unlock()?; + + Ok::<_, anyhow::Error>(()) + }) + .await??; + + let handle = NamespaceUnregisterHandle { + namespace, + file_path: self.file_path.clone(), + }; + + Ok(NamespaceRegistration::new(handle)) + } + + async fn unregister_namespace(&self, namespace: &TrackNamespace) -> Result<()> { + let namespace = namespace.clone(); + let file_path = self.file_path.clone(); + + tokio::task::spawn_blocking(move || unregister_namespace_sync(&file_path, &namespace)) + .await? 
+ } + + async fn register_track(&self, track_info: TrackInfo) -> Result { + let file_path = self.file_path.clone(); + let namespace = track_info.namespace.clone(); + let track_name = track_info.track_name.clone(); + let track_alias = track_info.track_alias; + + let ns_clone = namespace.clone(); + let tn_clone = track_name.clone(); + + tokio::task::spawn_blocking(move || { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&file_path)?; + + file.lock_exclusive()?; + + let mut data = read_data(&file)?; + let key = CoordinatorData::track_key(&ns_clone, &tn_clone); + + log::info!("registering track: {}", key); + data.tracks.insert(key, track_alias); + + write_data(&file, &data)?; + file.unlock()?; + + Ok::<_, anyhow::Error>(()) + }) + .await??; + + let handle = TrackUnregisterHandle { + namespace, + track_name, + file_path: self.file_path.clone(), + }; + + Ok(TrackRegistration::new(handle)) + } + + async fn unregister_track(&self, namespace: &TrackNamespace, track_name: &str) -> Result<()> { + let namespace = namespace.clone(); + let track_name = track_name.to_string(); + let file_path = self.file_path.clone(); + + tokio::task::spawn_blocking(move || { + unregister_track_sync(&file_path, &namespace, &track_name) + }) + .await? 
+ } + + async fn lookup(&self, namespace: &TrackNamespace) -> Result { + let namespace = namespace.clone(); + let file_path = self.file_path.clone(); + + tokio::task::spawn_blocking(move || { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&file_path)?; + + file.lock_shared()?; + + let data = read_data(&file)?; + let key = CoordinatorData::namespace_key(&namespace); + + log::debug!("looking up namespace: {}", key); + + // Try exact match first + if let Some(relay_url) = data.namespaces.get(&key) { + file.unlock()?; + let url = Url::parse(relay_url)?; + return Ok(NamespaceOrigin::new(namespace, url)); + } + + // Try prefix matching (find longest matching prefix) + let mut best_match: Option<(String, String)> = None; + for (registered_key, url) in &data.namespaces { + // FIXME(itzmanish): it would be much better to compare on TupleField + // instead of working on strings + let is_prefix = registered_key + .split('/') + .into_iter() + .zip(key.split('/')) + .all(|(a, b)| a == b); + match best_match { + Some((ns, _)) if is_prefix && ns.len() < registered_key.len() => { + best_match = Some((registered_key.clone(), url.clone())); + } + None if is_prefix => { + best_match = Some((registered_key.clone(), url.clone())); + } + _ => {} + } + } + + file.unlock()?; + + if let Some((matched_key, relay_url)) = best_match { + let matched_ns = TrackNamespace::from_utf8_path(&matched_key); + let url = Url::parse(&relay_url)?; + return Ok(NamespaceOrigin::new(matched_ns, url)); + } + + anyhow::bail!("namespace not found: {}", key) + }) + .await? 
+ } + + async fn shutdown(&self) -> Result<()> { + // Nothing to clean up - file will be unlocked automatically + Ok(()) + } +} diff --git a/moq-relay-ietf/src/bin/main.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs similarity index 79% rename from moq-relay-ietf/src/bin/main.rs rename to moq-relay-ietf/src/bin/moq-relay-ietf/main.rs index aaa2dab3..81d148bf 100644 --- a/moq-relay-ietf/src/bin/main.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs @@ -1,9 +1,13 @@ +mod file_coordinator; + +use std::sync::Arc; use std::{net, path::PathBuf}; use clap::Parser; use url::Url; -use moq_relay_ietf::{LocalCoordinator, Relay, RelayConfig, Web, WebConfig}; +use file_coordinator::FileCoordinator; +use moq_relay_ietf::{Relay, RelayConfig, Web, WebConfig}; #[derive(Parser, Clone)] pub struct Cli { @@ -52,6 +56,11 @@ pub struct Cli { /// Requires --dev to enable the web server. Only serves files by exact CID - no index. #[arg(long)] pub mlog_serve: bool, + + /// Path to the shared coordinator file for multi-relay coordination. + /// Multiple relay instances can share namespace/track registration via this file. + #[arg(long, default_value = "/tmp/moq-coordinator.json")] + pub coordinator_file: PathBuf, } #[tokio::main] @@ -87,10 +96,16 @@ async fn main() -> anyhow::Result<()> { None }; - // Create the coordinator - // For now we always use LocalCoordinator. Later we can add HttpCoordinator - // based on --api and --node CLI args. - let coordinator = LocalCoordinator::new(); + // Build the relay URL from the node or bind address + let relay_url = cli + .node + .clone() + .unwrap_or_else(|| Url::parse(&format!("https://{}", cli.bind)).unwrap()); + + // Create the file-based coordinator for multi-relay coordination + let coordinator = Arc::new(FileCoordinator::new(&cli.coordinator_file, relay_url)); + + log::info!("using file coordinator: {}", cli.coordinator_file.display()); // Create a QUIC server for media. 
let relay = Relay::new(RelayConfig { @@ -99,7 +114,6 @@ async fn main() -> anyhow::Result<()> { qlog_dir: qlog_dir_for_relay, mlog_dir: mlog_dir_for_relay, node: cli.node, - api: cli.api, announce: cli.announce, coordinator, })?; diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index b79531b1..96e2059f 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -12,7 +12,7 @@ use crate::{Coordinator, Locals, Producer}; /// Consumer of tracks from a remote Publisher #[derive(Clone)] pub struct Consumer { - remote: Subscriber, + subscriber: Subscriber, locals: Locals, coordinator: Arc, forward: Option, // Forward all announcements to this subscriber @@ -20,13 +20,13 @@ pub struct Consumer { impl Consumer { pub fn new( - remote: Subscriber, + subscriber: Subscriber, locals: Locals, coordinator: Arc, forward: Option, ) -> Self { Self { - remote, + subscriber, locals, coordinator, forward, @@ -40,7 +40,7 @@ impl Consumer { loop { tokio::select! { // Handle a new announce request - Some(announce) = self.remote.announced() => { + Some(announce) = self.subscriber.announced() => { let this = self.clone(); tasks.push(async move { @@ -67,10 +67,9 @@ impl Consumer { let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce(); // Register namespace with the coordinator - let namespace_path = reader.namespace.to_utf8_path(); let _namespace_registration = self .coordinator - .register_namespace(&namespace_path) + .register_namespace(&reader.namespace) .await .context("failed to register namespace with coordinator")?; @@ -102,7 +101,7 @@ impl Consumer { // Wait for the next subscriber and serve the track. 
Some(track) = request.next() => { - let mut remote = self.remote.clone(); + let mut subscriber = self.subscriber.clone(); // Spawn a new task to handle the subscribe tasks.push(async move { @@ -110,7 +109,7 @@ impl Consumer { log::info!("forwarding subscribe: {:?}", info); // Forward the subscribe request - if let Err(err) = remote.subscribe(track).await { + if let Err(err) = subscriber.subscribe(track).await { log::warn!("failed forwarding subscribe: {:?}, error: {}", info, err) } diff --git a/moq-relay-ietf/src/coordinator.rs b/moq-relay-ietf/src/coordinator.rs index 18da56e1..7b613494 100644 --- a/moq-relay-ietf/src/coordinator.rs +++ b/moq-relay-ietf/src/coordinator.rs @@ -1,5 +1,6 @@ use anyhow::Result; use async_trait::async_trait; +use moq_transport::coding::TrackNamespace; use url::Url; /// Handle returned when a namespace is registered with the coordinator. @@ -9,6 +10,7 @@ use url::Url; /// or the namespace is no longer served, cleanup happens automatically. pub struct NamespaceRegistration { _inner: Box, + _metadata: Option>, } impl NamespaceRegistration { @@ -19,8 +21,15 @@ impl NamespaceRegistration { pub fn new(inner: T) -> Self { Self { _inner: Box::new(inner), + _metadata: None, } } + + /// Add metadata as list of key value pair of string: string + pub fn with_metadata(mut self, metadata: Vec<(String, String)>) -> Self { + self._metadata = Some(metadata); + self + } } /// Handle returned when a track is registered under a namespace. @@ -42,19 +51,30 @@ impl TrackRegistration { /// Result of a namespace lookup. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum NamespaceOrigin { - /// Namespace is served locally by this relay. - Local, +pub struct NamespaceOrigin(TrackNamespace, Url); + +impl NamespaceOrigin { + /// Create a new NamespaceOrigin. + pub fn new(namespace: TrackNamespace, url: Url) -> Self { + Self(namespace, url) + } - /// Namespace is served by a remote relay at the given URL. - Remote(Url), + /// Get the namespace. 
+ pub fn namespace(&self) -> &TrackNamespace { + &self.0 + } + + /// Get the URL of the relay serving this namespace. + pub fn url(&self) -> Url { + self.1.clone() + } } /// Information about a track within a namespace. #[derive(Debug, Clone)] pub struct TrackInfo { /// The track namespace - pub namespace: String, + pub namespace: TrackNamespace, /// The track name within the namespace pub track_name: String, @@ -96,7 +116,8 @@ pub trait Coordinator: Send + Sync { /// /// A `NamespaceRegistration` handle. The namespace remains registered /// as long as this handle is held. Dropping it unregisters the namespace. - async fn register_namespace(&self, namespace: &str) -> Result; + async fn register_namespace(&self, namespace: &TrackNamespace) + -> Result; /// Unregister a namespace. /// @@ -107,7 +128,7 @@ pub trait Coordinator: Send + Sync { /// # Arguments /// /// * `namespace` - The namespace to unregister - async fn unregister_namespace(&self, namespace: &str) -> Result<()>; + async fn unregister_namespace(&self, namespace: &TrackNamespace) -> Result<()>; /// Register a track as available under a namespace. /// @@ -133,7 +154,7 @@ pub trait Coordinator: Send + Sync { /// /// * `namespace` - The namespace containing the track /// * `track_name` - The track name to unregister - async fn unregister_track(&self, namespace: &str, track_name: &str) -> Result<()>; + async fn unregister_track(&self, namespace: &TrackNamespace, track_name: &str) -> Result<()>; /// Lookup where a namespace is served from. 
/// @@ -149,10 +170,10 @@ pub trait Coordinator: Send + Sync { /// /// # Returns /// - /// - `Ok(Some(NamespaceOrigin::Local))` - Served by this relay - /// - `Ok(Some(NamespaceOrigin::Remote(url)))` - Served by remote relay - /// - `Ok(None)` - Namespace not found anywhere - async fn lookup(&self, namespace: &str) -> Result>; + /// - `Ok(NamespaceOrigin::Local)` - Served by this relay + /// - `Ok(NamespaceOrigin::Remote(url))` - Served by remote relay + /// - `Err` - Namespace not found anywhere + async fn lookup(&self, namespace: &TrackNamespace) -> Result; /// Graceful shutdown of the coordinator. /// diff --git a/moq-relay-ietf/src/coordinator_local.rs b/moq-relay-ietf/src/coordinator_local.rs deleted file mode 100644 index 839e053f..00000000 --- a/moq-relay-ietf/src/coordinator_local.rs +++ /dev/null @@ -1,435 +0,0 @@ -use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, RwLock, Weak}; - -use anyhow::Result; -use async_trait::async_trait; - -use crate::coordinator::{ - Coordinator, NamespaceOrigin, NamespaceRegistration, TrackInfo, TrackRegistration, -}; - -/// Internal state shared between LocalCoordinator and its drop guards. -struct LocalCoordinatorState { - /// Local namespaces currently registered - namespaces: RwLock>, - - /// Tracks registered under each namespace - /// Maps namespace -> set of track names - tracks: RwLock>>, -} - -/// Drop guard for namespace registration. -/// When dropped, removes the namespace from the coordinator. -struct NamespaceDropGuard { - namespace: String, - state: Weak, -} - -impl Drop for NamespaceDropGuard { - fn drop(&mut self) { - if let Some(state) = self.state.upgrade() { - log::debug!( - "local coordinator: auto-unregistering namespace {} (drop guard)", - self.namespace - ); - state.namespaces.write().unwrap().remove(&self.namespace); - state.tracks.write().unwrap().remove(&self.namespace); - } - } -} - -/// Drop guard for track registration. -/// When dropped, removes the track from the coordinator. 
-struct TrackDropGuard { - namespace: String, - track_name: String, - state: Weak, -} - -impl Drop for TrackDropGuard { - fn drop(&mut self) { - if let Some(state) = self.state.upgrade() { - log::debug!( - "local coordinator: auto-unregistering track {}/{} (drop guard)", - self.namespace, - self.track_name - ); - if let Some(tracks) = state.tracks.write().unwrap().get_mut(&self.namespace) { - tracks.remove(&self.track_name); - } - } - } -} - -/// Local coordinator for single-relay deployments. -/// -/// This coordinator does not communicate with any external service. -/// It tracks local namespaces and tracks in memory but does not -/// advertise them to other relays. -/// -/// # Use Cases -/// -/// - Development and testing -/// - Single-relay deployments -/// - Environments without external registry -/// -/// # Example -/// -/// ```rust,ignore -/// use moq_relay_ietf::{Relay, RelayConfig, LocalCoordinator}; -/// -/// let config = RelayConfig { -/// coordinator: LocalCoordinator::new(), -/// // ... -/// }; -/// let relay = Relay::new(config)?; -/// ``` -pub struct LocalCoordinator { - state: Arc, -} - -impl LocalCoordinator { - /// Create a new local coordinator. - pub fn new() -> Arc { - Arc::new(Self { - state: Arc::new(LocalCoordinatorState { - namespaces: RwLock::new(HashSet::new()), - tracks: RwLock::new(HashMap::new()), - }), - }) - } - - /// Get the number of registered namespaces (for testing). - #[cfg(test)] - pub fn namespace_count(&self) -> usize { - self.state.namespaces.read().unwrap().len() - } - - /// Get the number of tracks under a namespace (for testing). - #[cfg(test)] - pub fn track_count(&self, namespace: &str) -> usize { - self.state - .tracks - .read() - .unwrap() - .get(namespace) - .map(|t| t.len()) - .unwrap_or(0) - } - - /// Check if a namespace is registered (for testing). 
- #[cfg(test)] - pub fn has_namespace(&self, namespace: &str) -> bool { - self.state.namespaces.read().unwrap().contains(namespace) - } - - /// Check if a track is registered (for testing). - #[cfg(test)] - pub fn has_track(&self, namespace: &str, track_name: &str) -> bool { - self.state - .tracks - .read() - .unwrap() - .get(namespace) - .map(|t| t.contains(track_name)) - .unwrap_or(false) - } -} - -impl Default for LocalCoordinator { - fn default() -> Self { - Self { - state: Arc::new(LocalCoordinatorState { - namespaces: RwLock::new(HashSet::new()), - tracks: RwLock::new(HashMap::new()), - }), - } - } -} - -#[async_trait] -impl Coordinator for LocalCoordinator { - async fn register_namespace(&self, namespace: &str) -> Result { - log::debug!("local coordinator: registering namespace {}", namespace); - - self.state - .namespaces - .write() - .unwrap() - .insert(namespace.to_string()); - - // Initialize empty track set for this namespace - self.state - .tracks - .write() - .unwrap() - .entry(namespace.to_string()) - .or_default(); - - // Return a drop guard that will unregister the namespace when dropped - let guard = NamespaceDropGuard { - namespace: namespace.to_string(), - state: Arc::downgrade(&self.state), - }; - Ok(NamespaceRegistration::new(guard)) - } - - async fn unregister_namespace(&self, namespace: &str) -> Result<()> { - log::debug!("local coordinator: unregistering namespace {}", namespace); - - self.state.namespaces.write().unwrap().remove(namespace); - self.state.tracks.write().unwrap().remove(namespace); - - Ok(()) - } - - async fn register_track(&self, track_info: TrackInfo) -> Result { - log::debug!( - "local coordinator: registering track {}/{} (alias={})", - track_info.namespace, - track_info.track_name, - track_info.track_alias - ); - - self.state - .tracks - .write() - .unwrap() - .entry(track_info.namespace.clone()) - .or_default() - .insert(track_info.track_name.clone()); - - // Return a drop guard that will unregister the track when 
dropped - let guard = TrackDropGuard { - namespace: track_info.namespace, - track_name: track_info.track_name, - state: Arc::downgrade(&self.state), - }; - Ok(TrackRegistration::new(guard)) - } - - async fn unregister_track(&self, namespace: &str, track_name: &str) -> Result<()> { - log::debug!( - "local coordinator: unregistering track {}/{}", - namespace, - track_name - ); - - if let Some(tracks) = self.state.tracks.write().unwrap().get_mut(namespace) { - tracks.remove(track_name); - } - - Ok(()) - } - - async fn lookup(&self, namespace: &str) -> Result> { - // Check if we have this namespace locally - if self.state.namespaces.read().unwrap().contains(namespace) { - log::debug!("local coordinator: lookup {} -> Local", namespace); - return Ok(Some(NamespaceOrigin::Local)); - } - - // Single node - if not local, it doesn't exist - log::debug!("local coordinator: lookup {} -> not found", namespace); - Ok(None) - } - - async fn shutdown(&self) -> Result<()> { - log::debug!("local coordinator: shutdown"); - - self.state.namespaces.write().unwrap().clear(); - self.state.tracks.write().unwrap().clear(); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn create_coordinator() -> Arc { - Arc::new(LocalCoordinator::default()) - } - - #[tokio::test] - async fn test_register_namespace() { - let coord = create_coordinator(); - - let _reg = coord.register_namespace("live/stream").await.unwrap(); - - assert!(coord.has_namespace("live/stream")); - assert_eq!(coord.namespace_count(), 1); - } - - #[tokio::test] - async fn test_unregister_namespace() { - let coord = create_coordinator(); - - let _reg = coord.register_namespace("live/stream").await.unwrap(); - assert!(coord.has_namespace("live/stream")); - - coord.unregister_namespace("live/stream").await.unwrap(); - assert!(!coord.has_namespace("live/stream")); - assert_eq!(coord.namespace_count(), 0); - } - - #[tokio::test] - async fn test_register_track() { - let coord = create_coordinator(); - - let _ns_reg = 
coord.register_namespace("live/stream").await.unwrap(); - - let track_info = TrackInfo { - namespace: "live/stream".to_string(), - track_name: "audio".to_string(), - track_alias: 1, - }; - let _track_reg = coord.register_track(track_info).await.unwrap(); - - assert!(coord.has_track("live/stream", "audio")); - assert_eq!(coord.track_count("live/stream"), 1); - } - - #[tokio::test] - async fn test_unregister_track_keeps_namespace() { - let coord = create_coordinator(); - - let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); - - let track_info = TrackInfo { - namespace: "live/stream".to_string(), - track_name: "audio".to_string(), - track_alias: 1, - }; - let _track_reg = coord.register_track(track_info).await.unwrap(); - - // Unregister track explicitly - coord - .unregister_track("live/stream", "audio") - .await - .unwrap(); - - // Track should be gone but namespace should remain - assert!(!coord.has_track("live/stream", "audio")); - assert!(coord.has_namespace("live/stream")); - } - - #[tokio::test] - async fn test_unregister_namespace_removes_tracks() { - let coord = create_coordinator(); - - let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); - - let track_info = TrackInfo { - namespace: "live/stream".to_string(), - track_name: "audio".to_string(), - track_alias: 1, - }; - let _track_reg = coord.register_track(track_info).await.unwrap(); - - // Unregister namespace explicitly - coord.unregister_namespace("live/stream").await.unwrap(); - - // Both namespace and tracks should be gone - assert!(!coord.has_namespace("live/stream")); - assert_eq!(coord.track_count("live/stream"), 0); - } - - #[tokio::test] - async fn test_lookup_local() { - let coord = create_coordinator(); - - let _reg = coord.register_namespace("live/stream").await.unwrap(); - - let result = coord.lookup("live/stream").await.unwrap(); - assert_eq!(result, Some(NamespaceOrigin::Local)); - } - - #[tokio::test] - async fn test_lookup_not_found() { - let coord = 
create_coordinator(); - - let result = coord.lookup("unknown/stream").await.unwrap(); - assert_eq!(result, None); - } - - #[tokio::test] - async fn test_shutdown_clears_all() { - let coord = create_coordinator(); - - let _ns1_reg = coord.register_namespace("ns1").await.unwrap(); - let _ns2_reg = coord.register_namespace("ns2").await.unwrap(); - - let track_info = TrackInfo { - namespace: "ns1".to_string(), - track_name: "track1".to_string(), - track_alias: 1, - }; - let _track_reg = coord.register_track(track_info).await.unwrap(); - - coord.shutdown().await.unwrap(); - - assert_eq!(coord.namespace_count(), 0); - assert_eq!(coord.track_count("ns1"), 0); - } - - #[tokio::test] - async fn test_multiple_tracks_per_namespace() { - let coord = create_coordinator(); - - let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); - - let mut _track_regs = Vec::new(); - for (i, name) in ["audio", "video", "data"].iter().enumerate() { - let track_info = TrackInfo { - namespace: "live/stream".to_string(), - track_name: name.to_string(), - track_alias: i as u64, - }; - _track_regs.push(coord.register_track(track_info).await.unwrap()); - } - - assert_eq!(coord.track_count("live/stream"), 3); - assert!(coord.has_track("live/stream", "audio")); - assert!(coord.has_track("live/stream", "video")); - assert!(coord.has_track("live/stream", "data")); - } - - #[tokio::test] - async fn test_drop_unregisters_namespace() { - let coord = create_coordinator(); - - { - let _reg = coord.register_namespace("live/stream").await.unwrap(); - assert!(coord.has_namespace("live/stream")); - // _reg dropped here - } - - // Namespace should be auto-unregistered - assert!(!coord.has_namespace("live/stream")); - assert_eq!(coord.namespace_count(), 0); - } - - #[tokio::test] - async fn test_drop_unregisters_track() { - let coord = create_coordinator(); - - let _ns_reg = coord.register_namespace("live/stream").await.unwrap(); - - { - let track_info = TrackInfo { - namespace: 
"live/stream".to_string(), - track_name: "audio".to_string(), - track_alias: 1, - }; - let _track_reg = coord.register_track(track_info).await.unwrap(); - assert!(coord.has_track("live/stream", "audio")); - // _track_reg dropped here - } - - // Track should be auto-unregistered, but namespace remains - assert!(!coord.has_track("live/stream", "audio")); - assert!(coord.has_namespace("live/stream")); - } -} diff --git a/moq-relay-ietf/src/lib.rs b/moq-relay-ietf/src/lib.rs index 0b258514..ff0148b1 100644 --- a/moq-relay-ietf/src/lib.rs +++ b/moq-relay-ietf/src/lib.rs @@ -31,7 +31,6 @@ mod api; mod consumer; mod coordinator; -mod coordinator_local; mod local; mod producer; mod relay; @@ -42,7 +41,6 @@ mod web; pub use api::*; pub use consumer::*; pub use coordinator::*; -pub use coordinator_local::*; pub use local::*; pub use producer::*; pub use relay::*; diff --git a/moq-relay-ietf/src/local.rs b/moq-relay-ietf/src/local.rs index 6f355a0a..406e6650 100644 --- a/moq-relay-ietf/src/local.rs +++ b/moq-relay-ietf/src/local.rs @@ -46,9 +46,9 @@ impl Locals { Ok(registration) } - /// Lookup local tracks by namespace using hierarchical prefix matching. + /// Retrieve local tracks by namespace using hierarchical prefix matching. /// Returns the TracksReader for the longest matching namespace prefix. 
- pub fn route(&self, namespace: &TrackNamespace) -> Option { + pub fn retrieve(&self, namespace: &TrackNamespace) -> Option { let lookup = self.lookup.lock().unwrap(); // Find the longest matching prefix diff --git a/moq-relay-ietf/src/producer.rs b/moq-relay-ietf/src/producer.rs index 3de13bef..4bd8e177 100644 --- a/moq-relay-ietf/src/producer.rs +++ b/moq-relay-ietf/src/producer.rs @@ -4,20 +4,20 @@ use moq_transport::{ session::{Publisher, SessionError, Subscribed, TrackStatusRequested}, }; -use crate::{Locals, RemotesConsumer}; +use crate::{Locals, RemoteManager}; /// Producer of tracks to a remote Subscriber #[derive(Clone)] pub struct Producer { - remote_publisher: Publisher, + publisher: Publisher, locals: Locals, - remotes: Option, + remotes: RemoteManager, } impl Producer { - pub fn new(remote: Publisher, locals: Locals, remotes: Option) -> Self { + pub fn new(publisher: Publisher, locals: Locals, remotes: RemoteManager) -> Self { Self { - remote_publisher: remote, + publisher, locals, remotes, } @@ -25,7 +25,7 @@ impl Producer { /// Announce new tracks to the remote server. pub async fn announce(&mut self, tracks: TracksReader) -> Result<(), SessionError> { - self.remote_publisher.announce(tracks).await + self.publisher.announce(tracks).await } /// Run the producer to serve subscribe requests. @@ -35,12 +35,12 @@ impl Producer { FuturesUnordered::new(); loop { - let mut remote_publisher_subscribed = self.remote_publisher.clone(); - let mut remote_publisher_track_status = self.remote_publisher.clone(); + let mut publisher_subscribed = self.publisher.clone(); + let mut publisher_track_status = self.publisher.clone(); tokio::select! 
{ // Handle a new subscribe request - Some(subscribed) = remote_publisher_subscribed.subscribed() => { + Some(subscribed) = publisher_subscribed.subscribed() => { let this = self.clone(); // Spawn a new task to handle the subscribe @@ -50,12 +50,12 @@ impl Producer { // Serve the subscribe request if let Err(err) = this.serve_subscribe(subscribed).await { - log::warn!("failed serving subscribe: {:?}, error: {}", info, err) + log::warn!("failed serving subscribe: {:?}, error: {}", info, err); } }.boxed()) }, // Handle a new track_status request - Some(track_status_requested) = remote_publisher_track_status.track_status_requested() => { + Some(track_status_requested) = publisher_track_status.track_status_requested() => { let this = self.clone(); // Spawn a new task to handle the track_status request @@ -77,44 +77,35 @@ impl Producer { /// Serve a subscribe request. async fn serve_subscribe(self, subscribed: Subscribed) -> Result<(), anyhow::Error> { + let namespace = subscribed.track_namespace.clone(); + let track_name = subscribed.track_name.clone(); + // Check local tracks first, and serve from local if possible - if let Some(mut local) = self.locals.route(&subscribed.track_namespace) { + if let Some(mut local) = self.locals.retrieve(&namespace) { // Pass the full requested namespace, not the announced prefix - if let Some(track) = - local.subscribe(subscribed.track_namespace.clone(), &subscribed.track_name) - { + if let Some(track) = local.subscribe(namespace.clone(), &track_name) { log::info!("serving subscribe from local: {:?}", track.info); return Ok(subscribed.serve(track).await?); } } // Check remote tracks second, and serve from remote if possible - if let Some(remotes) = &self.remotes { - // Try to route to a remote for this namespace - if let Some(remote) = remotes.route(&subscribed.track_namespace).await? { - if let Some(track) = remote.subscribe( - subscribed.track_namespace.clone(), - subscribed.track_name.clone(), - )? 
{ - log::info!( - "serving subscribe from remote: {:?} {:?}", - remote.info, - track.info - ); - - // NOTE: Depends on drop(track) being called afterwards - return Ok(subscribed.serve(track.reader).await?); - } - } + if let Some(track) = self + .remotes + .subscribe(namespace.clone(), track_name.clone()) + .await? + { + log::info!("serving subscribe from remote: {:?}", track.info); + return Ok(subscribed.serve(track).await?); } - let namespace = subscribed.track_namespace.clone(); - let name = subscribed.track_name.clone(); - Err(ServeError::not_found_ctx(format!( + // Track not found - close the subscription with not found error + let err = ServeError::not_found_ctx(format!( "track '{}/{}' not found in local or remote tracks", - namespace, name - )) - .into()) + namespace, track_name + )); + subscribed.close(err.clone())?; + Err(err.into()) } /// Serve a track_status request. @@ -125,7 +116,7 @@ impl Producer { // Check local tracks first, and serve from local if possible if let Some(mut local_tracks) = self .locals - .route(&track_status_requested.request_msg.track_namespace) + .retrieve(&track_status_requested.request_msg.track_namespace) { if let Some(track) = local_tracks.get_track_reader( &track_status_requested.request_msg.track_namespace, diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index bc78658d..f843f9fa 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -6,10 +6,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use moq_native_ietf::quic; use url::Url; -use crate::{ - Api, Consumer, Coordinator, Locals, Producer, Remotes, RemotesConsumer, RemotesProducer, - Session, -}; +use crate::{Api, Consumer, Coordinator, Locals, Producer, RemoteManager, Session}; /// Configuration for the relay. pub struct RelayConfig { @@ -28,9 +25,6 @@ pub struct RelayConfig { /// Forward all announcements to the (optional) URL. pub announce: Option, - /// Connect to the HTTP moq-api at this URL. 
- pub api: Option, - /// Our hostname which we advertise to other origins. /// We use QUIC, so the certificate must be valid for this address. pub node: Option, @@ -45,19 +39,18 @@ pub struct Relay { announce_url: Option, mlog_dir: Option, locals: Locals, - api: Option, - remotes: Option<(RemotesProducer, RemotesConsumer)>, + remotes: RemoteManager, coordinator: Arc, } impl Relay { pub fn new(config: RelayConfig) -> anyhow::Result { // Create a QUIC endpoint that can be used for both clients and servers. - let quic = quic::Endpoint::new(quic::Config { - bind: config.bind, - qlog_dir: config.qlog_dir, - tls: config.tls, - })?; + let quic = quic::Endpoint::new(quic::Config::new( + config.bind, + config.qlog_dir, + config.tls.clone(), + ))?; // Validate mlog directory if provided if let Some(mlog_dir) = &config.mlog_dir { @@ -70,30 +63,15 @@ impl Relay { log::info!("mlog output enabled: {}", mlog_dir.display()); } - // Create an API client if we have the necessary configuration - let api = if let (Some(url), Some(node)) = (config.api, config.node) { - log::info!("using moq-api: url={} node={}", url, node); - Some(Api::new(url, node)) - } else { - None - }; - let locals = Locals::new(); - // Create remotes if we have an API client - let remotes = api.clone().map(|api| { - Remotes { - api, - quic: quic.client.clone(), - } - .produce() - }); + // Create remote manager - uses coordinator for namespace lookups + let remotes = RemoteManager::new(config.coordinator.clone(), config.tls.clone()); Ok(Self { quic, announce_url: config.announce, mlog_dir: config.mlog_dir, - api, locals, remotes, coordinator: config.coordinator, @@ -104,11 +82,7 @@ impl Relay { pub async fn run(self) -> anyhow::Result<()> { let mut tasks = FuturesUnordered::new(); - // Start the remotes producer task, if any - let remotes = self.remotes.map(|(producer, consumer)| { - tasks.push(producer.run().boxed()); - consumer - }); + let remotes = self.remotes.clone(); // Start the forwarder, if any let 
forward_producer = if let Some(url) = &self.announce_url { @@ -118,7 +92,7 @@ impl Relay { let (session, _quic_client_initial_cid) = self .quic .client - .connect(url) + .connect(url, None) .await .context("failed to establish forward connection")?; @@ -129,6 +103,7 @@ impl Relay { .context("failed to establish forward session")?; // Create a normal looking session, except we never forward or register announces. + let coordinator = self.coordinator.clone(); let session = Session { session, producer: Some(Producer::new( @@ -139,7 +114,7 @@ impl Relay { consumer: Some(Consumer::new( subscriber, self.locals.clone(), - self.coordinator.clone(), + coordinator, None, )), }; @@ -174,7 +149,6 @@ impl Relay { // Spawn a new task to handle the connection tasks.push(async move { - // Create the MoQ session over the connection (setup handshake etc) let (session, publisher, subscriber) = match moq_transport::session::Session::accept(conn, mlog_path).await { Ok(session) => session, @@ -185,8 +159,9 @@ impl Relay { }; // Create our MoQ relay session + let moq_session = session; let session = Session { - session, + session: moq_session, producer: publisher.map(|publisher| Producer::new(publisher, locals.clone(), remotes)), consumer: subscriber.map(|subscriber| Consumer::new(subscriber, locals, coordinator, forward)), }; diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 1b412d0b..c4c61d9b 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -1,412 +1,199 @@ use std::collections::HashMap; - -use std::collections::VecDeque; -use std::fmt; -use std::ops; use std::sync::Arc; -use std::sync::Weak; -use futures::stream::FuturesUnordered; -use futures::FutureExt; -use futures::StreamExt; use moq_native_ietf::quic; use moq_transport::coding::TrackNamespace; -use moq_transport::serve::{Track, TrackReader, TrackWriter}; -use moq_transport::watch::State; +use moq_transport::serve::{Track, TrackReader}; +use tokio::sync::Mutex; use 
url::Url; -use crate::Api; - -/// Information about remote origins. -pub struct Remotes { - /// The client we use to fetch/store origin information. - pub api: Api, - - // A QUIC endpoint we'll use to fetch from other origins. - pub quic: quic::Client, -} - -impl Remotes { - pub fn produce(self) -> (RemotesProducer, RemotesConsumer) { - let (send, recv) = State::default().split(); - let info = Arc::new(self); - - let producer = RemotesProducer::new(info.clone(), send); - let consumer = RemotesConsumer::new(info, recv); - - (producer, consumer) - } -} - -#[derive(Default)] -struct RemotesState { - lookup: HashMap, - requested: VecDeque, -} +use crate::Coordinator; -// Clone for convenience, but there should only be one instance of this +/// Manages connections to remote relays. +/// +/// When a subscription request comes in for a namespace that isn't local, +/// RemoteManager uses the coordinator to find which remote relay serves it, +/// establishes a connection if needed, and subscribes to the track. #[derive(Clone)] -pub struct RemotesProducer { - info: Arc, - state: State, -} - -impl RemotesProducer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - /// Block until the next remote requested by a consumer. - async fn next(&mut self) -> Option { - loop { - { - let state = self.state.lock(); - if !state.requested.is_empty() { - return state.into_mut()?.requested.pop_front(); - } - - state.modified()? - } - .await; +pub struct RemoteManager { + coordinator: Arc, + tls: moq_native_ietf::tls::Config, + remotes: Arc>>, +} + +impl RemoteManager { + /// Create a new RemoteManager. + pub fn new(coordinator: Arc, tls: moq_native_ietf::tls::Config) -> Self { + Self { + coordinator, + tls, + remotes: Arc::new(Mutex::new(HashMap::new())), } } - /// Run the remotes producer to serve remote requests. - pub async fn run(mut self) -> anyhow::Result<()> { - let mut tasks = FuturesUnordered::new(); - - loop { - tokio::select! 
{ - Some(mut remote) = self.next() => { - let url = remote.url.clone(); - - // Spawn a task to serve the remote - tasks.push(async move { - let info = remote.info.clone(); - - log::warn!("serving remote: {:?}", info); - - // Run the remote producer - if let Err(err) = remote.run().await { - log::warn!("failed serving remote: {:?}, error: {}", info, err); - } + /// Subscribe to a track from a remote relay. + /// + /// This will: + /// 1. Use the coordinator to lookup which relay serves the namespace + /// 2. Connect to that relay if not already connected + /// 3. Subscribe to the specific track + /// + /// Returns None if the namespace isn't found in any remote relay. + pub async fn subscribe( + &self, + namespace: TrackNamespace, + track_name: String, + ) -> anyhow::Result> { + // Ask coordinator where this namespace lives + let origin = match self.coordinator.lookup(&namespace).await { + Ok(origin) => origin, + Err(_) => return Ok(None), // Namespace not found anywhere + }; - url - }); - } + let url = origin.url(); - // Handle finished remote producers - res = tasks.next(), if !tasks.is_empty() => { - let url = res.unwrap(); + // Get or create a connection to the remote relay + let remote = self.get_or_connect(&url).await?; - if let Some(mut state) = self.state.lock_mut() { - state.lookup.remove(&url); - } - }, - else => return Ok(()), - } - } + // Subscribe to the track on the remote + remote.subscribe(namespace, track_name).await } -} - -impl ops::Deref for RemotesProducer { - type Target = Remotes; - fn deref(&self) -> &Self::Target { - &self.info - } -} + /// Get an existing remote connection or create a new one. + async fn get_or_connect(&self, url: &Url) -> anyhow::Result { + let mut remotes = self.remotes.lock().await; -#[derive(Clone)] -pub struct RemotesConsumer { - pub info: Arc, - state: State, -} - -impl RemotesConsumer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - /// Route to a remote origin based on the namespace. 
- pub async fn route( - &self, - namespace: &TrackNamespace, - ) -> anyhow::Result> { - // Always fetch the origin instead of using the (potentially invalid) cache. - let origin = match self.api.get_origin(&namespace.to_utf8_path()).await? { - None => return Ok(None), - Some(origin) => origin, - }; - - // Check if we already have a remote for this origin - let state = self.state.lock(); - if let Some(remote) = state.lookup.get(&origin.url).cloned() { - return Ok(Some(remote)); + // Check if we already have a connection + if let Some(remote) = remotes.get(url) { + if remote.is_connected() { + return Ok(remote.clone()); + } + // Connection is dead, remove it + remotes.remove(url); } - // Create a new remote for this origin - let mut state = match state.into_mut() { - Some(state) => state, - None => return Ok(None), - }; - - let remote = Remote { - url: origin.url.clone(), - remotes: self.info.clone(), - }; - - // Produce the remote - let (writer, reader) = remote.produce(); - state.requested.push_back(writer); + // Create a new connection with its own QUIC client + log::info!("connecting to remote relay: {}", url); + let remote = Remote::connect(url.clone(), self.tls.clone()).await?; - // Insert the remote into our Map - state.lookup.insert(origin.url, reader.clone()); + remotes.insert(url.clone(), remote.clone()); - Ok(Some(reader)) + Ok(remote) } -} - -impl ops::Deref for RemotesConsumer { - type Target = Remotes; - fn deref(&self) -> &Self::Target { - &self.info + /// Remove a remote connection (called when connection fails). + pub async fn remove(&self, url: &Url) { + let mut remotes = self.remotes.lock().await; + remotes.remove(url); } } +/// A connection to a single remote relay with its own QUIC client. 
+#[derive(Clone)] pub struct Remote { - pub remotes: Arc, - pub url: Url, -} - -impl fmt::Debug for Remote { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Remote") - .field("url", &self.url.to_string()) - .finish() - } -} - -impl ops::Deref for Remote { - type Target = Remotes; - - fn deref(&self) -> &Self::Target { - &self.remotes - } + url: Url, + subscriber: moq_transport::session::Subscriber, + /// Track subscriptions - maps (namespace, track_name) to track reader + tracks: Arc>>, } impl Remote { - /// Create a new broadcast. - pub fn produce(self) -> (RemoteProducer, RemoteConsumer) { - let (send, recv) = State::default().split(); - let info = Arc::new(self); - - let consumer = RemoteConsumer::new(info.clone(), recv); - let producer = RemoteProducer::new(info, send); - - (producer, consumer) - } -} - -#[derive(Default)] -struct RemoteState { - tracks: HashMap<(TrackNamespace, String), RemoteTrackWeak>, - requested: VecDeque, -} - -pub struct RemoteProducer { - pub info: Arc, - state: State, -} - -impl RemoteProducer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } - } - - pub async fn run(&mut self) -> anyhow::Result<()> { - // TODO reuse QUIC and MoQ sessions - let (session, _quic_client_initial_cid) = self.quic.connect(&self.url).await?; + /// Connect to a remote relay with a dedicated QUIC client. 
+ async fn connect(url: Url, tls: moq_native_ietf::tls::Config) -> anyhow::Result { + // Create a dedicated QUIC endpoint for this remote connection + let bind_addr: std::net::SocketAddr = "[::]:0".parse()?; + let quic_config = quic::Config::new(bind_addr, None, tls); + let quic = quic::Endpoint::new(quic_config)?; + + // Connect to the remote relay (DNS resolution happens inside connect) + let (session, _cid) = quic.client.connect(&url, None).await?; let (session, subscriber) = moq_transport::session::Subscriber::connect(session).await?; - // Run the session - let mut session = session.run().boxed(); - let mut tasks = FuturesUnordered::new(); - - let mut done = None; - - // Serve requested tracks - loop { - tokio::select! { - track = self.next(), if done.is_none() => { - let track = match track { - Ok(Some(track)) => track, - Ok(None) => { done = Some(Ok(())); continue }, - Err(err) => { done = Some(Err(err)); continue }, - }; - - let info = track.info.clone(); - let mut subscriber = subscriber.clone(); - - tasks.push(async move { - if let Err(err) = subscriber.subscribe(track).await { - log::warn!("failed serving track: {:?}, error: {}", info, err); - } - }); - } - _ = tasks.next(), if !tasks.is_empty() => {}, - - // Keep running the session - res = &mut session, if !tasks.is_empty() || done.is_none() => return Ok(res?), - - else => return done.unwrap(), + // Spawn a task to run the session + let session_url = url.clone(); + tokio::spawn(async move { + if let Err(err) = session.run().await { + log::warn!("remote session closed: {} - {}", session_url, err); } - } - } - - /// Block until the next track requested by a consumer. 
- async fn next(&self) -> anyhow::Result> { - loop { - let notify = { - let state = self.state.lock(); - - // Check if we have any requested tracks - if !state.requested.is_empty() { - return Ok(state - .into_mut() - .and_then(|mut state| state.requested.pop_front())); - } - - match state.modified() { - Some(notified) => notified, - None => return Ok(None), - } - }; - - notify.await - } - } -} - -impl ops::Deref for RemoteProducer { - type Target = Remote; + }); - fn deref(&self) -> &Self::Target { - &self.info + Ok(Self { + url, + subscriber, + tracks: Arc::new(Mutex::new(HashMap::new())), + }) } -} -#[derive(Clone)] -pub struct RemoteConsumer { - pub info: Arc, - state: State, -} - -impl RemoteConsumer { - fn new(info: Arc, state: State) -> Self { - Self { info, state } + /// Check if the connection is still alive. + /// Note: This is a simple heuristic - we assume connected until proven otherwise. + fn is_connected(&self) -> bool { + // We don't have a direct way to check if the subscriber is closed, + // so we assume it's connected. Dead connections will be cleaned up + // when subscribe operations fail. + true } - /// Request a track from the broadcast. - pub fn subscribe( + /// Subscribe to a track on this remote relay. 
+ pub async fn subscribe( &self, namespace: TrackNamespace, - name: String, - ) -> anyhow::Result> { - let key = (namespace.clone(), name.clone()); - let state = self.state.lock(); - if let Some(track) = state.tracks.get(&key) { - if let Some(track) = track.upgrade() { - return Ok(Some(track)); + track_name: String, + ) -> anyhow::Result> { + let key = (namespace.clone(), track_name.clone()); + + // Check if we already have this track + { + let tracks = self.tracks.lock().await; + if let Some(reader) = tracks.get(&key) { + return Ok(Some(reader.clone())); } } - let mut state = match state.into_mut() { - Some(state) => state, - None => return Ok(None), - }; - - let (writer, reader) = Track::new(namespace, name).produce(); - let reader = RemoteTrackReader::new(reader, self.state.clone()); - - // Insert the track into our Map so we deduplicate future requests. - state.tracks.insert(key, reader.downgrade()); - state.requested.push_back(writer); - - Ok(Some(reader)) - } -} - -impl ops::Deref for RemoteConsumer { - type Target = Remote; - - fn deref(&self) -> &Self::Target { - &self.info - } -} - -#[derive(Clone)] -pub struct RemoteTrackReader { - pub reader: TrackReader, - drop: Arc, -} + // Create a new track and subscribe + let (writer, reader) = Track::new(namespace.clone(), track_name.clone()).produce(); + + // Subscribe to the track on the remote + let mut subscriber = self.subscriber.clone(); + let track_key = key.clone(); + let tracks = self.tracks.clone(); + let url = self.url.clone(); + + tokio::spawn(async move { + log::info!( + "subscribing to remote track: {} - {}/{}", + url, + track_key.0, + track_key.1 + ); + + if let Err(err) = subscriber.subscribe(writer).await { + log::warn!( + "failed subscribing to remote track: {} - {}/{} - {}", + url, + track_key.0, + track_key.1, + err + ); + } -impl RemoteTrackReader { - fn new(reader: TrackReader, parent: State) -> Self { - let drop = Arc::new(RemoteTrackDrop { - parent, - key: (reader.namespace.clone(), 
reader.name.clone()), + // Remove track from map when subscription ends + tracks.lock().await.remove(&track_key); }); - Self { reader, drop } - } - - fn downgrade(&self) -> RemoteTrackWeak { - RemoteTrackWeak { - reader: self.reader.clone(), - drop: Arc::downgrade(&self.drop), + // Store the reader for deduplication + { + let mut tracks = self.tracks.lock().await; + tracks.insert(key, reader.clone()); } - } -} -impl ops::Deref for RemoteTrackReader { - type Target = TrackReader; - - fn deref(&self) -> &Self::Target { - &self.reader - } -} - -impl ops::DerefMut for RemoteTrackReader { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.reader - } -} - -struct RemoteTrackWeak { - reader: TrackReader, - drop: Weak, -} - -impl RemoteTrackWeak { - fn upgrade(&self) -> Option { - Some(RemoteTrackReader { - reader: self.reader.clone(), - drop: self.drop.upgrade()?, - }) + Ok(Some(reader)) } } -struct RemoteTrackDrop { - parent: State, - key: (TrackNamespace, String), -} - -impl Drop for RemoteTrackDrop { - fn drop(&mut self) { - if let Some(mut parent) = self.parent.lock_mut() { - parent.tracks.remove(&self.key); - } +impl std::fmt::Debug for Remote { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Remote") + .field("url", &self.url.to_string()) + .finish() } } From 767097f1138ef33433890e97b3d11687cdbdecd4 Mon Sep 17 00:00:00 2001 From: Manish Date: Mon, 1 Dec 2025 12:02:53 +0530 Subject: [PATCH 03/10] refactor: simplify QUIC config initialization using new Config::new constructor --- moq-clock-ietf/src/main.rs | 8 ++------ moq-pub/src/main.rs | 12 ++++++------ moq-sub/src/main.rs | 8 ++------ 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/moq-clock-ietf/src/main.rs b/moq-clock-ietf/src/main.rs index 199116cb..7d0f6951 100644 --- a/moq-clock-ietf/src/main.rs +++ b/moq-clock-ietf/src/main.rs @@ -29,16 +29,12 @@ async fn main() -> anyhow::Result<()> { let tls = config.tls.load()?; // Create the QUIC endpoint 
- let quic = quic::Endpoint::new(quic::Config { - bind: config.bind, - qlog_dir: None, - tls, - })?; + let quic = quic::Endpoint::new(quic::Config::new(config.bind, None, tls))?; log::info!("connecting to server: url={}", config.url); // Connect to the server - let (session, connection_id) = quic.client.connect(&config.url).await?; + let (session, connection_id) = quic.client.connect(&config.url, None).await?; log::info!( "connected with CID: {} (use this to look up qlog/mlog on server)", diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index 7e71764c..cd9350fc 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -57,14 +57,14 @@ async fn main() -> anyhow::Result<()> { let tls = cli.tls.load()?; - let quic = quic::Endpoint::new(moq_native_ietf::quic::Config { - bind: cli.bind, - qlog_dir: None, - tls: tls.clone(), - })?; + let quic = quic::Endpoint::new(moq_native_ietf::quic::Config::new( + cli.bind, + None, + tls.clone(), + ))?; log::info!("connecting to relay: url={}", cli.url); - let (session, connection_id) = quic.client.connect(&cli.url).await?; + let (session, connection_id) = quic.client.connect(&cli.url, None).await?; log::info!( "connected with CID: {} (use this to look up qlog/mlog on server)", diff --git a/moq-sub/src/main.rs b/moq-sub/src/main.rs index e38154f2..2663833e 100644 --- a/moq-sub/src/main.rs +++ b/moq-sub/src/main.rs @@ -22,13 +22,9 @@ async fn main() -> anyhow::Result<()> { let config = Config::parse(); let tls = config.tls.load()?; - let quic = quic::Endpoint::new(quic::Config { - bind: config.bind, - qlog_dir: None, - tls, - })?; + let quic = quic::Endpoint::new(quic::Config::new(config.bind, None, tls))?; - let (session, connection_id) = quic.client.connect(&config.url).await?; + let (session, connection_id) = quic.client.connect(&config.url, None).await?; log::info!( "connected with CID: {} (use this to look up qlog/mlog on server)", From 02bf324cce46482b69fd8acecd86e80440aca0a7 Mon Sep 17 00:00:00 2001 From: Manish Date: 
Tue, 2 Dec 2025 18:56:54 +0530 Subject: [PATCH 04/10] fix: return clients on lookup for coordinator and misc fix --- moq-native-ietf/src/quic.rs | 21 ++- moq-relay-ietf/Cargo.toml | 2 + .../bin/moq-relay-ietf/file_coordinator.rs | 123 +++++++++------- moq-relay-ietf/src/bin/moq-relay-ietf/main.rs | 3 +- moq-relay-ietf/src/consumer.rs | 3 +- moq-relay-ietf/src/coordinator.rs | 96 ++++++++++-- moq-relay-ietf/src/lib.rs | 6 +- moq-relay-ietf/src/relay.rs | 105 ++++++++++--- moq-relay-ietf/src/remote.rs | 38 ++--- moq-transport/src/coding/track_namespace.rs | 138 ++++++++++++++++++ 10 files changed, 423 insertions(+), 112 deletions(-) diff --git a/moq-native-ietf/src/quic.rs b/moq-native-ietf/src/quic.rs index e00711a5..8b6cdad6 100644 --- a/moq-native-ietf/src/quic.rs +++ b/moq-native-ietf/src/quic.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, fmt, fs::File, io::BufWriter, @@ -86,6 +87,7 @@ pub struct Config { pub socket: net::UdpSocket, pub qlog_dir: Option, pub tls: tls::Config, + pub tags: HashSet, } impl Config { @@ -97,6 +99,7 @@ impl Config { .unwrap(), qlog_dir, tls, + tags: HashSet::new(), } } @@ -110,13 +113,25 @@ impl Config { socket, qlog_dir, tls, + tags: HashSet::new(), } } + + pub fn with_tag(mut self, tag: String) -> Self { + self.tags.insert(tag); + self + } } pub struct Endpoint { pub client: Client, pub server: Option, + /// Tags associated with this endpoint + /// These are used to filter endpoints for different purposes, for eg- + /// "server" tag is used to filter endpoints for relay server + /// "forward" tag is used to filter endpoints for forwarder + /// This is upto the user to define and use + pub tags: HashSet, } impl Endpoint { @@ -173,7 +188,11 @@ impl Endpoint { transport, }; - Ok(Self { client, server }) + Ok(Self { + client, + server, + tags: config.tags, + }) } } diff --git a/moq-relay-ietf/Cargo.toml b/moq-relay-ietf/Cargo.toml index 2fd82a63..13ed4595 100644 --- a/moq-relay-ietf/Cargo.toml +++ b/moq-relay-ietf/Cargo.toml @@ 
-23,6 +23,7 @@ path = "src/bin/moq-relay-ietf/main.rs" moq-transport = { path = "../moq-transport", version = "0.11" } moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" } moq-api = { path = "../moq-api", version = "0.2" } +web-transport = { workspace = true } # QUIC url = "2" @@ -58,3 +59,4 @@ log = { workspace = true } env_logger = { workspace = true } tracing = "0.1" tracing-subscriber = "0.3" +thiserror = "2.0.17" diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs index 7f1e00cd..7b5d54c3 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -12,12 +12,14 @@ use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; use async_trait::async_trait; use fs2::FileExt; +use moq_native_ietf::quic::Client; use moq_transport::coding::TrackNamespace; use serde::{Deserialize, Serialize}; use url::Url; use moq_relay_ietf::{ - Coordinator, NamespaceOrigin, NamespaceRegistration, TrackInfo, TrackRegistration, + Coordinator, CoordinatorError, CoordinatorResult, NamespaceOrigin, NamespaceRegistration, + TrackInfo, TrackRegistration, }; /// Data stored in the shared file @@ -179,7 +181,7 @@ impl Coordinator for FileCoordinator { async fn register_namespace( &self, namespace: &TrackNamespace, - ) -> Result { + ) -> CoordinatorResult { let namespace = namespace.clone(); let relay_url = self.relay_url.to_string(); let file_path = self.file_path.clone(); @@ -216,15 +218,17 @@ impl Coordinator for FileCoordinator { Ok(NamespaceRegistration::new(handle)) } - async fn unregister_namespace(&self, namespace: &TrackNamespace) -> Result<()> { + async fn unregister_namespace(&self, namespace: &TrackNamespace) -> CoordinatorResult<()> { let namespace = namespace.clone(); let file_path = self.file_path.clone(); tokio::task::spawn_blocking(move || unregister_namespace_sync(&file_path, &namespace)) - .await? 
+ .await??; + + Ok(()) } - async fn register_track(&self, track_info: TrackInfo) -> Result { + async fn register_track(&self, track_info: TrackInfo) -> CoordinatorResult { let file_path = self.file_path.clone(); let namespace = track_info.namespace.clone(); let track_name = track_info.track_name.clone(); @@ -264,7 +268,11 @@ impl Coordinator for FileCoordinator { Ok(TrackRegistration::new(handle)) } - async fn unregister_track(&self, namespace: &TrackNamespace, track_name: &str) -> Result<()> { + async fn unregister_track( + &self, + namespace: &TrackNamespace, + track_name: &str, + ) -> CoordinatorResult<()> { let namespace = namespace.clone(); let track_name = track_name.to_string(); let file_path = self.file_path.clone(); @@ -272,69 +280,78 @@ impl Coordinator for FileCoordinator { tokio::task::spawn_blocking(move || { unregister_track_sync(&file_path, &namespace, &track_name) }) - .await? + .await??; + + Ok(()) } - async fn lookup(&self, namespace: &TrackNamespace) -> Result { + async fn lookup( + &self, + namespace: &TrackNamespace, + ) -> CoordinatorResult<(NamespaceOrigin, Option<&Client>)> { let namespace = namespace.clone(); let file_path = self.file_path.clone(); - tokio::task::spawn_blocking(move || { - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&file_path)?; + let result = tokio::task::spawn_blocking( + move || -> Result)>> { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&file_path)?; - file.lock_shared()?; + file.lock_shared()?; - let data = read_data(&file)?; - let key = CoordinatorData::namespace_key(&namespace); + let data = read_data(&file)?; + let key = CoordinatorData::namespace_key(&namespace); - log::debug!("looking up namespace: {}", key); + log::debug!("looking up namespace: {}", key); - // Try exact match first - if let Some(relay_url) = data.namespaces.get(&key) { - file.unlock()?; - let url = Url::parse(relay_url)?; - return Ok(NamespaceOrigin::new(namespace, 
url)); - } - - // Try prefix matching (find longest matching prefix) - let mut best_match: Option<(String, String)> = None; - for (registered_key, url) in &data.namespaces { - // FIXME(itzmanish): it would be much better to compare on TupleField - // instead of working on strings - let is_prefix = registered_key - .split('/') - .into_iter() - .zip(key.split('/')) - .all(|(a, b)| a == b); - match best_match { - Some((ns, _)) if is_prefix && ns.len() < registered_key.len() => { - best_match = Some((registered_key.clone(), url.clone())); - } - None if is_prefix => { - best_match = Some((registered_key.clone(), url.clone())); + // Try exact match first + if let Some(relay_url) = data.namespaces.get(&key) { + file.unlock()?; + let url = Url::parse(relay_url)?; + return Ok(Some((NamespaceOrigin::new(namespace, url), None))); + } + + // Try prefix matching (find longest matching prefix) + let mut best_match: Option<(String, String)> = None; + for (registered_key, url) in &data.namespaces { + // FIXME(itzmanish): it would be much better to compare on TupleField + // instead of working on strings + let is_prefix = registered_key + .split('/') + .into_iter() + .zip(key.split('/')) + .all(|(a, b)| a == b); + match best_match { + Some((ns, _)) if is_prefix && ns.len() < registered_key.len() => { + best_match = Some((registered_key.clone(), url.clone())); + } + None if is_prefix => { + best_match = Some((registered_key.clone(), url.clone())); + } + _ => {} } - _ => {} } - } - file.unlock()?; + file.unlock()?; + + if let Some((matched_key, relay_url)) = best_match { + let matched_ns = TrackNamespace::from_utf8_path(&matched_key); + let url = Url::parse(&relay_url)?; + return Ok(Some((NamespaceOrigin::new(matched_ns, url), None))); + } - if let Some((matched_key, relay_url)) = best_match { - let matched_ns = TrackNamespace::from_utf8_path(&matched_key); - let url = Url::parse(&relay_url)?; - return Ok(NamespaceOrigin::new(matched_ns, url)); - } + Ok(None) + }, + ) + .await??; - 
anyhow::bail!("namespace not found: {}", key) - }) - .await? + result.ok_or(CoordinatorError::NamespaceNotFound) } - async fn shutdown(&self) -> Result<()> { + async fn shutdown(&self) -> CoordinatorResult<()> { // Nothing to clean up - file will be unlocked automatically Ok(()) } diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs index 81d148bf..199e825e 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs @@ -110,7 +110,8 @@ async fn main() -> anyhow::Result<()> { // Create a QUIC server for media. let relay = Relay::new(RelayConfig { tls: tls.clone(), - bind: cli.bind, + bind: Some(cli.bind), + endpoints: vec![], qlog_dir: qlog_dir_for_relay, mlog_dir: mlog_dir_for_relay, node: cli.node, diff --git a/moq-relay-ietf/src/consumer.rs b/moq-relay-ietf/src/consumer.rs index 96e2059f..a93d1126 100644 --- a/moq-relay-ietf/src/consumer.rs +++ b/moq-relay-ietf/src/consumer.rs @@ -70,8 +70,7 @@ impl Consumer { let _namespace_registration = self .coordinator .register_namespace(&reader.namespace) - .await - .context("failed to register namespace with coordinator")?; + .await?; // Register the local tracks, unregister on drop let _register = self.locals.register(reader.clone()).await?; diff --git a/moq-relay-ietf/src/coordinator.rs b/moq-relay-ietf/src/coordinator.rs index 7b613494..df1d1ee2 100644 --- a/moq-relay-ietf/src/coordinator.rs +++ b/moq-relay-ietf/src/coordinator.rs @@ -1,8 +1,46 @@ -use anyhow::Result; use async_trait::async_trait; +use moq_native_ietf::quic; use moq_transport::coding::TrackNamespace; use url::Url; +#[derive(Debug, thiserror::Error)] +pub enum CoordinatorError { + #[error("namespace not found")] + NamespaceNotFound, + + #[error("namespace already registered")] + NamespaceAlreadyRegistered, + + #[error("track not found")] + TrackNotFound, + + #[error("track already registered")] + TrackAlreadyRegistered, + + #[error("Internal Error: 
{0}")] + Other(anyhow::Error), +} + +impl From for CoordinatorError { + fn from(err: anyhow::Error) -> Self { + Self::Other(err) + } +} + +impl From for CoordinatorError { + fn from(err: tokio::task::JoinError) -> Self { + Self::Other(err.into()) + } +} + +impl From for CoordinatorError { + fn from(err: std::io::Error) -> Self { + Self::Other(err.into()) + } +} + +pub type CoordinatorResult = std::result::Result; + /// Handle returned when a namespace is registered with the coordinator. /// /// Dropping this handle automatically unregisters the namespace. @@ -51,22 +89,44 @@ impl TrackRegistration { /// Result of a namespace lookup. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct NamespaceOrigin(TrackNamespace, Url); +pub struct NamespaceOrigin { + namespace: TrackNamespace, + url: Url, + metadata: Option>, +} impl NamespaceOrigin { /// Create a new NamespaceOrigin. pub fn new(namespace: TrackNamespace, url: Url) -> Self { - Self(namespace, url) + Self { + namespace, + url, + metadata: None, + } + } + + pub fn with_metadata(mut self, values: (String, String)) -> Self { + if let Some(metadata) = &mut self.metadata { + metadata.push(values); + } else { + self.metadata = Some(vec![values]); + } + self } /// Get the namespace. pub fn namespace(&self) -> &TrackNamespace { - &self.0 + &self.namespace } /// Get the URL of the relay serving this namespace. pub fn url(&self) -> Url { - self.1.clone() + self.url.clone() + } + + /// Get the metadata associated with this namespace. + pub fn metadata(&self) -> Option> { + self.metadata.clone() } } @@ -116,8 +176,10 @@ pub trait Coordinator: Send + Sync { /// /// A `NamespaceRegistration` handle. The namespace remains registered /// as long as this handle is held. Dropping it unregisters the namespace. - async fn register_namespace(&self, namespace: &TrackNamespace) - -> Result; + async fn register_namespace( + &self, + namespace: &TrackNamespace, + ) -> CoordinatorResult; /// Unregister a namespace. 
/// @@ -128,7 +190,7 @@ pub trait Coordinator: Send + Sync { /// # Arguments /// /// * `namespace` - The namespace to unregister - async fn unregister_namespace(&self, namespace: &TrackNamespace) -> Result<()>; + async fn unregister_namespace(&self, namespace: &TrackNamespace) -> CoordinatorResult<()>; /// Register a track as available under a namespace. /// @@ -143,7 +205,7 @@ pub trait Coordinator: Send + Sync { /// /// A `TrackRegistration` handle. The track remains registered /// as long as this handle is held. - async fn register_track(&self, track_info: TrackInfo) -> Result; + async fn register_track(&self, track_info: TrackInfo) -> CoordinatorResult; /// Unregister a track. /// @@ -154,7 +216,11 @@ pub trait Coordinator: Send + Sync { /// /// * `namespace` - The namespace containing the track /// * `track_name` - The track name to unregister - async fn unregister_track(&self, namespace: &TrackNamespace, track_name: &str) -> Result<()>; + async fn unregister_track( + &self, + namespace: &TrackNamespace, + track_name: &str, + ) -> CoordinatorResult<()>; /// Lookup where a namespace is served from. /// @@ -170,10 +236,12 @@ pub trait Coordinator: Send + Sync { /// /// # Returns /// - /// - `Ok(NamespaceOrigin::Local)` - Served by this relay - /// - `Ok(NamespaceOrigin::Remote(url))` - Served by remote relay + /// - `Ok(NamespaceOrigin, Option)` - Namespace origin and optional client if available /// - `Err` - Namespace not found anywhere - async fn lookup(&self, namespace: &TrackNamespace) -> Result; + async fn lookup( + &self, + namespace: &TrackNamespace, + ) -> CoordinatorResult<(NamespaceOrigin, Option<&quic::Client>)>; /// Graceful shutdown of the coordinator. 
/// @@ -181,7 +249,7 @@ pub trait Coordinator: Send + Sync { /// - Unregister all local namespaces and tracks /// - Cancel refresh tasks /// - Close connections to external registries - async fn shutdown(&self) -> Result<()> { + async fn shutdown(&self) -> CoordinatorResult<()> { Ok(()) } } diff --git a/moq-relay-ietf/src/lib.rs b/moq-relay-ietf/src/lib.rs index ff0148b1..aac39326 100644 --- a/moq-relay-ietf/src/lib.rs +++ b/moq-relay-ietf/src/lib.rs @@ -11,10 +11,10 @@ //! //! ```rust,ignore //! use std::sync::Arc; -//! use moq_relay_ietf::{Relay, RelayConfig, LocalCoordinator}; +//! use moq_relay_ietf::{Relay, RelayConfig, FileCoordinator}; //! -//! // Create a coordinator (LocalCoordinator for single-relay deployments) -//! let coordinator = LocalCoordinator::new(); +//! // Create a coordinator (FileCoordinator for multi-relay deployments) +//! let coordinator = FileCoordinator::new("/path/to/coordination/file", "https://relay.example.com"); //! //! // Configure and create the relay //! let relay = Relay::new(RelayConfig { diff --git a/moq-relay-ietf/src/relay.rs b/moq-relay-ietf/src/relay.rs index f843f9fa..662e5a13 100644 --- a/moq-relay-ietf/src/relay.rs +++ b/moq-relay-ietf/src/relay.rs @@ -1,17 +1,32 @@ -use std::{net, path::PathBuf, sync::Arc}; +use std::{future::Future, net, path::PathBuf, pin::Pin, sync::Arc}; use anyhow::Context; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; -use moq_native_ietf::quic; +use moq_native_ietf::quic::{self, Endpoint}; use url::Url; -use crate::{Api, Consumer, Coordinator, Locals, Producer, RemoteManager, Session}; +use crate::{Consumer, Coordinator, Locals, Producer, RemoteManager, Session}; + +// A type alias for boxed future +type ServerFuture = Pin< + Box< + dyn Future< + Output = ( + anyhow::Result<(web_transport::Session, String)>, + quic::Server, + ), + >, + >, +>; /// Configuration for the relay. 
pub struct RelayConfig { /// Listen on this address - pub bind: net::SocketAddr, + pub bind: Option, + + /// Optional list of endpoints if provided, we won't use bind + pub endpoints: Vec, /// The TLS configuration. pub tls: moq_native_ietf::tls::Config, @@ -35,7 +50,7 @@ pub struct RelayConfig { /// MoQ Relay server. pub struct Relay { - quic: quic::Endpoint, + quic_endpoints: Vec, announce_url: Option, mlog_dir: Option, locals: Locals, @@ -45,12 +60,24 @@ pub struct Relay { impl Relay { pub fn new(config: RelayConfig) -> anyhow::Result { - // Create a QUIC endpoint that can be used for both clients and servers. - let quic = quic::Endpoint::new(quic::Config::new( - config.bind, - config.qlog_dir, - config.tls.clone(), - ))?; + if config.bind.is_some() && !config.endpoints.is_empty() { + anyhow::bail!("cannot specify both bind and endpoints"); + } + + let endpoints = if config.bind.is_some() { + let endpoint = quic::Endpoint::new(quic::Config::new( + config.bind.unwrap(), + config.qlog_dir.clone(), + config.tls.clone(), + ))?; + vec![endpoint] + } else { + config.endpoints + }; + + if endpoints.is_empty() { + anyhow::bail!("no endpoints available to start the server"); + } // Validate mlog directory if provided if let Some(mlog_dir) = &config.mlog_dir { @@ -65,11 +92,17 @@ impl Relay { let locals = Locals::new(); + // FIXME(itzmanish): have a generic filter to find endpoints for forward, remote etc. 
+ let remote_clients = endpoints + .iter() + .map(|endpoint| endpoint.client.clone()) + .collect::>(); + // Create remote manager - uses coordinator for namespace lookups - let remotes = RemoteManager::new(config.coordinator.clone(), config.tls.clone()); + let remotes = RemoteManager::new(config.coordinator.clone(), remote_clients)?; Ok(Self { - quic, + quic_endpoints: endpoints, announce_url: config.announce, mlog_dir: config.mlog_dir, locals, @@ -89,8 +122,7 @@ impl Relay { log::info!("forwarding announces to {}", url); // Establish a QUIC connection to the forward URL - let (session, _quic_client_initial_cid) = self - .quic + let (session, _quic_client_initial_cid) = self.quic_endpoints[0] .client .connect(url, None) .await @@ -128,15 +160,46 @@ impl Relay { None }; - // Start the QUIC server loop - let mut server = self.quic.server.context("missing TLS certificate")?; - log::info!("listening on {}", server.local_addr()?); + let servers: Vec = self + .quic_endpoints + .into_iter() + .map(|endpoint| { + endpoint + .server + .context("missing TLS certificate for server") + }) + .collect::>()?; + + // This will hold the futures for all our listening servers. + let mut accepts: FuturesUnordered = FuturesUnordered::new(); + for mut server in servers { + log::info!("listening on {}", server.local_addr()?); + + // Create a future, box it, and push it to the collection. + accepts.push( + async move { + let conn = server.accept().await.context("accept failed"); + (conn, server) + } + .boxed(), + ); + } loop { tokio::select! { - // Accept a new QUIC connection - res = server.accept() => { - let (conn, connection_id) = res.context("failed to accept QUIC connection")?; + // This branch polls all the `accept` futures concurrently. + Some((conn_result, mut server)) = accepts.next() => { + // An accept operation has completed. + // First, immediately queue up the next accept() call for this server. 
+ accepts.push( + async move { + let conn = server.accept().await.context("accept failed"); + (conn, server) + } + .boxed(), + ); + + let (conn, connection_id) = conn_result.context("failed to accept QUIC connection")?; // Construct mlog path from connection ID if mlog directory is configured let mlog_path = self.mlog_dir.as_ref() diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index c4c61d9b..56c269ed 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -17,18 +17,21 @@ use crate::Coordinator; #[derive(Clone)] pub struct RemoteManager { coordinator: Arc, - tls: moq_native_ietf::tls::Config, + clients: Vec, remotes: Arc>>, } impl RemoteManager { /// Create a new RemoteManager. - pub fn new(coordinator: Arc, tls: moq_native_ietf::tls::Config) -> Self { - Self { + pub fn new( + coordinator: Arc, + clients: Vec, + ) -> anyhow::Result { + Ok(Self { coordinator, - tls, + clients, remotes: Arc::new(Mutex::new(HashMap::new())), - } + }) } /// Subscribe to a track from a remote relay. @@ -45,22 +48,26 @@ impl RemoteManager { track_name: String, ) -> anyhow::Result> { // Ask coordinator where this namespace lives - let origin = match self.coordinator.lookup(&namespace).await { - Ok(origin) => origin, + let (origin, client) = match self.coordinator.lookup(&namespace).await { + Ok((origin, client)) => (origin, client), Err(_) => return Ok(None), // Namespace not found anywhere }; let url = origin.url(); // Get or create a connection to the remote relay - let remote = self.get_or_connect(&url).await?; + let remote = self.get_or_connect(&url, client).await?; // Subscribe to the track on the remote remote.subscribe(namespace, track_name).await } /// Get an existing remote connection or create a new one. 
- async fn get_or_connect(&self, url: &Url) -> anyhow::Result { + async fn get_or_connect( + &self, + url: &Url, + client: Option<&quic::Client>, + ) -> anyhow::Result { let mut remotes = self.remotes.lock().await; // Check if we already have a connection @@ -72,9 +79,11 @@ impl RemoteManager { remotes.remove(url); } + let client = client.unwrap_or(&self.clients[0]); + // Create a new connection with its own QUIC client log::info!("connecting to remote relay: {}", url); - let remote = Remote::connect(url.clone(), self.tls.clone()).await?; + let remote = Remote::connect(url.clone(), client).await?; remotes.insert(url.clone(), remote.clone()); @@ -99,14 +108,9 @@ pub struct Remote { impl Remote { /// Connect to a remote relay with a dedicated QUIC client. - async fn connect(url: Url, tls: moq_native_ietf::tls::Config) -> anyhow::Result { - // Create a dedicated QUIC endpoint for this remote connection - let bind_addr: std::net::SocketAddr = "[::]:0".parse()?; - let quic_config = quic::Config::new(bind_addr, None, tls); - let quic = quic::Endpoint::new(quic_config)?; - + async fn connect(url: Url, client: &quic::Client) -> anyhow::Result { // Connect to the remote relay (DNS resolution happens inside connect) - let (session, _cid) = quic.client.connect(&url, None).await?; + let (session, _cid) = client.connect(&url, None).await?; let (session, subscriber) = moq_transport::session::Subscriber::connect(session).await?; // Spawn a task to run the session diff --git a/moq-transport/src/coding/track_namespace.rs b/moq-transport/src/coding/track_namespace.rs index 338fa5ad..ff63d1a4 100644 --- a/moq-transport/src/coding/track_namespace.rs +++ b/moq-transport/src/coding/track_namespace.rs @@ -1,6 +1,18 @@ use super::{Decode, DecodeError, Encode, EncodeError, TupleField}; use core::hash::{Hash, Hasher}; +use std::convert::TryFrom; use std::fmt; +use thiserror::Error; + +/// Error type for TrackNamespace conversion failures +#[derive(Debug, Clone, Error, PartialEq, Eq)] +pub 
enum TrackNamespaceError { + #[error("too many fields: {0} exceeds maximum of {1}")] + TooManyFields(usize, usize), + + #[error("field too large: {0} bytes exceeds maximum of {1}")] + FieldTooLarge(usize, usize), +} /// TrackNamespace #[derive(Clone, Default, Eq, PartialEq)] @@ -92,11 +104,69 @@ impl fmt::Display for TrackNamespace { } } +impl TryFrom> for TrackNamespace { + type Error = TrackNamespaceError; + + fn try_from(fields: Vec) -> Result { + if fields.len() > Self::MAX_FIELDS { + return Err(TrackNamespaceError::TooManyFields( + fields.len(), + Self::MAX_FIELDS, + )); + } + for field in &fields { + if field.value.len() > TupleField::MAX_VALUE_SIZE { + return Err(TrackNamespaceError::FieldTooLarge( + field.value.len(), + TupleField::MAX_VALUE_SIZE, + )); + } + } + Ok(Self { fields }) + } +} + +impl TryFrom<&str> for TrackNamespace { + type Error = TrackNamespaceError; + + fn try_from(path: &str) -> Result { + let fields: Vec = path.split('/').map(TupleField::from_utf8).collect(); + Self::try_from(fields) + } +} + +impl TryFrom for TrackNamespace { + type Error = TrackNamespaceError; + + fn try_from(path: String) -> Result { + Self::try_from(path.as_str()) + } +} + +impl TryFrom> for TrackNamespace { + type Error = TrackNamespaceError; + + fn try_from(parts: Vec<&str>) -> Result { + let fields: Vec = parts.into_iter().map(TupleField::from_utf8).collect(); + Self::try_from(fields) + } +} + +impl TryFrom> for TrackNamespace { + type Error = TrackNamespaceError; + + fn try_from(parts: Vec) -> Result { + let fields: Vec = parts.iter().map(|s| TupleField::from_utf8(s)).collect(); + Self::try_from(fields) + } +} + #[cfg(test)] mod tests { use super::*; use bytes::Bytes; use bytes::BytesMut; + use std::convert::TryInto; #[test] fn encode_decode() { @@ -165,4 +235,72 @@ mod tests { DecodeError::FieldBoundsExceeded(_) )); } + + #[test] + fn try_from_str() { + let ns: TrackNamespace = "test/path/to/resource".try_into().unwrap(); + assert_eq!(ns.fields.len(), 4); + 
assert_eq!(ns.to_utf8_path(), "/test/path/to/resource"); + } + + #[test] + fn try_from_string() { + let path = String::from("test/path"); + let ns: TrackNamespace = path.try_into().unwrap(); + assert_eq!(ns.fields.len(), 2); + assert_eq!(ns.to_utf8_path(), "/test/path"); + } + + #[test] + fn try_from_vec_str() { + let parts = vec!["test", "path", "to", "resource"]; + let ns: TrackNamespace = parts.try_into().unwrap(); + assert_eq!(ns.fields.len(), 4); + assert_eq!(ns.to_utf8_path(), "/test/path/to/resource"); + } + + #[test] + fn try_from_vec_string() { + let parts = vec![String::from("test"), String::from("path")]; + let ns: TrackNamespace = parts.try_into().unwrap(); + assert_eq!(ns.fields.len(), 2); + assert_eq!(ns.to_utf8_path(), "/test/path"); + } + + #[test] + fn try_from_vec_tuple_field() { + let fields = vec![ + TupleField::from_utf8("test"), + TupleField::from_utf8("path"), + ]; + let ns: TrackNamespace = fields.try_into().unwrap(); + assert_eq!(ns.fields.len(), 2); + assert_eq!(ns.to_utf8_path(), "/test/path"); + } + + #[test] + fn try_from_too_many_fields() { + let mut fields = Vec::new(); + for i in 0..TrackNamespace::MAX_FIELDS + 1 { + fields.push(TupleField::from_utf8(&format!("field{}", i))); + } + let result: Result = fields.try_into(); + assert!(matches!( + result.unwrap_err(), + TrackNamespaceError::TooManyFields(33, 32) + )); + } + + #[test] + fn try_from_field_too_large() { + let large_value = "x".repeat(TupleField::MAX_VALUE_SIZE + 1); + let fields = vec![TupleField { + value: large_value.into_bytes(), + }]; + let result: Result = fields.try_into(); + assert!(matches!( + result.unwrap_err(), + TrackNamespaceError::FieldTooLarge(4097, 4096) + )); + } } From 9d4e76d28a6de639b69f626ee34c96093a004429 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 2 Dec 2025 19:11:18 +0530 Subject: [PATCH 05/10] docs: clarify coordinator file usage in CLI help text and add FIXME for unregister_namespace --- moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs 
| 2 ++ moq-relay-ietf/src/bin/moq-relay-ietf/main.rs | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs index 7b5d54c3..8ce15dd9 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -218,6 +218,8 @@ impl Coordinator for FileCoordinator { Ok(NamespaceRegistration::new(handle)) } + // FIXME(itzmanish): Not being called currently but we need to call this on publish_namespace_done + // currently unregister happens on drop of namespace async fn unregister_namespace(&self, namespace: &TrackNamespace) -> CoordinatorResult<()> { let namespace = namespace.clone(); let file_path = self.file_path.clone(); diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs index 199e825e..533d40e2 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/main.rs @@ -59,6 +59,10 @@ pub struct Cli { /// Path to the shared coordinator file for multi-relay coordination. /// Multiple relay instances can share namespace/track registration via this file. + /// User doesn't have to explicitly create and populate anything. This path will be + /// used by file coordinator to store namespace/track registration information. + /// Users need to make sure that if multiple relays are being used, all of them have the same path
#[arg(long, default_value = "/tmp/moq-coordinator.json")] pub coordinator_file: PathBuf, } From 838b5ceae0ea55f13ed4238c707bffaaf3a11758 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 2 Dec 2025 19:15:32 +0530 Subject: [PATCH 06/10] fix: add lifetime parameter to lookup method signature for proper borrow checking --- moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs | 4 ++-- moq-relay-ietf/src/coordinator.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs index 8ce15dd9..aaf0502a 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -287,10 +287,10 @@ impl Coordinator for FileCoordinator { Ok(()) } - async fn lookup( + async fn lookup<'a>( &self, namespace: &TrackNamespace, - ) -> CoordinatorResult<(NamespaceOrigin, Option<&Client>)> { + ) -> CoordinatorResult<(NamespaceOrigin, Option<&'a Client>)> { let namespace = namespace.clone(); let file_path = self.file_path.clone(); diff --git a/moq-relay-ietf/src/coordinator.rs b/moq-relay-ietf/src/coordinator.rs index df1d1ee2..5fed0c2c 100644 --- a/moq-relay-ietf/src/coordinator.rs +++ b/moq-relay-ietf/src/coordinator.rs @@ -238,10 +238,10 @@ pub trait Coordinator: Send + Sync { /// /// - `Ok(NamespaceOrigin, Option)` - Namespace origin and optional client if available /// - `Err` - Namespace not found anywhere - async fn lookup( + async fn lookup<'a>( &self, namespace: &TrackNamespace, - ) -> CoordinatorResult<(NamespaceOrigin, Option<&quic::Client>)>; + ) -> CoordinatorResult<(NamespaceOrigin, Option<&'a quic::Client>)>; /// Graceful shutdown of the coordinator. 
/// From 9b91003091c924cde62e154261e89780aeaf325d Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 2 Dec 2025 19:20:54 +0530 Subject: [PATCH 07/10] fix: prevent file truncation when opening for read/write in FileCoordinator --- moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs index aaf0502a..7e1b3d1a 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -77,6 +77,7 @@ fn unregister_namespace_sync(file_path: &Path, namespace: &TrackNamespace) -> Re .read(true) .write(true) .create(true) + .truncate(false) .open(file_path)?; file.lock_exclusive()?; @@ -107,6 +108,7 @@ fn unregister_track_sync( .read(true) .write(true) .create(true) + .truncate(false) .open(file_path)?; file.lock_exclusive()?; @@ -193,6 +195,7 @@ impl Coordinator for FileCoordinator { .read(true) .write(true) .create(true) + .truncate(false) .open(&file_path)?; file.lock_exclusive()?; @@ -300,6 +303,7 @@ impl Coordinator for FileCoordinator { .read(true) .write(true) .create(true) + .truncate(false) .open(&file_path)?; file.lock_shared()?; @@ -323,7 +327,6 @@ impl Coordinator for FileCoordinator { // instead of working on strings let is_prefix = registered_key .split('/') - .into_iter() .zip(key.split('/')) .all(|(a, b)| a == b); match best_match { From ecc63ee7ddf6c9b1d295fd699fc9ec74b040c1e2 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 2 Dec 2025 19:28:47 +0530 Subject: [PATCH 08/10] refactor: remove track registration from coordinator interface and file implementation --- .../bin/moq-relay-ietf/file_coordinator.rs | 113 +----------------- moq-relay-ietf/src/coordinator.rs | 69 +---------- 2 files changed, 2 insertions(+), 180 deletions(-) diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs 
b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs index 7e1b3d1a..176b602c 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -1,7 +1,7 @@ //! File-based coordinator for multi-relay deployments. //! //! This coordinator uses a shared JSON file with file locking to coordinate -//! namespace/track registration across multiple relay instances. No separate +//! namespace registration across multiple relay instances. No separate //! server process is required. use std::collections::HashMap; @@ -19,7 +19,6 @@ use url::Url; use moq_relay_ietf::{ Coordinator, CoordinatorError, CoordinatorResult, NamespaceOrigin, NamespaceRegistration, - TrackInfo, TrackRegistration, }; /// Data stored in the shared file @@ -27,18 +26,12 @@ use moq_relay_ietf::{ struct CoordinatorData { /// Maps namespace path (e.g., "/foo/bar") to relay URL namespaces: HashMap, - /// Maps "namespace_path:track_name" to track_alias - tracks: HashMap, } impl CoordinatorData { fn namespace_key(namespace: &TrackNamespace) -> String { namespace.to_utf8_path() } - - fn track_key(namespace: &TrackNamespace, track_name: &str) -> String { - format!("{}:{}", Self::namespace_key(namespace), track_name) - } } /// Handle that unregisters a namespace when dropped @@ -55,22 +48,6 @@ impl Drop for NamespaceUnregisterHandle { } } -/// Handle that unregisters a track when dropped -struct TrackUnregisterHandle { - namespace: TrackNamespace, - track_name: String, - file_path: PathBuf, -} - -impl Drop for TrackUnregisterHandle { - fn drop(&mut self) { - if let Err(err) = unregister_track_sync(&self.file_path, &self.namespace, &self.track_name) - { - log::warn!("failed to unregister track on drop: {}", err); - } - } -} - /// Synchronous helper for unregistering namespace (used in Drop) fn unregister_namespace_sync(file_path: &Path, namespace: &TrackNamespace) -> Result<()> { let file = OpenOptions::new() @@ -88,37 +65,6 @@ fn 
unregister_namespace_sync(file_path: &Path, namespace: &TrackNamespace) -> Re log::debug!("unregistering namespace: {}", key); data.namespaces.remove(&key); - // Remove all tracks under this namespace - let prefix = format!("{}:", key); - data.tracks.retain(|k, _| !k.starts_with(&prefix)); - - write_data(&file, &data)?; - file.unlock()?; - - Ok(()) -} - -/// Synchronous helper for unregistering track (used in Drop) -fn unregister_track_sync( - file_path: &Path, - namespace: &TrackNamespace, - track_name: &str, -) -> Result<()> { - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(false) - .open(file_path)?; - - file.lock_exclusive()?; - - let mut data = read_data(&file)?; - let key = CoordinatorData::track_key(namespace, track_name); - - log::debug!("unregistering track: {}", key); - data.tracks.remove(&key); - write_data(&file, &data)?; file.unlock()?; @@ -233,63 +179,6 @@ impl Coordinator for FileCoordinator { Ok(()) } - async fn register_track(&self, track_info: TrackInfo) -> CoordinatorResult { - let file_path = self.file_path.clone(); - let namespace = track_info.namespace.clone(); - let track_name = track_info.track_name.clone(); - let track_alias = track_info.track_alias; - - let ns_clone = namespace.clone(); - let tn_clone = track_name.clone(); - - tokio::task::spawn_blocking(move || { - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&file_path)?; - - file.lock_exclusive()?; - - let mut data = read_data(&file)?; - let key = CoordinatorData::track_key(&ns_clone, &tn_clone); - - log::info!("registering track: {}", key); - data.tracks.insert(key, track_alias); - - write_data(&file, &data)?; - file.unlock()?; - - Ok::<_, anyhow::Error>(()) - }) - .await??; - - let handle = TrackUnregisterHandle { - namespace, - track_name, - file_path: self.file_path.clone(), - }; - - Ok(TrackRegistration::new(handle)) - } - - async fn unregister_track( - &self, - namespace: &TrackNamespace, - track_name: 
&str, - ) -> CoordinatorResult<()> { - let namespace = namespace.clone(); - let track_name = track_name.to_string(); - let file_path = self.file_path.clone(); - - tokio::task::spawn_blocking(move || { - unregister_track_sync(&file_path, &namespace, &track_name) - }) - .await??; - - Ok(()) - } - async fn lookup<'a>( &self, namespace: &TrackNamespace, diff --git a/moq-relay-ietf/src/coordinator.rs b/moq-relay-ietf/src/coordinator.rs index 5fed0c2c..feb0a123 100644 --- a/moq-relay-ietf/src/coordinator.rs +++ b/moq-relay-ietf/src/coordinator.rs @@ -11,12 +11,6 @@ pub enum CoordinatorError { #[error("namespace already registered")] NamespaceAlreadyRegistered, - #[error("track not found")] - TrackNotFound, - - #[error("track already registered")] - TrackAlreadyRegistered, - #[error("Internal Error: {0}")] Other(anyhow::Error), } @@ -70,23 +64,6 @@ impl NamespaceRegistration { } } -/// Handle returned when a track is registered under a namespace. -/// -/// Dropping this handle automatically unregisters the track. -/// The namespace remains registered even after all tracks are dropped. -pub struct TrackRegistration { - _inner: Box, -} - -impl TrackRegistration { - /// Create a new track registration handle. - pub fn new(inner: T) -> Self { - Self { - _inner: Box::new(inner), - } - } -} - /// Result of a namespace lookup. #[derive(Debug, Clone, PartialEq, Eq)] pub struct NamespaceOrigin { @@ -130,24 +107,10 @@ impl NamespaceOrigin { } } -/// Information about a track within a namespace. -#[derive(Debug, Clone)] -pub struct TrackInfo { - /// The track namespace - pub namespace: TrackNamespace, - - /// The track name within the namespace - pub track_name: String, - - /// Track alias for quick lookup - pub track_alias: u64, -} - -/// Coordinator handles namespace and track registration/discovery across relays. +/// Coordinator handles namespace registration/discovery across relays. 
/// /// Implementations are responsible for: /// - Tracking which namespaces are served locally -/// - Tracking which tracks are available under each namespace /// - Caching remote namespace lookups /// - Communicating with external registries (HTTP API, Redis, etc.) /// - Periodic refresh/heartbeat of registrations @@ -192,36 +155,6 @@ pub trait Coordinator: Send + Sync { /// * `namespace` - The namespace to unregister async fn unregister_namespace(&self, namespace: &TrackNamespace) -> CoordinatorResult<()>; - /// Register a track as available under a namespace. - /// - /// Called when a publisher sends PUBLISH for a track. - /// The namespace must already be registered. - /// - /// # Arguments - /// - /// * `track_info` - Information about the track being registered - /// - /// # Returns - /// - /// A `TrackRegistration` handle. The track remains registered - /// as long as this handle is held. - async fn register_track(&self, track_info: TrackInfo) -> CoordinatorResult; - - /// Unregister a track. - /// - /// Called when a publisher sends PUBLISH_DONE. - /// Only the track is removed, not the namespace. - /// - /// # Arguments - /// - /// * `namespace` - The namespace containing the track - /// * `track_name` - The track name to unregister - async fn unregister_track( - &self, - namespace: &TrackNamespace, - track_name: &str, - ) -> CoordinatorResult<()>; - /// Lookup where a namespace is served from. /// /// Called when a subscriber requests a namespace. 
From f690c3195495996f25975f47f100807f6eab8555 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 2 Dec 2025 19:51:51 +0530 Subject: [PATCH 09/10] fix: update lookup signature to return owned Client instead of reference --- moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs | 6 +++--- moq-relay-ietf/src/coordinator.rs | 4 ++-- moq-relay-ietf/src/remote.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs index 176b602c..8e403693 100644 --- a/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs +++ b/moq-relay-ietf/src/bin/moq-relay-ietf/file_coordinator.rs @@ -179,15 +179,15 @@ impl Coordinator for FileCoordinator { Ok(()) } - async fn lookup<'a>( + async fn lookup( &self, namespace: &TrackNamespace, - ) -> CoordinatorResult<(NamespaceOrigin, Option<&'a Client>)> { + ) -> CoordinatorResult<(NamespaceOrigin, Option)> { let namespace = namespace.clone(); let file_path = self.file_path.clone(); let result = tokio::task::spawn_blocking( - move || -> Result)>> { + move || -> Result)>> { let file = OpenOptions::new() .read(true) .write(true) diff --git a/moq-relay-ietf/src/coordinator.rs b/moq-relay-ietf/src/coordinator.rs index feb0a123..b84e5b65 100644 --- a/moq-relay-ietf/src/coordinator.rs +++ b/moq-relay-ietf/src/coordinator.rs @@ -171,10 +171,10 @@ pub trait Coordinator: Send + Sync { /// /// - `Ok(NamespaceOrigin, Option)` - Namespace origin and optional client if available /// - `Err` - Namespace not found anywhere - async fn lookup<'a>( + async fn lookup( &self, namespace: &TrackNamespace, - ) -> CoordinatorResult<(NamespaceOrigin, Option<&'a quic::Client>)>; + ) -> CoordinatorResult<(NamespaceOrigin, Option)>; /// Graceful shutdown of the coordinator. 
/// diff --git a/moq-relay-ietf/src/remote.rs b/moq-relay-ietf/src/remote.rs index 56c269ed..8d2c645b 100644 --- a/moq-relay-ietf/src/remote.rs +++ b/moq-relay-ietf/src/remote.rs @@ -56,7 +56,7 @@ impl RemoteManager { let url = origin.url(); // Get or create a connection to the remote relay - let remote = self.get_or_connect(&url, client).await?; + let remote = self.get_or_connect(&url, client.as_ref()).await?; // Subscribe to the track on the remote remote.subscribe(namespace, track_name).await From 395ef6aa1989580c0f84740705a7c1f59a758433 Mon Sep 17 00:00:00 2001 From: Manish Date: Tue, 2 Dec 2025 20:05:21 +0530 Subject: [PATCH 10/10] style: format track namespace test by removing unnecessary line breaks --- moq-transport/src/coding/track_namespace.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/moq-transport/src/coding/track_namespace.rs b/moq-transport/src/coding/track_namespace.rs index ff63d1a4..4179038a 100644 --- a/moq-transport/src/coding/track_namespace.rs +++ b/moq-transport/src/coding/track_namespace.rs @@ -269,10 +269,7 @@ mod tests { #[test] fn try_from_vec_tuple_field() { - let fields = vec![ - TupleField::from_utf8("test"), - TupleField::from_utf8("path"), - ]; + let fields = vec![TupleField::from_utf8("test"), TupleField::from_utf8("path")]; let ns: TrackNamespace = fields.try_into().unwrap(); assert_eq!(ns.fields.len(), 2); assert_eq!(ns.to_utf8_path(), "/test/path");