diff --git a/music_assistant/providers/vban_receiver/__init__.py b/music_assistant/providers/vban_receiver/__init__.py new file mode 100644 index 000000000..6a7ae62f8 --- /dev/null +++ b/music_assistant/providers/vban_receiver/__init__.py @@ -0,0 +1,283 @@ +"""VBAN protocol receiver plugin for Music Assistant.""" + +from __future__ import annotations + +import asyncio +import re +from collections.abc import AsyncGenerator +from contextlib import suppress +from typing import TYPE_CHECKING, cast + +from aiovban.asyncio.util import BackPressureStrategy +from aiovban.enums import VBANSampleRate +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption +from music_assistant_models.enums import ( + ConfigEntryType, + ContentType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import SetupFailedError +from music_assistant_models.media_items import AudioFormat +from music_assistant_models.streamdetails import StreamMetadata + +from music_assistant.constants import ( + CONF_BIND_IP, + CONF_BIND_PORT, + CONF_ENTRY_WARN_PREVIEW, +) +from music_assistant.helpers.util import ( + get_ip_addresses, +) +from music_assistant.models.plugin import PluginProvider, PluginSource + +from .vban import AsyncVBANClientMod + +if TYPE_CHECKING: + from aiovban.asyncio.device import VBANDevice + from aiovban.asyncio.streams import VBANIncomingStream + from music_assistant_models.config_entries import ConfigValueType, ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +DEFAULT_UDP_PORT = 6980 +DEFAULT_PCM_AUDIO_FORMAT = "S16LE" +DEFAULT_PCM_SAMPLE_RATE = 44100 + +CONF_VBAN_STREAM_NAME = "vban_stream_name" +CONF_SENDER_HOST = "sender_host" +CONF_PCM_AUDIO_FORMAT = "audio_format" +CONF_PCM_SAMPLE_RATE = "sample_rate" + +SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE} + + +def _get_supported_pcm_formats() -> dict[str, int]: + """Return supported PCM formats.""" + pcm_formats = {} + for content_type in ContentType.__members__: + if match := re.match(r"PCM_([S|F](\d{2})LE)", content_type): + pcm_formats[match.group(1)] = int(match.group(2)) + return pcm_formats + + +def _get_vban_sample_rates() -> list[str]: + """Return supported VBAN sample rates.""" + return [member.split("_")[1] for member in VBANSampleRate.__members__] + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return VBANReceiverProvider(mass, manifest, config) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + ip_addresses = await get_ip_addresses() + + def _validate_stream_name(config_value: str) -> bool: + """Validate stream name.""" + try: + config_value.encode("ascii") + except UnicodeEncodeError: + return False + return len(config_value) < 17 + + return ( + CONF_ENTRY_WARN_PREVIEW, + ConfigEntry( + key=CONF_BIND_PORT, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_UDP_PORT, + label="Receiver: UDP Port", + description="The UDP port the VBAN receiver will listen on for connections. " + "Make sure that this server can be reached " + "on the given IP and UDP port by remote VBAN senders.", + ), + ConfigEntry( + key=CONF_VBAN_STREAM_NAME, + type=ConfigEntryType.STRING, + label="Sender: VBAN Stream Name", + default_value="Network AUX", + description="Max 16 ASCII chars.\n" + "The VBAN stream name to expect from the remote VBAN sender.\n" + "This MUST match what the remote VBAN sender has set for the session name " + "otherwise audio streaming will not work.", + required=True, + validate=_validate_stream_name, # type: ignore[arg-type] + ), + ConfigEntry( + key=CONF_SENDER_HOST, + type=ConfigEntryType.STRING, + default_value="127.0.0.1", + label="Sender: VBAN Sender hostname/IP address", + description="The hostname/IP Address of the remote VBAN SENDER.", + required=True, + ), + ConfigEntry( + key=CONF_PCM_AUDIO_FORMAT, + type=ConfigEntryType.STRING, + default_value=DEFAULT_PCM_AUDIO_FORMAT, + options=[ConfigValueOption(x, x) for x in _get_supported_pcm_formats()], + label="PCM audio format", + description="The VBAN PCM audio format to expect from the remote VBAN sender. " + "This MUST match what the remote VBAN sender has set otherwise audio streaming " + "will not work.", + required=True, + ), + ConfigEntry( + key=CONF_PCM_SAMPLE_RATE, + type=ConfigEntryType.STRING, + default_value=DEFAULT_PCM_SAMPLE_RATE, + options=[ConfigValueOption(x, x) for x in _get_vban_sample_rates()], + label="PCM sample rate", + description="The VBAN PCM sample rate to expect from the remote VBAN sender. " + "This MUST match what the remote VBAN sender has set otherwise audio streaming " + "will not work.", + required=True, + ), + ConfigEntry( + key=CONF_BIND_IP, + type=ConfigEntryType.STRING, + default_value="0.0.0.0", + options=[ConfigValueOption(x, x) for x in {"0.0.0.0", *ip_addresses}], + label="Receiver: Bind to IP/interface", + description="Start the VBAN receiver on this specific interface. \n" + "Use 0.0.0.0 to bind to all interfaces, which is the default. \n" + "This is an advanced setting that should normally " + "not be adjusted in regular setups.", + category="advanced", + required=True, + ), + ) + + +class VBANReceiverProvider(PluginProvider): + """Implementation of a VBAN protocol receiver plugin.""" + + def __init__( + self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig + ) -> None: + """Initialize MusicProvider.""" + super().__init__(mass, manifest, config, SUPPORTED_FEATURES) + self.logger = self.logger.getChild(self.instance_id.split("--")[1]) + self._bind_port: int = cast("int", self.config.get_value(CONF_BIND_PORT)) + self._bind_ip: str = cast("str", self.config.get_value(CONF_BIND_IP)) + self._sender_host: str = cast("str", self.config.get_value(CONF_SENDER_HOST)) + self._vban_stream_name: str = cast("str", self.config.get_value(CONF_VBAN_STREAM_NAME)) + self._pcm_audio_format: str = cast("str", self.config.get_value(CONF_PCM_AUDIO_FORMAT)) + self._pcm_sample_rate: int = cast("int", self.config.get_value(CONF_PCM_SAMPLE_RATE)) + + self._vban_receiver: AsyncVBANClientMod | None = None + self._vban_sender: VBANDevice | None = None + self._vban_stream: VBANIncomingStream | None = None + + self._source_details = PluginSource( + id=self.instance_id, + name=f"{self.manifest.name}: {self._vban_stream_name}", + passive=False, + can_play_pause=False, + can_seek=False, + can_next_previous=False, + audio_format=AudioFormat( + content_type=ContentType(self._pcm_audio_format.lower()), + codec_type=ContentType(self._pcm_audio_format.lower()), + sample_rate=self._pcm_sample_rate, + bit_depth=_get_supported_pcm_formats()[self._pcm_audio_format], + channels=2, + ), + metadata=StreamMetadata( + title=self._vban_stream_name, + artist=self._sender_host, + ), + stream_type=StreamType.CUSTOM, + ) + + @property + def supported_features(self) -> set[ProviderFeature]: + """Return the features supported by this Provider.""" + return {ProviderFeature.AUDIO_SOURCE} + + @property + def instance_name_postfix(self) -> str | None: + """Return a (default) instance name postfix for this provider instance.""" + return self._vban_stream_name + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + self._vban_receiver = AsyncVBANClientMod(ignore_audio_streams=False) + try: + self._udp_socket_task = asyncio.create_task( + self._vban_receiver.listen(self._bind_ip, self._bind_port) + ) + except OSError as err: + raise SetupFailedError(f"Failed to start VBAN receiver plugin: {err}") from err + + self._vban_sender = self._vban_receiver.register_device(self._sender_host) + if self._vban_sender: + self._vban_stream = self._vban_sender.receive_stream( + self._vban_stream_name, back_pressure_strategy=BackPressureStrategy.DRAIN_OLDEST + ) + + async def unload(self, is_removed: bool = False) -> None: + """Handle close/cleanup of the provider.""" + self.logger.debug("Unloading plugin") + if self._vban_receiver: + self.logger.debug("Closing UDP transport") + self._vban_receiver.close() + with suppress(asyncio.CancelledError): + await self._udp_socket_task + + self._vban_receiver = None + self._vban_sender = None + self._vban_stream = None + await asyncio.sleep(0.1) + + def get_source(self) -> PluginSource: + """Get (audio)source details for this plugin.""" + return self._source_details + + async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]: + """Yield raw PCM chunks from the VBANIncomingStream queue.""" + self.logger.debug( + "Getting VBAN PCM audio stream for Player: %s//Stream: %s//Config: %s", + player_id, + self._vban_stream_name, + self._source_details.audio_format.output_format_str, + ) + while ( + self._source_details.in_use_by + and self._vban_stream + and not self._udp_socket_task.done() + ): + try: + packet = await self._vban_stream.get_packet() + except asyncio.QueueShutDown: # type: ignore[attr-defined] + self.logger.error( + "Found VBANIncomingStream queue shut down when attempting to get VBAN packet" + ) + break + + # Skip processing full null packets. + # pipewire vban-send module constantly sends full null VBAN packets when a "Stream" + # is established e.g when squeezelite starts up with the vban-send sink as its + # output device. + if packet.body.data == bytes(len(packet.body.data)): + continue + + yield packet.body.data diff --git a/music_assistant/providers/vban_receiver/manifest.json b/music_assistant/providers/vban_receiver/manifest.json new file mode 100644 index 000000000..dc42ca241 --- /dev/null +++ b/music_assistant/providers/vban_receiver/manifest.json @@ -0,0 +1,10 @@ +{ + "type": "plugin", + "domain": "vban_receiver", + "stage": "alpha", + "name": "VBAN Receiver", + "description": "VBAN protocol receiver - receive PCM-over-UDP streams from a VBAN protocol sender", + "codeowners": ["@sprocket-9"], + "documentation": "https://music-assistant.io/plugins/vban-receiver/", + "multi_instance": true +} diff --git a/music_assistant/providers/vban_receiver/vban.py b/music_assistant/providers/vban_receiver/vban.py new file mode 100644 index 000000000..0d27ab12f --- /dev/null +++ b/music_assistant/providers/vban_receiver/vban.py @@ -0,0 +1,86 @@ +"""VBAN subclasses to workaround issues in aiovban 0.6.3.""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from typing import Any + +from aiovban.asyncio import AsyncVBANClient +from aiovban.packet import VBANPacket +from aiovban.packet.headers import VBANHeaderException + +logger = logging.getLogger(__name__) + + +@dataclass +class VBANBaseProtocolMod(asyncio.DatagramProtocol): + """VBANBaseProtocol workaround.""" + + client: AsyncVBANClientMod + + def __post_init__(self) -> None: + """Initialize.""" + # WORKAROUND: each instance gets it's own Future. + self.done: asyncio.Future[Any] = asyncio.get_event_loop().create_future() + # self.done = asyncio.get_event_loop().create_future() + self.background_tasks: set[asyncio.Task[Any]] = set() + + def error_received(self, exc: Exception) -> None: + """Handle error.""" + self.done.set_exception(exc) + + def connection_lost(self, exc: Exception | None) -> None: + """Handle lost connection.""" + if self.done.done(): + return + # WORKAROUND: handle exc properly. + if exc: + self.done.set_exception(exc) + else: + self.done.set_result(None) + + +@dataclass +class VBANListenerProtocolMod(VBANBaseProtocolMod): + """VBANListenerProcotol workaround.""" + + def connection_made(self, transport) -> None: # type: ignore[no-untyped-def] + """Handle connection made.""" + logger.debug(f"Connection made to {transport}") + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + """Handle received datagram.""" + try: + if self.client.quick_reject(addr[0]): + return + packet = VBANPacket.unpack(data) + task = asyncio.create_task(self.client.process_packet(addr[0], addr[1], packet)) + self.background_tasks.add(task) + task.add_done_callback(self.background_tasks.discard) + except VBANHeaderException as e: + logger.error(f"Error unpacking packet: {e}") + + +class AsyncVBANClientMod(AsyncVBANClient): # type: ignore[misc] + """AsyncVBANClient workaround.""" + + async def listen( + self, + address: str = "0.0.0.0", + port: int = 6980, + loop: asyncio.AbstractEventLoop | None = None, + ) -> None: + """Create UDP listener.""" + loop = loop or asyncio.get_running_loop() + + # Create a socket and set the options + self._transport, proto = await loop.create_datagram_endpoint( + lambda: VBANListenerProtocolMod(self), + local_addr=(address, port), + allow_broadcast=not self.ignore_audio_streams, + ) + + # WORKAROUND: await, not return. + await proto.done diff --git a/pyproject.toml b/pyproject.toml index a9a9904e1..f22a41eef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dependencies = [ "llvmlite==0.44.0", "numpy==2.2.6", "gql[all]==4.0.0", + "aiovban>=0.6.3", ] description = "Music Assistant" license = {text = "Apache-2.0"} diff --git a/requirements_all.txt b/requirements_all.txt index 6d3578378..fc88e0d8b 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -14,6 +14,7 @@ aiorun==2025.1.1 aioslimproto==3.1.1 aiosonos==0.1.9 aiosqlite==0.21.0 +aiovban>=0.6.3 alexapy==1.29.8 async-upnp-client==0.45.0 audible==0.10.0