Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 283 additions & 0 deletions music_assistant/providers/vban_receiver/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
"""VBAN protocol receiver plugin for Music Assistant."""

from __future__ import annotations

import asyncio
import re
from collections.abc import AsyncGenerator
from contextlib import suppress
from typing import TYPE_CHECKING, cast

from aiovban.asyncio.util import BackPressureStrategy
from aiovban.enums import VBANSampleRate
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
ProviderFeature,
StreamType,
)
from music_assistant_models.errors import SetupFailedError
from music_assistant_models.media_items import AudioFormat
from music_assistant_models.streamdetails import StreamMetadata

from music_assistant.constants import (
CONF_BIND_IP,
CONF_BIND_PORT,
CONF_ENTRY_WARN_PREVIEW,
)
from music_assistant.helpers.util import (
get_ip_addresses,
)
from music_assistant.models.plugin import PluginProvider, PluginSource

from .vban import AsyncVBANClientMod

if TYPE_CHECKING:
from aiovban.asyncio.device import VBANDevice
from aiovban.asyncio.streams import VBANIncomingStream
from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
from music_assistant_models.provider import ProviderManifest

from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType

DEFAULT_UDP_PORT = 6980
DEFAULT_PCM_AUDIO_FORMAT = "S16LE"
DEFAULT_PCM_SAMPLE_RATE = 44100

CONF_VBAN_STREAM_NAME = "vban_stream_name"
CONF_SENDER_HOST = "sender_host"
CONF_PCM_AUDIO_FORMAT = "audio_format"
CONF_PCM_SAMPLE_RATE = "sample_rate"

SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE}


def _get_supported_pcm_formats() -> dict[str, int]:
"""Return supported PCM formats."""
pcm_formats = {}
for content_type in ContentType.__members__:
if match := re.match(r"PCM_([S|F](\d{2})LE)", content_type):
pcm_formats[match.group(1)] = int(match.group(2))
return pcm_formats


def _get_vban_sample_rates() -> list[str]:
"""Return supported VBAN sample rates."""
return [member.split("_")[1] for member in VBANSampleRate.__members__]


async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
"""Initialize provider(instance) with given configuration."""
return VBANReceiverProvider(mass, manifest, config)


async def get_config_entries(
mass: MusicAssistant, # noqa: ARG001
instance_id: str | None = None, # noqa: ARG001
action: str | None = None, # noqa: ARG001
values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
"""
Return Config entries to setup this provider.

instance_id: id of an existing provider instance (None if new instance setup).
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
ip_addresses = await get_ip_addresses()

def _validate_stream_name(config_value: str) -> bool:
"""Validate stream name."""
try:
config_value.encode("ascii")
except UnicodeEncodeError:
return False
return len(config_value) < 17

return (
CONF_ENTRY_WARN_PREVIEW,
ConfigEntry(
key=CONF_BIND_PORT,
type=ConfigEntryType.INTEGER,
default_value=DEFAULT_UDP_PORT,
label="Receiver: UDP Port",
description="The UDP port the VBAN receiver will listen on for connections. "
"Make sure that this server can be reached "
"on the given IP and UDP port by remote VBAN senders.",
),
ConfigEntry(
key=CONF_VBAN_STREAM_NAME,
type=ConfigEntryType.STRING,
label="Sender: VBAN Stream Name",
default_value="Network AUX",
description="Max 16 ASCII chars.\n"
"The VBAN stream name to expect from the remote VBAN sender.\n"
"This MUST match what the remote VBAN sender has set for the session name "
"otherwise audio streaming will not work.",
required=True,
validate=_validate_stream_name, # type: ignore[arg-type]
),
ConfigEntry(
key=CONF_SENDER_HOST,
type=ConfigEntryType.STRING,
default_value="127.0.0.1",
label="Sender: VBAN Sender hostname/IP address",
description="The hostname/IP Address of the remote VBAN SENDER.",
required=True,
),
ConfigEntry(
key=CONF_PCM_AUDIO_FORMAT,
type=ConfigEntryType.STRING,
default_value=DEFAULT_PCM_AUDIO_FORMAT,
options=[ConfigValueOption(x, x) for x in _get_supported_pcm_formats()],
label="PCM audio format",
description="The VBAN PCM audio format to expect from the remote VBAN sender. "
"This MUST match what the remote VBAN sender has set otherwise audio streaming "
"will not work.",
required=True,
),
ConfigEntry(
key=CONF_PCM_SAMPLE_RATE,
type=ConfigEntryType.STRING,
default_value=DEFAULT_PCM_SAMPLE_RATE,
options=[ConfigValueOption(x, x) for x in _get_vban_sample_rates()],
label="PCM sample rate",
description="The VBAN PCM sample rate to expect from the remote VBAN sender. "
"This MUST match what the remote VBAN sender has set otherwise audio streaming "
"will not work.",
required=True,
),
ConfigEntry(
key=CONF_BIND_IP,
type=ConfigEntryType.STRING,
default_value="0.0.0.0",
options=[ConfigValueOption(x, x) for x in {"0.0.0.0", *ip_addresses}],
label="Receiver: Bind to IP/interface",
description="Start the VBAN receiver on this specific interface. \n"
"Use 0.0.0.0 to bind to all interfaces, which is the default. \n"
"This is an advanced setting that should normally "
"not be adjusted in regular setups.",
category="advanced",
required=True,
),
)


class VBANReceiverProvider(PluginProvider):
"""Implementation of a VBAN protocol receiver plugin."""

def __init__(
self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> None:
"""Initialize MusicProvider."""
super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
self.logger = self.logger.getChild(self.instance_id.split("--")[1])
self._bind_port: int = cast("int", self.config.get_value(CONF_BIND_PORT))
self._bind_ip: str = cast("str", self.config.get_value(CONF_BIND_IP))
self._sender_host: str = cast("str", self.config.get_value(CONF_SENDER_HOST))
self._vban_stream_name: str = cast("str", self.config.get_value(CONF_VBAN_STREAM_NAME))
self._pcm_audio_format: str = cast("str", self.config.get_value(CONF_PCM_AUDIO_FORMAT))
self._pcm_sample_rate: int = cast("int", self.config.get_value(CONF_PCM_SAMPLE_RATE))

self._vban_receiver: AsyncVBANClientMod | None = None
self._vban_sender: VBANDevice | None = None
self._vban_stream: VBANIncomingStream | None = None

self._source_details = PluginSource(
id=self.instance_id,
name=f"{self.manifest.name}: {self._vban_stream_name}",
passive=False,
can_play_pause=False,
can_seek=False,
can_next_previous=False,
audio_format=AudioFormat(
content_type=ContentType(self._pcm_audio_format.lower()),
codec_type=ContentType(self._pcm_audio_format.lower()),
sample_rate=self._pcm_sample_rate,
bit_depth=_get_supported_pcm_formats()[self._pcm_audio_format],
channels=2,
),
metadata=StreamMetadata(
title=self._vban_stream_name,
artist=self._sender_host,
),
stream_type=StreamType.CUSTOM,
)

@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return {ProviderFeature.AUDIO_SOURCE}

@property
def instance_name_postfix(self) -> str | None:
"""Return a (default) instance name postfix for this provider instance."""
return self._vban_stream_name

async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._vban_receiver = AsyncVBANClientMod(ignore_audio_streams=False)
try:
self._udp_socket_task = asyncio.create_task(
self._vban_receiver.listen(self._bind_ip, self._bind_port)
)
except OSError as err:
raise SetupFailedError(f"Failed to start VBAN receiver plugin: {err}") from err

self._vban_sender = self._vban_receiver.register_device(self._sender_host)
if self._vban_sender:
self._vban_stream = self._vban_sender.receive_stream(
self._vban_stream_name, back_pressure_strategy=BackPressureStrategy.DRAIN_OLDEST
)

async def unload(self, is_removed: bool = False) -> None:
"""Handle close/cleanup of the provider."""
self.logger.debug("Unloading plugin")
if self._vban_receiver:
self.logger.debug("Closing UDP transport")
self._vban_receiver.close()
with suppress(asyncio.CancelledError):
await self._udp_socket_task

self._vban_receiver = None
self._vban_sender = None
self._vban_stream = None
await asyncio.sleep(0.1)

def get_source(self) -> PluginSource:
"""Get (audio)source details for this plugin."""
return self._source_details

async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
"""Yield raw PCM chunks from the VBANIncomingStream queue."""
self.logger.debug(
"Getting VBAN PCM audio stream for Player: %s//Stream: %s//Config: %s",
player_id,
self._vban_stream_name,
self._source_details.audio_format.output_format_str,
)
while (
self._source_details.in_use_by
and self._vban_stream
and not self._udp_socket_task.done()
):
try:
packet = await self._vban_stream.get_packet()
except asyncio.QueueShutDown: # type: ignore[attr-defined]
self.logger.error(
"Found VBANIncomingStream queue shut down when attempting to get VBAN packet"
)
break

# Skip processing full null packets.
# pipewire vban-send module constantly sends full null VBAN packets when a "Stream"
# is established e.g when squeezelite starts up with the vban-send sink as its
# output device.
if packet.body.data == bytes(len(packet.body.data)):
continue

yield packet.body.data
10 changes: 10 additions & 0 deletions music_assistant/providers/vban_receiver/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "plugin",
"domain": "vban_receiver",
"stage": "alpha",
"name": "VBAN Receiver",
"description": "VBAN protocol receiver - receive PCM-over-UDP streams from a VBAN protocol sender",
"codeowners": ["@sprocket-9"],
"documentation": "https://music-assistant.io/plugins/vban-receiver/",
"multi_instance": true
}
86 changes: 86 additions & 0 deletions music_assistant/providers/vban_receiver/vban.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""VBAN subclasses to workaround issues in aiovban 0.6.3."""

from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass
from typing import Any

from aiovban.asyncio import AsyncVBANClient
from aiovban.packet import VBANPacket
from aiovban.packet.headers import VBANHeaderException

logger = logging.getLogger(__name__)


@dataclass
class VBANBaseProtocolMod(asyncio.DatagramProtocol):
"""VBANBaseProtocol workaround."""

client: AsyncVBANClientMod

def __post_init__(self) -> None:
"""Initialize."""
# WORKAROUND: each instance gets it's own Future.
self.done: asyncio.Future[Any] = asyncio.get_event_loop().create_future()
# self.done = asyncio.get_event_loop().create_future()
self.background_tasks: set[asyncio.Task[Any]] = set()

def error_received(self, exc: Exception) -> None:
"""Handle error."""
self.done.set_exception(exc)

def connection_lost(self, exc: Exception | None) -> None:
"""Handle lost connection."""
if self.done.done():
return
# WORKAROUND: handle exc properly.
if exc:
self.done.set_exception(exc)
else:
self.done.set_result(None)


@dataclass
class VBANListenerProtocolMod(VBANBaseProtocolMod):
"""VBANListenerProcotol workaround."""

def connection_made(self, transport) -> None: # type: ignore[no-untyped-def]
"""Handle connection made."""
logger.debug(f"Connection made to {transport}")

def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
"""Handle received datagram."""
try:
if self.client.quick_reject(addr[0]):
return
packet = VBANPacket.unpack(data)
task = asyncio.create_task(self.client.process_packet(addr[0], addr[1], packet))
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
except VBANHeaderException as e:
logger.error(f"Error unpacking packet: {e}")


class AsyncVBANClientMod(AsyncVBANClient): # type: ignore[misc]
"""AsyncVBANClient workaround."""

async def listen(
self,
address: str = "0.0.0.0",
port: int = 6980,
loop: asyncio.AbstractEventLoop | None = None,
) -> None:
"""Create UDP listener."""
loop = loop or asyncio.get_running_loop()

# Create a socket and set the options
self._transport, proto = await loop.create_datagram_endpoint(
lambda: VBANListenerProtocolMod(self),
local_addr=(address, port),
allow_broadcast=not self.ignore_audio_streams,
)

# WORKAROUND: await, not return.
await proto.done
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"llvmlite==0.44.0",
"numpy==2.2.6",
"gql[all]==4.0.0",
"aiovban>=0.6.3",
]
description = "Music Assistant"
license = {text = "Apache-2.0"}
Expand Down
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ aiorun==2025.1.1
aioslimproto==3.1.1
aiosonos==0.1.9
aiosqlite==0.21.0
aiovban>=0.6.3
alexapy==1.29.8
async-upnp-client==0.45.0
audible==0.10.0
Expand Down