Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 98 additions & 10 deletions rs/hang-c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ use tokio::runtime::Runtime;
use url::Url;

use bytes::Bytes;
use hang::catalog::{Video, VideoConfig, H264};
use hang::catalog::{Audio, AudioConfig, Video, VideoConfig, AAC, H264};
use hang::model::{Frame, Timestamp, TrackProducer};
use hang::{Catalog, CatalogProducer};
use moq_lite::{BroadcastProducer, Track};
use std::ffi::CStr;
use std::os::raw::c_char;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, OnceLock};
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::{collections::HashMap, time::Duration};

static IMPORT: OnceLock<Mutex<ImportJoy>> = OnceLock::new();
static HANDLE: OnceLock<Mutex<JoinHandle<()>>> = OnceLock::new();
static IMPORT: Mutex<Option<ImportJoy>> = Mutex::new(None);
static HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);
static RUNNING: AtomicBool = AtomicBool::new(false);

/// # Safety
Expand Down Expand Up @@ -63,12 +63,22 @@ pub unsafe extern "C" fn hang_start_from_c(
});
});

let _ = HANDLE.set(Mutex::new(handle));
if let Ok(mut guard) = HANDLE.lock() {
*guard = Some(handle);
}
}

#[no_mangle]
pub extern "C" fn hang_stop_from_c() {
RUNNING.store(false, Ordering::Relaxed);
if let Ok(mut guard) = HANDLE.lock() {
if let Some(handle) = guard.take() {
let _ = handle.join();
}
}
if let Ok(mut guard) = IMPORT.lock() {
*guard = None;
}
}

/// # Safety
Expand All @@ -83,14 +93,34 @@ pub unsafe extern "C" fn hang_write_video_packet_from_c(data: *const u8, size: u
return;
}

if let Some(import_mutex) = IMPORT.get() {
if let Ok(mut import) = import_mutex.lock() {
if let Ok(mut guard) = IMPORT.lock() {
if let Some(import) = guard.as_mut() {
// SAFETY: Caller of hang_write_video_packet_from_c guarantees data is valid
import.write_video_frame(data, size, keyframe > 0, dts);
}
}
}

/// # Safety
///
/// The caller must ensure that:
/// - `data` points to a valid buffer of at least `size` bytes
/// - The buffer remains valid for the duration of this function call
#[no_mangle]
pub unsafe extern "C" fn hang_write_audio_packet_from_c(data: *const u8, size: usize, dts: u64) {
// Validate pointer and size
if data.is_null() || size == 0 {
return;
}

if let Ok(mut guard) = IMPORT.lock() {
if let Some(import) = guard.as_mut() {
// SAFETY: Caller of hang_write_audio_packet_from_c guarantees data is valid
import.write_audio_frame(data, size, dts);
}
}
}

pub async fn client(url: Url, name: String) -> anyhow::Result<()> {
let broadcast = moq_lite::Broadcast::produce();
let config = moq_native::ClientConfig::default();
Expand All @@ -104,7 +134,9 @@ pub async fn client(url: Url, name: String) -> anyhow::Result<()> {

let mut import = ImportJoy::new(broadcast.producer);
import.init();
let _ = IMPORT.set(Mutex::new(import));
if let Ok(mut guard) = IMPORT.lock() {
*guard = Some(import);
}

origin.producer.publish_broadcast(&name, broadcast.consumer);

Expand Down Expand Up @@ -147,11 +179,12 @@ impl ImportJoy {
pub fn init(&mut self) {
// Produce the catalog
let mut video_renditions = HashMap::new();
let mut audio_renditions = HashMap::new();

let (track_name, config) = Self::init_video();
let track = Track {
name: track_name.clone(),
priority: 2,
priority: 1,
};
let track_produce = track.produce();
self.broadcast.insert_track(track_produce.consumer);
Expand All @@ -162,14 +195,33 @@ impl ImportJoy {
if !video_renditions.is_empty() {
let video = Video {
renditions: video_renditions,
priority: 2,
priority: 1,
display: None,
rotation: None,
flip: None,
};
self.catalog.set_video(Some(video));
}

let (track_name, config) = Self::init_audio();
let track = Track {
name: track_name.clone(),
priority: 2,
};
let track_produce = track.produce();
self.broadcast.insert_track(track_produce.consumer);
audio_renditions.insert(track_name, config);

self.tracks.insert(1, track_produce.producer.into());

if !audio_renditions.is_empty() {
let audio = Audio {
renditions: audio_renditions,
priority: 2,
};
self.catalog.set_audio(Some(audio));
}

self.catalog.publish();
}

Expand Down Expand Up @@ -197,6 +249,20 @@ impl ImportJoy {
(name, config)
}

pub fn init_audio() -> (String, AudioConfig) {
let name = String::from("audio1");

let config = AudioConfig {
codec: AAC { profile: 2 }.into(),
sample_rate: 48000,
channel_count: 2,
bitrate: Some(128000),
description: None,
};

(name, config)
}

/// # Safety
///
/// The caller must ensure that `data` points to a valid buffer of at least `size` bytes
Expand Down Expand Up @@ -225,4 +291,26 @@ impl ImportJoy {

track.write(frame);
}

/// # Safety
///
/// The caller must ensure that `data` points to a valid buffer of at least `size` bytes
pub unsafe fn write_audio_frame(&mut self, data: *const u8, size: usize, dts: u64) {
let Some(track) = self.tracks.get_mut(&1) else {
return;
};

// Use copy_from_slice to own the data, avoiding use-after-free when C caller frees the buffer
let payload = Bytes::copy_from_slice(std::slice::from_raw_parts(data, size));

let timestamp = Timestamp::from_micros(dts);

let frame = Frame {
timestamp,
keyframe: false,
payload,
};

track.write(frame);
}
}
Loading