Skip to content

Commit a11f7d2

Browse files
authored
Add a mechanism to update stream information for applied audio filters after AudioStream initialization (#596)
* add a way to update stream_info after AudioStream is initialized * fmt * add a changeset
1 parent dde59ce commit a11f7d2

File tree

5 files changed

+82
-13
lines changed

5 files changed

+82
-13
lines changed

.nanpa/candy-blimp-twice.kdl

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
patch package="livekit-ffi" type="fixed" "Fixed metric report issue on audio filter where room_id is sometimes empty"

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

livekit-ffi/src/server/audio_stream.rs

+55-10
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl FfiAudioStream {
5757
return Err(FfiError::InvalidRequest("not an audio track".into()));
5858
};
5959

60-
let (audio_filter, stream_info) = match &new_stream.audio_filter_module_id {
60+
let (audio_filter, info) = match &new_stream.audio_filter_module_id {
6161
Some(module_id) => {
6262
let Some(room_handle) = ffi_track.room_handle else {
6363
return Err(FfiError::InvalidRequest(
@@ -83,9 +83,9 @@ impl FfiAudioStream {
8383
track_id: rtc_track.id(),
8484
};
8585

86-
(Some(filter), stream_info)
86+
(Some(filter), Some(AudioFilterInfo { stream_info, room_handle }))
8787
}
88-
None => (None, AudioFilterStreamInfo::default()),
88+
None => (None, None),
8989
};
9090

9191
let stream_type = new_stream.r#type();
@@ -104,7 +104,7 @@ impl FfiAudioStream {
104104
let session = audio_filter.clone().new_session(
105105
sample_rate,
106106
new_stream.audio_filter_options.unwrap_or("".into()),
107-
stream_info,
107+
info.as_ref().map(|i| i.stream_info.clone()).unwrap(),
108108
);
109109

110110
match session {
@@ -134,6 +134,7 @@ impl FfiAudioStream {
134134
self_dropped_rx,
135135
server.watch_handle_dropped(new_stream.track_handle),
136136
true,
137+
info,
137138
));
138139
server.watch_panic(handle);
139140
Ok::<FfiAudioStream, FfiError>(audio_stream)
@@ -208,7 +209,8 @@ impl FfiAudioStream {
208209
// track_tx is no longer held, so the track_rx will be closed when track_changed_trigger is done
209210

210211
let url = ffi_participant.room.url();
211-
let room_sid = ffi_participant.room.room.sid().await;
212+
let room_sid =
213+
ffi_participant.room.room.maybe_sid().map(|id| id.to_string()).unwrap_or("".into());
212214
let room_name = ffi_participant.room.room.name();
213215
let participant_identity = ffi_participant.participant.identity();
214216
let participant_id = ffi_participant.participant.sid();
@@ -247,7 +249,7 @@ impl FfiAudioStream {
247249
}
248250
});
249251

250-
let mut audio_filter_session = match &filter {
252+
let (mut audio_filter_session, info) = match &filter {
251253
Some(filter) => match &request.audio_filter_options {
252254
Some(options) => {
253255
let stream_info = AudioFilterStreamInfo {
@@ -259,19 +261,24 @@ impl FfiAudioStream {
259261
track_id: track.sid().into(),
260262
};
261263

264+
let info = AudioFilterInfo {
265+
stream_info,
266+
room_handle: ffi_participant.room.handle_id,
267+
};
268+
262269
let session = filter.clone().new_session(
263270
sample_rate as u32,
264271
&options,
265-
stream_info,
272+
info.stream_info.clone(),
266273
);
267274
if session.is_none() {
268275
log::error!("failed to initialize the audio filter. it will not be enabled for this session.");
269276
}
270-
session
277+
(session, Some(info))
271278
}
272-
None => None,
279+
None => (None, None),
273280
},
274-
None => None,
281+
None => (None, None),
275282
};
276283

277284
let native_stream = NativeAudioStream::new(rtc_track, sample_rate, num_channels);
@@ -297,6 +304,7 @@ impl FfiAudioStream {
297304
c_rx,
298305
handle_dropped_rx,
299306
false,
307+
info,
300308
)
301309
.await;
302310
let _ = done_tx.send(());
@@ -332,6 +340,7 @@ impl FfiAudioStream {
332340
mut self_dropped_rx: oneshot::Receiver<()>,
333341
mut handle_dropped_rx: oneshot::Receiver<()>,
334342
send_eos: bool,
343+
mut filter_info: Option<AudioFilterInfo>,
335344
) {
336345
loop {
337346
tokio::select! {
@@ -346,6 +355,21 @@ impl FfiAudioStream {
346355
break;
347356
};
348357

358+
if let Some(ref mut info) = filter_info {
359+
if info.stream_info.room_id == "" {
360+
// check if room_id is updated
361+
if info.update_room_id(server) {
362+
if info.stream_info.room_id != "" {
363+
if let AudioStreamKind::Filtered(ref mut filter) = native_stream {
364+
filter.update_stream_info(info.stream_info.clone());
365+
}
366+
// room_id is updated, this check is no longer needed.
367+
filter_info = None;
368+
}
369+
}
370+
}
371+
}
372+
349373
let handle_id = server.next_id();
350374
let buffer_info = proto::AudioFrameBufferInfo::from(&frame);
351375
server.store_handle(handle_id, frame);
@@ -383,3 +407,24 @@ impl FfiAudioStream {
383407
}
384408
}
385409
}
410+
411+
// Used to update audio filter session when the stream info is changed. (Mainly room_id
412+
#[derive(Default)]
413+
struct AudioFilterInfo {
414+
stream_info: AudioFilterStreamInfo,
415+
room_handle: FfiHandleId,
416+
}
417+
418+
impl AudioFilterInfo {
419+
fn update_room_id(&mut self, server: &'static server::FfiServer) -> bool {
420+
let Ok(room) = server.retrieve_handle::<FfiRoom>(self.room_handle) else {
421+
return false;
422+
};
423+
let room_id = room.inner.room.maybe_sid().map(|id| id.to_string()).unwrap_or("".into());
424+
if room_id != "" {
425+
self.stream_info.room_id = room_id.into();
426+
return true;
427+
}
428+
false
429+
}
430+
}

livekit-ffi/src/server/room.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub struct FfiRoom {
5656

5757
pub struct RoomInner {
5858
pub room: Room,
59-
handle_id: FfiHandleId,
59+
pub(crate) handle_id: FfiHandleId,
6060
data_tx: mpsc::UnboundedSender<FfiDataPacket>,
6161
transcription_tx: mpsc::UnboundedSender<FfiTranscription>,
6262
dtmf_tx: mpsc::UnboundedSender<FfiSipDtmfPacket>,

livekit/src/plugin.rs

+24-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type CreateFn = unsafe extern "C" fn(
3333
type DestroyFn = unsafe extern "C" fn(*const c_void);
3434
type ProcessI16Fn = unsafe extern "C" fn(*const c_void, usize, *const i16, *mut i16);
3535
type ProcessF32Fn = unsafe extern "C" fn(*const c_void, usize, *const f32, *mut f32);
36+
type UpdateStreamInfoFn = unsafe extern "C" fn(*const c_void, *const c_char);
3637

3738
static REGISTERED_PLUGINS: LazyLock<RwLock<HashMap<String, Arc<AudioFilterPlugin>>>> =
3839
LazyLock::new(|| RwLock::new(HashMap::new()));
@@ -57,6 +58,7 @@ pub struct AudioFilterPlugin {
5758
destroy_fn_ptr: *const c_void,
5859
process_i16_fn_ptr: *const c_void,
5960
process_f32_fn_ptr: *const c_void,
61+
update_stream_info_fn_ptr: *const c_void,
6062
}
6163

6264
impl AudioFilterPlugin {
@@ -116,6 +118,11 @@ impl AudioFilterPlugin {
116118
.try_as_raw_ptr()
117119
.unwrap()
118120
};
121+
let update_stream_info_fn_ptr = unsafe {
122+
lib.get::<Symbol<UpdateStreamInfoFn>>(b"audio_filter_update_stream_info")?
123+
.try_as_raw_ptr()
124+
.unwrap()
125+
};
119126

120127
Ok(Self {
121128
lib,
@@ -125,6 +132,7 @@ impl AudioFilterPlugin {
125132
destroy_fn_ptr,
126133
process_i16_fn_ptr,
127134
process_f32_fn_ptr,
135+
update_stream_info_fn_ptr,
128136
})
129137
}
130138

@@ -196,6 +204,17 @@ impl AudioFilterSession {
196204
let process: ProcessF32Fn = unsafe { std::mem::transmute(self.plugin.process_f32_fn_ptr) };
197205
unsafe { process(self.ptr, num_samples, input.as_ptr(), output.as_mut_ptr()) };
198206
}
207+
208+
pub fn update_stream_info(&self, info: AudioFilterStreamInfo) {
209+
if self.plugin.update_stream_info_fn_ptr.is_null() {
210+
return;
211+
}
212+
let update_stream_info_fn: UpdateStreamInfoFn =
213+
unsafe { std::mem::transmute(self.plugin.update_stream_info_fn_ptr) };
214+
let info_json = serde_json::to_string(&info).unwrap();
215+
let info_json = CString::new(info_json).unwrap_or(CString::new("").unwrap());
216+
unsafe { update_stream_info_fn(self.ptr, info_json.as_ptr()) }
217+
}
199218
}
200219

201220
impl Drop for AudioFilterSession {
@@ -234,6 +253,10 @@ impl AudioFilterAudioStream {
234253
frame_size,
235254
}
236255
}
256+
257+
pub fn update_stream_info(&mut self, info: AudioFilterStreamInfo) {
258+
self.session.update_stream_info(info);
259+
}
237260
}
238261

239262
impl Stream for AudioFilterAudioStream {
@@ -267,7 +290,7 @@ impl Stream for AudioFilterAudioStream {
267290
}
268291
}
269292

270-
#[derive(Debug, Serialize, Default)]
293+
#[derive(Debug, Serialize, Default, Clone)]
271294
#[serde(rename_all = "camelCase")]
272295
pub struct AudioFilterStreamInfo {
273296
pub url: String,

0 commit comments

Comments
 (0)