Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions moq-relay-ietf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ edition = "2021"
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

[lib]
name = "moq_relay_ietf"
path = "src/lib.rs"

[[bin]]
name = "moq-relay-ietf"
path = "src/main.rs"

[dependencies]
moq-transport = { path = "../moq-transport", version = "0.11" }
moq-native-ietf = { path = "../moq-native-ietf", version = "0.5" }
Expand All @@ -22,6 +30,7 @@ url = "2"
# Async stuff
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"

# Web server to serve the fingerprint
axum = { version = "0.7", features = ["tokio"] }
Expand Down
88 changes: 0 additions & 88 deletions moq-relay-ietf/src/api.rs

This file was deleted.

44 changes: 30 additions & 14 deletions moq-relay-ietf/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,36 @@
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use moq_transport::{
serve::Tracks,
session::{Announced, SessionError, Subscriber},

Check warning on line 5 in moq-relay-ietf/src/consumer.rs

View workflow job for this annotation

GitHub Actions / build

Diff in /home/runner/work/moq-rs/moq-rs/moq-relay-ietf/src/consumer.rs
};

use crate::{Api, Locals, Producer};
use crate::{Locals, Producer};
use crate::control_plane::{ControlPlane, Origin};
use url::Url;

/// Consumer of tracks from a remote Publisher
#[derive(Clone)]
pub struct Consumer {
pub struct Consumer<CP: ControlPlane> {
remote: Subscriber,
locals: Locals,
api: Option<Api>,
forward: Option<Producer>, // Forward all announcements to this subscriber
control_plane: Option<CP>,
node_url: Option<Url>,
forward: Option<Producer<CP>>, // Forward all announcements to this subscriber
}

impl Consumer {
impl<CP: ControlPlane> Consumer<CP> {
pub fn new(
remote: Subscriber,
locals: Locals,
api: Option<Api>,
forward: Option<Producer>,
control_plane: Option<CP>,
node_url: Option<Url>,
forward: Option<Producer<CP>>,
) -> Self {
Self {
remote,
locals,
api,
control_plane,
node_url,
forward,
}
}
Expand Down Expand Up @@ -64,12 +69,23 @@
// Produce the tracks for this announce and return the reader
let (_, mut request, reader) = Tracks::new(announce.namespace.clone()).produce();

// Start refreshing the API origin, if any
if let Some(api) = self.api.as_ref() {
let mut refresh = api.set_origin(reader.namespace.to_utf8_path()).await?;
tasks.push(
async move { refresh.run().await.context("failed refreshing origin") }.boxed(),
);
// Start refreshing the control plane origin, if any
if let Some(control_plane) = self.control_plane.as_ref() {
if let Some(node_url) = &self.node_url {
let origin = Origin {
url: node_url.clone(),

Check warning on line 76 in moq-relay-ietf/src/consumer.rs

View workflow job for this annotation

GitHub Actions / build

Diff in /home/runner/work/moq-rs/moq-rs/moq-relay-ietf/src/consumer.rs
};
let namespace = reader.namespace.to_utf8_path();

// Set the origin initially
control_plane.set_origin(&namespace, origin.clone()).await?;

// Create and spawn refresher task
let mut refresh = control_plane.create_refresher(namespace, origin);
tasks.push(
async move { refresh.run().await.context("failed refreshing origin") }.boxed(),
);
}
}

// Register the local tracks, unregister on drop
Expand Down
37 changes: 37 additions & 0 deletions moq-relay-ietf/src/control_plane.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use anyhow::Result;
use async_trait::async_trait;
use url::Url;

/// Origin information for routing
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Origin {
pub url: Url,
}

/// Trait for control plane operations that enable cross-relay routing and state sharing
#[async_trait]
pub trait ControlPlane: Send + Sync + Clone + Default + 'static {
/// Get the origin URL for a given namespace
async fn get_origin(&self, namespace: &str) -> Result<Option<Origin>>;

/// Set/register the origin for a given namespace
async fn set_origin(&self, namespace: &str, origin: Origin) -> Result<()>;

/// Delete/unregister the origin for a given namespace
async fn delete_origin(&self, namespace: &str) -> Result<()>;

Check warning on line 22 in moq-relay-ietf/src/control_plane.rs

View workflow job for this annotation

GitHub Actions / build

Diff in /home/runner/work/moq-rs/moq-rs/moq-relay-ietf/src/control_plane.rs
/// Create a refresher that periodically updates the origin registration
/// Returns a future that runs the refresh loop
fn create_refresher(
&self,
namespace: String,
origin: Origin,
) -> Box<dyn ControlPlaneRefresher>;
}

/// Trait for periodically refreshing origin registrations
#[async_trait]
pub trait ControlPlaneRefresher: Send + 'static {
/// Run the refresh loop (should run indefinitely until dropped)
async fn run(&mut self) -> Result<()>;
}
127 changes: 127 additions & 0 deletions moq-relay-ietf/src/control_plane_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use anyhow::Result;
use async_trait::async_trait;
use url::Url;

use crate::control_plane::{ControlPlane, ControlPlaneRefresher, Origin};

/// HTTP-based control plane implementation using moq-api
#[derive(Clone)]
pub struct HttpControlPlane {
client: moq_api::Client,
node: Url,
}

impl HttpControlPlane {
pub fn new(api_url: Url, node_url: Url) -> Self {
let client = moq_api::Client::new(api_url);
Self {
client,
node: node_url,
}
}

pub fn node_url(&self) -> &Url {
&self.node
}
}

impl Default for HttpControlPlane {
fn default() -> Self {

Check warning on line 29 in moq-relay-ietf/src/control_plane_http.rs

View workflow job for this annotation

GitHub Actions / build

Diff in /home/runner/work/moq-rs/moq-rs/moq-relay-ietf/src/control_plane_http.rs
// This is a stub implementation - in practice you'd need valid URLs
// The actual instance should be created via `new()` with proper config
panic!("HttpControlPlane requires API URL and node URL - use HttpControlPlane::new() instead")
Copy link

Copilot AI Nov 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Default implementation for HttpControlPlane panics at runtime, which is problematic since ControlPlane trait requires Default. This creates a trait implementation that violates the principle of least surprise and will cause runtime panics. Consider either removing the Default requirement from the ControlPlane trait, or providing a safe default implementation (e.g., with placeholder URLs or an Option-wrapped state).

Suggested change
// This is a stub implementation - in practice you'd need valid URLs
// The actual instance should be created via `new()` with proper config
panic!("HttpControlPlane requires API URL and node URL - use HttpControlPlane::new() instead")
// Safe default using placeholder URLs
let api_url = Url::parse("http://localhost").unwrap();
let node_url = Url::parse("http://localhost").unwrap();
HttpControlPlane::new(api_url, node_url)

Copilot uses AI. Check for mistakes.
}
}

#[async_trait]
impl ControlPlane for HttpControlPlane {
async fn get_origin(&self, namespace: &str) -> Result<Option<Origin>> {
match self.client.get_origin(namespace).await? {
Some(origin) => Ok(Some(Origin { url: origin.url })),
None => Ok(None),
}
}

async fn set_origin(&self, namespace: &str, origin: Origin) -> Result<()> {
let moq_origin = moq_api::Origin { url: origin.url };
self.client.set_origin(namespace, moq_origin).await?;
Ok(())
}

async fn delete_origin(&self, namespace: &str) -> Result<()> {
self.client.delete_origin(namespace).await?;
Ok(())
}

fn create_refresher(
&self,
namespace: String,

Check warning on line 58 in moq-relay-ietf/src/control_plane_http.rs

View workflow job for this annotation

GitHub Actions / build

Diff in /home/runner/work/moq-rs/moq-rs/moq-relay-ietf/src/control_plane_http.rs
origin: Origin,
) -> Box<dyn ControlPlaneRefresher> {
Box::new(HttpRefresher::new(
self.client.clone(),
namespace,
origin,
))
}
}

/// Periodically refreshes the origin registration via HTTP
pub struct HttpRefresher {
client: moq_api::Client,
namespace: String,
origin: Origin,
refresh: tokio::time::Interval,
}

impl HttpRefresher {
fn new(client: moq_api::Client, namespace: String, origin: Origin) -> Self {
// Refresh every 5 minutes
let duration = tokio::time::Duration::from_secs(300);
let mut refresh = tokio::time::interval(duration);
refresh.reset_after(duration); // skip the first tick

Self {
client,
namespace,
origin,
refresh,
}
}

async fn update(&self) -> Result<()> {
log::debug!(
"registering origin: namespace={} url={}",
self.namespace,
self.origin.url
);
let moq_origin = moq_api::Origin {

Check warning on line 98 in moq-relay-ietf/src/control_plane_http.rs

View workflow job for this annotation

GitHub Actions / build

Diff in /home/runner/work/moq-rs/moq-rs/moq-relay-ietf/src/control_plane_http.rs
url: self.origin.url.clone(),
};
self.client
.set_origin(&self.namespace, moq_origin)
.await?;
Ok(())
}
}

#[async_trait]
impl ControlPlaneRefresher for HttpRefresher {
async fn run(&mut self) -> Result<()> {
loop {
self.refresh.tick().await;
self.update().await?;
}
}
}

impl Drop for HttpRefresher {
fn drop(&mut self) {
let namespace = self.namespace.clone();
let client = self.client.clone();
log::debug!("removing origin: namespace={}", namespace);
tokio::spawn(async move {
let _ = client.delete_origin(&namespace).await;
});
}
}
Loading
Loading