diff --git a/music_assistant/providers/squeezelite/multi_client_stream.py b/music_assistant/providers/squeezelite/multi_client_stream.py index 11acf23c0..b2b15cacb 100644 --- a/music_assistant/providers/squeezelite/multi_client_stream.py +++ b/music_assistant/providers/squeezelite/multi_client_stream.py @@ -25,7 +25,7 @@ def __init__( """Initialize MultiClientStream.""" self.audio_source = audio_source self.audio_format = audio_format - self.subscribers: list[asyncio.Queue] = [] + self.subscribers: list[asyncio.Queue[bytes]] = [] self.expected_clients = expected_clients self.task = asyncio.create_task(self._runner()) @@ -60,8 +60,8 @@ async def get_stream( async def subscribe_raw(self) -> AsyncGenerator[bytes, None]: """Subscribe to the raw/unaltered audio stream.""" + queue: asyncio.Queue[bytes] = asyncio.Queue(2) try: - queue = asyncio.Queue(2) self.subscribers.append(queue) while True: chunk = await queue.get() diff --git a/music_assistant/providers/squeezelite/player.py b/music_assistant/providers/squeezelite/player.py index 12211c624..fc225ee25 100644 --- a/music_assistant/providers/squeezelite/player.py +++ b/music_assistant/providers/squeezelite/player.py @@ -36,6 +36,7 @@ CONF_ENTRY_OUTPUT_CODEC, CONF_ENTRY_SUPPORT_CROSSFADE_DIFFERENT_SAMPLE_RATES, CONF_ENTRY_SYNC_ADJUST, + CONF_SAMPLE_RATES, INTERNAL_PCM_FORMAT, VERBOSE_LOG_LEVEL, create_sample_rates_config_entry, @@ -239,10 +240,31 @@ async def play_media(self, media: PlayerMedia) -> None: return # this is a syncgroup, we need to handle this with a multi client stream + # Get the minimum supported sample rate across all group members (LCD) + min_sample_rate = 192000 # Start high + for member_id in [self.player_id, *self.group_members]: + supported_rates_conf = cast( + "list[tuple[str, str]]", + await self.mass.config.get_player_config_value( + member_id, CONF_SAMPLE_RATES, unpack_splitted_values=True + ), + ) + if supported_rates_conf: + member_max_rate = max(int(x[0]) for x in supported_rates_conf) + min_sample_rate = min(min_sample_rate, member_max_rate) + + # For queue streams, further cap to content sample rate + if media.source_id and media.queue_item_id: + queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id) + min_sample_rate = min( + min_sample_rate, queue_item.streamdetails.audio_format.sample_rate + ) + master_audio_format = AudioFormat( content_type=INTERNAL_PCM_FORMAT.content_type, - sample_rate=INTERNAL_PCM_FORMAT.sample_rate, - bit_depth=INTERNAL_PCM_FORMAT.bit_depth, + sample_rate=min_sample_rate, + bit_depth=INTERNAL_PCM_FORMAT.bit_depth, # 32-bit float for processing + channels=2, ) if media.media_type == MediaType.ANNOUNCEMENT: # special case: stream announcement @@ -272,7 +294,8 @@ async def play_media(self, media: PlayerMedia) -> None: audio_source = self.mass.streams.get_queue_flow_stream( queue=self.mass.player_queues.get(media.source_id), start_queue_item=self.mass.player_queues.get_item( - media.source_id, media.queue_item_id + media.source_id, + media.queue_item_id, ), pcm_format=master_audio_format, ) @@ -292,11 +315,14 @@ async def play_media(self, media: PlayerMedia) -> None: f"{self.mass.streams.base_url}/slimproto/multi?player_id={self.player_id}&fmt=flac" ) + # Count how many clients will connect + expected_clients = len(list(self._get_sync_clients())) + stream.expected_clients = expected_clients + # forward to downstream play_media commands async with TaskManager(self.mass) as tg: for slimplayer in self._get_sync_clients(): url = f"{base_url}&child_player_id={slimplayer.player_id}" - stream.expected_clients += 1 tg.create_task( self._handle_play_url_for_slimplayer( slimplayer, @@ -361,7 +387,11 @@ async def set_members( # always update the state after modifying group members self.update_state() - if players_added and self.current_media and self.playback_state == PlaybackState.PLAYING: + if ( + (players_added or player_ids_to_remove) + and self.current_media + and self.playback_state == PlaybackState.PLAYING + ): # restart stream session if it was already playing # for now, we dont support late joining into an existing stream self.mass.create_task(self.mass.players.cmd_resume(self.player_id)) @@ -681,12 +711,16 @@ def _get_sync_clients(self) -> Iterator[SlimClient]: """Get all sync clients for a player.""" yield self.client for member_id in self.group_members: - if slimplayer := self.provider.slimproto.get_player(member_id): + if member_id == self.player_id: # ← Skip if it's the leader itself + continue + if self._provider.slimproto and ( + slimplayer := self._provider.slimproto.get_player(member_id) + ): yield slimplayer async def _patched_send_strm( # noqa: PLR0913 - self, + self: SlimClient, command: bytes = b"q", autostart: bytes = b"0", codec_details: bytes = b"p1321", diff --git a/music_assistant/providers/squeezelite/provider.py b/music_assistant/providers/squeezelite/provider.py index fe3d0d058..62cb0b07e 100644 --- a/music_assistant/providers/squeezelite/provider.py +++ b/music_assistant/providers/squeezelite/provider.py @@ -9,9 +9,7 @@ from aioslimproto.models import EventType as SlimEventType from aioslimproto.models import SlimEvent from aioslimproto.server import SlimServer -from music_assistant_models.enums import ContentType from music_assistant_models.errors import SetupFailedError -from music_assistant_models.media_items import AudioFormat from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.helpers.audio import get_player_filter_params @@ -147,10 +145,9 @@ def _handle_slimproto_event( self.mass.create_task(player.setup()) return - if not (player := self.mass.players.get(event.player_id)): + if not (mass_player := self.mass.players.get(event.player_id)): return # guard for unknown player - if TYPE_CHECKING: - player = cast("SqueezelitePlayer", player) + player = cast("SqueezelitePlayer", mass_player) # Handle player disconnect if event.type == SlimEventType.PLAYER_DISCONNECTED: @@ -160,12 +157,19 @@ def _handle_slimproto_event( # forward all other events to the player itself player.handle_slim_event(event) - async def _serve_multi_client_stream(self, request: web.Request) -> web.Response: + async def _serve_multi_client_stream(self, request: web.Request) -> web.StreamResponse: """Serve the multi-client flow stream audio to a player.""" player_id = request.query.get("player_id") fmt = request.query.get("fmt") child_player_id = request.query.get("child_player_id") + if not player_id: + raise web.HTTPNotFound(reason="Missing player_id parameter") + if not fmt: + raise web.HTTPNotFound(reason="Missing fmt parameter") + if not child_player_id: + raise web.HTTPNotFound(reason="Missing child_player_id parameter") + if not (sync_parent := self.mass.players.get(player_id)): raise web.HTTPNotFound(reason=f"Unknown player: {player_id}") sync_parent = cast("SqueezelitePlayer", sync_parent) @@ -174,7 +178,7 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}") if not (stream := sync_parent.multi_client_stream) or stream.done: - raise web.HTTPNotFound(f"There is no active stream for {player_id}!") + raise web.HTTPNotFound(reason=f"There is no active stream for {player_id}!") resp = web.StreamResponse( status=200, @@ -194,7 +198,14 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response "Start serving multi-client flow audio stream to %s", child_player.display_name, ) - output_format = AudioFormat(content_type=ContentType.try_parse(fmt)) + + output_format = await self.mass.streams.get_output_format( + output_format_str=fmt, + player=child_player, + content_sample_rate=stream.audio_format.sample_rate, # Flow PCM sample rate + content_bit_depth=stream.audio_format.bit_depth, # Flow PCM bit depth (32) + ) + async for chunk in stream.get_stream( output_format=output_format, filter_params=get_player_filter_params( @@ -208,5 +219,4 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response except (BrokenPipeError, ConnectionResetError, ConnectionError): # race condition break - return resp