Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PulseAudio support #957

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Add PulseAudio support
This adds support for PulseAudio on hosts with a PA or PipeWire server
(the latter via pipewire-pulse).

Since the underlying client is async, some amount of bridging has to be
done.
colinmarc committed Feb 27, 2025
commit a76575ba9c9a0cc2c59ec48d113c659def727a58
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ rust-version = "1.70"
[features]
asio = ["asio-sys", "num-traits"] # Only available on Windows. See README for setup instructions.
oboe-shared-stdcxx = ["oboe/shared-stdcxx"] # Only available on Android. See README for what it does.
pulseaudio = ["dep:pulseaudio", "dep:futures"] # Only available on some Unix platforms.

[dependencies]
dasp_sample = "0.11"
@@ -46,6 +47,8 @@ num-traits = { version = "0.2.6", optional = true }
alsa = "0.9"
libc = "0.2"
jack = { version = "0.13.0", optional = true }
pulseaudio = { git = "https://github.com/colinmarc/pulseaudio-rs", branch = "client", optional = true }
futures = { version = "0.3", optional = true }

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
core-foundation-sys = "0.8.2" # For linking to CoreFoundation.framework and handling device name `CFString`s.
77 changes: 38 additions & 39 deletions examples/beep.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::Parser;
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
FromSample, Sample, SizedSample, I24,
FromSample, HostUnavailable, Sample, SizedSample, I24,
};

#[derive(Parser, Debug)]
@@ -11,58 +11,57 @@ struct Opt {
#[arg(short, long, default_value_t = String::from("default"))]
device: String,

/// Use the JACK host
#[cfg(all(
any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
),
feature = "jack"
))]
#[arg(short, long)]
#[allow(dead_code)]
/// Use the JACK host. Requires `--features jack`.
#[arg(long, default_value_t = false)]
jack: bool,

/// Use the PulseAudio host. Requires `--features pulseaudio`.
#[arg(long, default_value_t = false)]
pulseaudio: bool,
}

fn main() -> anyhow::Result<()> {
let opt = Opt::parse();

// Conditionally compile with jack if the feature is specified.
#[cfg(all(
any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
),
feature = "jack"
// Jack/PulseAudio support must be enabled at compile time, and is
// only available on some platforms.
#[allow(unused_mut, unused_assignments)]
let mut jack_host_id = Err(HostUnavailable);
#[allow(unused_mut, unused_assignments)]
let mut pulseaudio_host_id = Err(HostUnavailable);

#[cfg(any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
))]
{
#[cfg(feature = "jack")]
{
jack_host_id = Ok(cpal::HostId::Jack);
}

#[cfg(feature = "pulseaudio")]
{
pulseaudio_host_id = Ok(cpal::HostId::PulseAudio);
}
}

// Manually check for flags. Can be passed through cargo with -- e.g.
// cargo run --release --example beep --features jack -- --jack
let host = if opt.jack {
cpal::host_from_id(cpal::available_hosts()
.into_iter()
.find(|id| *id == cpal::HostId::Jack)
.expect(
"make sure --features jack is specified. only works on OSes where jack is available",
)).expect("jack host unavailable")
jack_host_id
.and_then(cpal::host_from_id)
.expect("make sure `--features jack` is specified, and the platform is supported")
} else if opt.pulseaudio {
pulseaudio_host_id
.and_then(cpal::host_from_id)
.expect("make sure `--features pulseaudio` is specified, and the platform is supported")
} else {
cpal::default_host()
};

#[cfg(any(
not(any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
)),
not(feature = "jack")
))]
let host = cpal::default_host();

let device = if opt.device == "default" {
host.default_output_device()
} else {
84 changes: 43 additions & 41 deletions examples/feedback.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,10 @@
//! precisely synchronised.

use clap::Parser;
use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
HostUnavailable,
};
use ringbuf::{
traits::{Consumer, Producer, Split},
HeapRb,
@@ -28,58 +31,57 @@ struct Opt {
#[arg(short, long, value_name = "DELAY_MS", default_value_t = 150.0)]
latency: f32,

/// Use the JACK host
#[cfg(all(
any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
),
feature = "jack"
))]
#[arg(short, long)]
#[allow(dead_code)]
/// Use the JACK host. Requires `--features jack`.
#[arg(long, default_value_t = false)]
jack: bool,

/// Use the PulseAudio host. Requires `--features pulseaudio`.
#[arg(long, default_value_t = false)]
pulseaudio: bool,
}

fn main() -> anyhow::Result<()> {
let opt = Opt::parse();

// Conditionally compile with jack if the feature is specified.
#[cfg(all(
any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
),
feature = "jack"
// Jack/PulseAudio support must be enabled at compile time, and is
// only available on some platforms.
#[allow(unused_mut, unused_assignments)]
let mut jack_host_id = Err(HostUnavailable);
#[allow(unused_mut, unused_assignments)]
let mut pulseaudio_host_id = Err(HostUnavailable);

#[cfg(any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
))]
{
#[cfg(feature = "jack")]
{
jack_host_id = Ok(cpal::HostId::Jack);
}

#[cfg(feature = "pulseaudio")]
{
pulseaudio_host_id = Ok(cpal::HostId::PulseAudio);
}
}

// Manually check for flags. Can be passed through cargo with -- e.g.
// cargo run --release --example beep --features jack -- --jack
let host = if opt.jack {
cpal::host_from_id(cpal::available_hosts()
.into_iter()
.find(|id| *id == cpal::HostId::Jack)
.expect(
"make sure --features jack is specified. only works on OSes where jack is available",
)).expect("jack host unavailable")
jack_host_id
.and_then(cpal::host_from_id)
.expect("make sure `--features jack` is specified, and the platform is supported")
} else if opt.pulseaudio {
pulseaudio_host_id
.and_then(cpal::host_from_id)
.expect("make sure `--features pulseaudio` is specified, and the platform is supported")
} else {
cpal::default_host()
};

#[cfg(any(
not(any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
)),
not(feature = "jack")
))]
let host = cpal::default_host();

// Find devices.
let input_device = if opt.input_device == "default" {
host.default_input_device()
@@ -164,8 +166,8 @@ fn main() -> anyhow::Result<()> {
output_stream.play()?;

// Run for 3 seconds before closing.
println!("Playing for 3 seconds... ");
std::thread::sleep(std::time::Duration::from_secs(3));
println!("Playing for 10 seconds... ");
std::thread::sleep(std::time::Duration::from_secs(10));
drop(input_stream);
drop(output_stream);
println!("Done!");
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -70,12 +70,15 @@ impl From<BackendSpecificError> for DevicesError {
pub enum DeviceNameError {
/// See the [`BackendSpecificError`] docs for more information about this error variant.
BackendSpecific { err: BackendSpecificError },
/// The name is not valid UTF-8.
InvalidUtf8,
}

impl Display for DeviceNameError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::BackendSpecific { err } => err.fmt(f),
Self::InvalidUtf8 => write!(f, "The name is not valid UTF-8"),
}
}
}
10 changes: 10 additions & 0 deletions src/host/mod.rs
Original file line number Diff line number Diff line change
@@ -24,6 +24,16 @@ pub(crate) mod jack;
pub(crate) mod null;
#[cfg(target_os = "android")]
pub(crate) mod oboe;
#[cfg(all(
any(
target_os = "linux",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "netbsd"
),
feature = "pulseaudio"
))]
pub(crate) mod pulseaudio;
#[cfg(windows)]
pub(crate) mod wasapi;
#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
396 changes: 396 additions & 0 deletions src/host/pulseaudio/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,396 @@
extern crate pulseaudio;

use futures::executor::block_on;
use pulseaudio::protocol;

mod stream;
use core::str;

pub use stream::Stream;

use crate::{
traits::{DeviceTrait, HostTrait},
BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceNameError,
DevicesError, HostUnavailable, InputCallbackInfo, OutputCallbackInfo, SampleFormat, SampleRate,
StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig,
SupportedStreamConfigRange, SupportedStreamConfigsError,
};

const PULSE_FORMATS: &[SampleFormat] = &[
SampleFormat::U8,
SampleFormat::I16,
SampleFormat::I24,
SampleFormat::I32,
SampleFormat::F32,
];

impl TryFrom<protocol::SampleFormat> for SampleFormat {
type Error = ();

fn try_from(spec: protocol::SampleFormat) -> Result<Self, Self::Error> {
match spec {
protocol::SampleFormat::U8 => Ok(SampleFormat::U8),
protocol::SampleFormat::S16Le | protocol::SampleFormat::S16Be => Ok(SampleFormat::I16),
protocol::SampleFormat::S24Le | protocol::SampleFormat::S24Be => Ok(SampleFormat::I24),
protocol::SampleFormat::S32Le | protocol::SampleFormat::S32Be => Ok(SampleFormat::I32),
protocol::SampleFormat::Float32Le | protocol::SampleFormat::Float32Be => {
Ok(SampleFormat::F32)
}
_ => Err(()),
}
}
}

impl TryFrom<SampleFormat> for protocol::SampleFormat {
type Error = ();

fn try_from(format: SampleFormat) -> Result<Self, Self::Error> {
#[cfg(target_endian = "little")]
match (format, cfg!(target_endian = "little")) {
(SampleFormat::U8, _) => Ok(protocol::SampleFormat::U8),
(SampleFormat::I16, true) => Ok(protocol::SampleFormat::S16Le),
(SampleFormat::I16, false) => Ok(protocol::SampleFormat::S16Be),
(SampleFormat::I24, true) => Ok(protocol::SampleFormat::S24Le),
(SampleFormat::I24, false) => Ok(protocol::SampleFormat::S24Be),
(SampleFormat::I32, true) => Ok(protocol::SampleFormat::S32Le),
(SampleFormat::I32, false) => Ok(protocol::SampleFormat::S32Be),
(SampleFormat::F32, true) => Ok(protocol::SampleFormat::Float32Le),
(SampleFormat::F32, false) => Ok(protocol::SampleFormat::Float32Be),
_ => Err(()),
}
}
}

impl From<pulseaudio::ClientError> for BackendSpecificError {
fn from(err: pulseaudio::ClientError) -> Self {
BackendSpecificError {
description: err.to_string(),
}
}
}

/// A Host for connecting to the popular PulseAudio and PipeWire (via
/// pipewire-pulse) audio servers on linux.
pub struct Host {
client: pulseaudio::Client,
}

impl Host {
pub fn new() -> Result<Self, HostUnavailable> {
let client =
pulseaudio::Client::from_env(c"cpal-pulseaudio").map_err(|_| HostUnavailable)?;

Ok(Self { client })
}
}

impl HostTrait for Host {
type Devices = std::vec::IntoIter<Device>;
type Device = Device;

fn is_available() -> bool {
pulseaudio::socket_path_from_env().is_some()
}

fn devices(&self) -> Result<Self::Devices, DevicesError> {
let sinks = block_on(self.client.list_sinks()).map_err(|_| BackendSpecificError {
description: "Failed to list sinks".to_owned(),
})?;

let sources = block_on(self.client.list_sources()).map_err(|_| BackendSpecificError {
description: "Failed to list sources".to_owned(),
})?;

Ok(sinks
.into_iter()
.map(|sink_info| Device::Sink {
client: self.client.clone(),
info: sink_info,
})
.chain(sources.into_iter().map(|source_info| Device::Source {
client: self.client.clone(),
info: source_info,
}))
.collect::<Vec<_>>()
.into_iter())
}

fn default_input_device(&self) -> Option<Self::Device> {
let source_info = block_on(
self.client
.source_info_by_name(protocol::DEFAULT_SOURCE.to_owned()),
)
.ok()?;

Some(Device::Source {
client: self.client.clone(),
info: source_info,
})
}

fn default_output_device(&self) -> Option<Self::Device> {
let sink_info = block_on(
self.client
.sink_info_by_name(protocol::DEFAULT_SINK.to_owned()),
)
.ok()?;

Some(Device::Sink {
client: self.client.clone(),
info: sink_info,
})
}
}

/// A PulseAudio sink or source.
#[derive(Debug, Clone)]
pub enum Device {
Sink {
client: pulseaudio::Client,
info: protocol::SinkInfo,
},
Source {
client: pulseaudio::Client,
info: protocol::SourceInfo,
},
}

impl DeviceTrait for Device {
type SupportedInputConfigs = std::vec::IntoIter<SupportedStreamConfigRange>;
type SupportedOutputConfigs = std::vec::IntoIter<SupportedStreamConfigRange>;
type Stream = Stream;

fn name(&self) -> Result<String, DeviceNameError> {
let name = match self {
Device::Sink { info, .. } => &info.name,
Device::Source { info, .. } => &info.name,
};

match str::from_utf8(name.as_bytes()) {
Ok(name) => Ok(name.to_string()),
Err(_) => Err(DeviceNameError::InvalidUtf8),
}
}

fn supported_input_configs(
&self,
) -> Result<Self::SupportedInputConfigs, SupportedStreamConfigsError> {
let Device::Source { .. } = self else {
return Ok(vec![].into_iter());
};

let mut ranges = vec![];
for format in PULSE_FORMATS {
for channel_count in 1..protocol::sample_spec::MAX_CHANNELS {
ranges.push(SupportedStreamConfigRange {
channels: channel_count as _,
min_sample_rate: SampleRate(1),
max_sample_rate: SampleRate(protocol::sample_spec::MAX_RATE),
buffer_size: SupportedBufferSize::Range {
min: 0,
max: protocol::MAX_MEMBLOCKQ_LENGTH as _,
},
sample_format: *format,
})
}
}

Ok(ranges.into_iter())
}

fn supported_output_configs(
&self,
) -> Result<Self::SupportedOutputConfigs, SupportedStreamConfigsError> {
let Device::Sink { .. } = self else {
return Ok(vec![].into_iter());
};

let mut ranges = vec![];
for format in PULSE_FORMATS {
for channel_count in 1..protocol::sample_spec::MAX_CHANNELS {
ranges.push(SupportedStreamConfigRange {
channels: channel_count as _,
min_sample_rate: SampleRate(1),
max_sample_rate: SampleRate(protocol::sample_spec::MAX_RATE),
buffer_size: SupportedBufferSize::Range {
min: 0,
max: protocol::MAX_MEMBLOCKQ_LENGTH as _,
},
sample_format: *format,
})
}
}

Ok(ranges.into_iter())
}

fn default_input_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
let Device::Source { info, .. } = self else {
return Err(DefaultStreamConfigError::StreamTypeNotSupported);
};

Ok(SupportedStreamConfig {
channels: info.channel_map.num_channels() as _,
sample_rate: SampleRate(info.sample_spec.sample_rate),
buffer_size: SupportedBufferSize::Range {
min: 0,
max: protocol::MAX_MEMBLOCKQ_LENGTH as _,
},
sample_format: info
.sample_spec
.format
.try_into()
.unwrap_or(SampleFormat::F32),
})
}

fn default_output_config(&self) -> Result<SupportedStreamConfig, DefaultStreamConfigError> {
let Device::Sink { info, .. } = self else {
return Err(DefaultStreamConfigError::StreamTypeNotSupported);
};

Ok(SupportedStreamConfig {
channels: info.channel_map.num_channels() as _,
sample_rate: SampleRate(info.sample_spec.sample_rate),
buffer_size: SupportedBufferSize::Range {
min: 0,
max: protocol::MAX_MEMBLOCKQ_LENGTH as _,
},
sample_format: info
.sample_spec
.format
.try_into()
.unwrap_or(SampleFormat::F32),
})
}

fn build_input_stream_raw<D, E>(
&self,
config: &StreamConfig,
sample_format: SampleFormat,
data_callback: D,
error_callback: E,
_timeout: Option<std::time::Duration>,
) -> Result<Self::Stream, BuildStreamError>
where
D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let Device::Source { client, info } = self else {
return Err(BuildStreamError::StreamConfigNotSupported);
};

let format: protocol::SampleFormat = sample_format
.try_into()
.map_err(|_| BuildStreamError::StreamConfigNotSupported)?;

let sample_spec = make_sample_spec(config, format);
let channel_map = make_channel_map(config);
let buffer_attr = make_buffer_attr(config, format);

let params = protocol::RecordStreamParams {
sample_spec,
channel_map,
source_index: Some(info.index),
buffer_attr,
flags: protocol::stream::StreamFlags {
// Start the stream suspended.
start_corked: true,
..Default::default()
},
..Default::default()
};

stream::Stream::new_record(
client.clone(),
params,
sample_format,
data_callback,
error_callback,
)
}

fn build_output_stream_raw<D, E>(
&self,
config: &StreamConfig,
sample_format: SampleFormat,
data_callback: D,
error_callback: E,
_timeout: Option<std::time::Duration>,
) -> Result<Self::Stream, BuildStreamError>
where
D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let Device::Sink { client, info } = self else {
return Err(BuildStreamError::StreamConfigNotSupported);
};

let format: protocol::SampleFormat = sample_format
.try_into()
.map_err(|_| BuildStreamError::StreamConfigNotSupported)?;

let sample_spec = make_sample_spec(config, format);
let channel_map = make_channel_map(config);
let buffer_attr = make_buffer_attr(config, format);

let params = protocol::PlaybackStreamParams {
sink_index: Some(info.index),
sample_spec,
channel_map,
buffer_attr,
flags: protocol::stream::StreamFlags {
// Start the stream suspended.
start_corked: true,
..Default::default()
},
..Default::default()
};

stream::Stream::new_playback(
client.clone(),
params,
sample_format,
data_callback,
error_callback,
)
}
}

fn make_sample_spec(config: &StreamConfig, format: protocol::SampleFormat) -> protocol::SampleSpec {
protocol::SampleSpec {
format,
sample_rate: config.sample_rate.0,
channels: config.channels as _,
}
}

fn make_channel_map(config: &StreamConfig) -> protocol::ChannelMap {
if config.channels == 2 {
return protocol::ChannelMap::stereo();
}

let mut map = protocol::ChannelMap::empty();
for _ in 0..config.channels {
map.push(protocol::ChannelPosition::Mono);
}

map
}

fn make_buffer_attr(
config: &StreamConfig,
format: protocol::SampleFormat,
) -> protocol::stream::BufferAttr {
match config.buffer_size {
crate::BufferSize::Default => Default::default(),
crate::BufferSize::Fixed(frame_count) => {
let len = frame_count * config.channels as u32 * format.bytes_per_sample() as u32;
protocol::stream::BufferAttr {
max_length: len,
target_length: len,
..Default::default()
}
}
}
}
216 changes: 216 additions & 0 deletions src/host/pulseaudio/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
use std::{
sync::{
atomic::{self, AtomicU64},
Arc,
},
time::{self, SystemTime},
};

use futures::executor::block_on;
use pulseaudio::{protocol, AsPlaybackSource};

use crate::{
traits::StreamTrait, BackendSpecificError, BuildStreamError, Data, InputCallbackInfo,
InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, PlayStreamError, SampleFormat,
StreamError, StreamInstant,
};

pub enum Stream {
Playback(pulseaudio::PlaybackStream),
Record(pulseaudio::RecordStream),
}

impl StreamTrait for Stream {
fn play(&self) -> Result<(), PlayStreamError> {
match self {
Stream::Playback(stream) => {
block_on(stream.uncork()).map_err(Into::<BackendSpecificError>::into)?;
}
Stream::Record(stream) => {
block_on(stream.uncork()).map_err(Into::<BackendSpecificError>::into)?;
block_on(stream.started()).map_err(Into::<BackendSpecificError>::into)?;
}
};

Ok(())
}

fn pause(&self) -> Result<(), crate::PauseStreamError> {
let res = match self {
Stream::Playback(stream) => block_on(stream.cork()),
Stream::Record(stream) => block_on(stream.cork()),
};

res.map_err(Into::<BackendSpecificError>::into)?;
Ok(())
}
}

impl Stream {
pub fn new_playback<D, E>(
client: pulseaudio::Client,
params: protocol::PlaybackStreamParams,
sample_format: SampleFormat,
mut data_callback: D,
_error_callback: E,
) -> Result<Self, BuildStreamError>
where
D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let epoch = std::time::SystemTime::now();

let current_latency_micros = Arc::new(AtomicU64::new(0));
let latency_clone = current_latency_micros.clone();
let sample_spec = params.sample_spec.clone();

// Wrap the write callback to match the pulseaudio signature.
let callback = move |buf: &mut [u8]| {
let now = SystemTime::now().duration_since(epoch).unwrap_or_default();
let latency = latency_clone.load(atomic::Ordering::Relaxed);
let playback_time = now + time::Duration::from_micros(latency as u64);

let timestamp = OutputStreamTimestamp {
callback: StreamInstant {
secs: now.as_secs() as i64,
nanos: now.subsec_nanos(),
},
playback: StreamInstant {
secs: playback_time.as_secs() as i64,
nanos: playback_time.subsec_nanos(),
},
};

let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;
let mut data =
unsafe { Data::from_parts(buf.as_mut_ptr().cast(), n_samples, sample_format) };

data_callback(&mut data, &OutputCallbackInfo { timestamp });

// We always consider the full buffer filled, because cpal's
// user-facing api doesn't allow for short writes.
// TODO: should we preemptively zero the output buffer before
// passing it to the user?
n_samples * bps
};

let stream = block_on(client.create_playback_stream(params, callback.as_playback_source()))
.map_err(Into::<BackendSpecificError>::into)?;

// Spawn a thread to drive the stream future.
let stream_clone = stream.clone();
let _worker_thread = std::thread::spawn(move || block_on(stream_clone.play_all()));

// Spawn a thread to monitor the stream's latency in a loop.
let stream_clone = stream.clone();
let latency_clone = current_latency_micros.clone();
std::thread::spawn(move || loop {
let Ok(timing_info) = block_on(stream_clone.timing_info()) else {
break;
};

store_latency(
&latency_clone,
sample_spec,
timing_info.sink_usec,
timing_info.write_offset,
timing_info.read_offset,
);

std::thread::sleep(time::Duration::from_millis(100));
});

Ok(Self::Playback(stream))
}

pub fn new_record<D, E>(
client: pulseaudio::Client,
params: protocol::RecordStreamParams,
sample_format: SampleFormat,
mut data_callback: D,
_error_callback: E,
) -> Result<Self, BuildStreamError>
where
D: FnMut(&Data, &InputCallbackInfo) + Send + 'static,
E: FnMut(StreamError) + Send + 'static,
{
let epoch = std::time::SystemTime::now();

let current_latency_micros = Arc::new(AtomicU64::new(0));
let latency_clone = current_latency_micros.clone();
let sample_spec = params.sample_spec.clone();

let callback = move |buf: &[u8]| {
let now = SystemTime::now().duration_since(epoch).unwrap_or_default();
let latency = latency_clone.load(atomic::Ordering::Relaxed);
let capture_time = now
.checked_sub(time::Duration::from_micros(latency as u64))
.unwrap_or_default();

let timestamp = InputStreamTimestamp {
callback: StreamInstant {
secs: now.as_secs() as i64,
nanos: now.subsec_nanos(),
},
capture: StreamInstant {
secs: capture_time.as_secs() as i64,
nanos: capture_time.subsec_nanos(),
},
};

let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;
let data =
unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, sample_format) };

data_callback(&data, &InputCallbackInfo { timestamp });
};

let stream = block_on(client.create_record_stream(params, callback))
.map_err(Into::<BackendSpecificError>::into)?;

// Spawn a thread to monitor the stream's latency in a loop.
let stream_clone = stream.clone();
let latency_clone = current_latency_micros.clone();
std::thread::spawn(move || loop {
let Ok(timing_info) = block_on(stream_clone.timing_info()) else {
break;
};

store_latency(
&latency_clone,
sample_spec,
timing_info.sink_usec,
timing_info.write_offset,
timing_info.read_offset,
);

std::thread::sleep(time::Duration::from_millis(100));
});

Ok(Self::Record(stream))
}
}

fn store_latency(
latency_micros: &AtomicU64,
sample_spec: protocol::SampleSpec,
device_latency_usec: u64,
write_offset: i64,
read_offset: i64,
) -> time::Duration {
let offset = (write_offset as u64)
.checked_sub(read_offset as u64)
.unwrap_or(0);

let latency = time::Duration::from_micros(device_latency_usec)
+ sample_spec.bytes_to_duration(offset as usize);

latency_micros.store(
latency.as_micros().try_into().unwrap_or(u64::MAX),
atomic::Ordering::Relaxed,
);

latency
}
4 changes: 3 additions & 1 deletion src/platform/mod.rs
Original file line number Diff line number Diff line change
@@ -588,7 +588,6 @@ macro_rules! impl_platform_host {
};
}

// TODO: Add pulseaudio and jack here eventually.
#[cfg(any(
target_os = "linux",
target_os = "dragonfly",
@@ -599,8 +598,11 @@ mod platform_impl {
pub use crate::host::alsa::Host as AlsaHost;
#[cfg(feature = "jack")]
pub use crate::host::jack::Host as JackHost;
#[cfg(feature = "pulseaudio")]
pub use crate::host::pulseaudio::Host as PulseAudioHost;

impl_platform_host!(
#[cfg(feature = "pulseaudio")] PulseAudio => PulseAudioHost,
#[cfg(feature = "jack")] Jack => JackHost,
Alsa => AlsaHost,
);