Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions music_assistant/providers/squeezelite/multi_client_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(
"""Initialize MultiClientStream."""
self.audio_source = audio_source
self.audio_format = audio_format
self.subscribers: list[asyncio.Queue] = []
self.subscribers: list[asyncio.Queue[bytes]] = []
self.expected_clients = expected_clients
self.task = asyncio.create_task(self._runner())

Expand Down Expand Up @@ -60,8 +60,8 @@ async def get_stream(

async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
"""Subscribe to the raw/unaltered audio stream."""
queue: asyncio.Queue[bytes] = asyncio.Queue(2)
try:
queue = asyncio.Queue(2)
self.subscribers.append(queue)
while True:
chunk = await queue.get()
Expand Down
48 changes: 41 additions & 7 deletions music_assistant/providers/squeezelite/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
CONF_ENTRY_OUTPUT_CODEC,
CONF_ENTRY_SUPPORT_CROSSFADE_DIFFERENT_SAMPLE_RATES,
CONF_ENTRY_SYNC_ADJUST,
CONF_SAMPLE_RATES,
INTERNAL_PCM_FORMAT,
VERBOSE_LOG_LEVEL,
create_sample_rates_config_entry,
Expand Down Expand Up @@ -239,10 +240,31 @@ async def play_media(self, media: PlayerMedia) -> None:
return

# this is a syncgroup, we need to handle this with a multi client stream
# Get the minimum supported sample rate across all group members (LCD)
min_sample_rate = 192000 # Start high
for member_id in [self.player_id, *self.group_members]:
supported_rates_conf = cast(
"list[tuple[str, str]]",
await self.mass.config.get_player_config_value(
member_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
),
)
if supported_rates_conf:
member_max_rate = max(int(x[0]) for x in supported_rates_conf)
min_sample_rate = min(min_sample_rate, member_max_rate)

# For queue streams, further cap to content sample rate
if media.source_id and media.queue_item_id:
queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
min_sample_rate = min(
min_sample_rate, queue_item.streamdetails.audio_format.sample_rate
)

master_audio_format = AudioFormat(
content_type=INTERNAL_PCM_FORMAT.content_type,
sample_rate=INTERNAL_PCM_FORMAT.sample_rate,
bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
sample_rate=min_sample_rate,
bit_depth=INTERNAL_PCM_FORMAT.bit_depth, # 32-bit float for processing
channels=2,
)
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
Expand Down Expand Up @@ -272,7 +294,8 @@ async def play_media(self, media: PlayerMedia) -> None:
audio_source = self.mass.streams.get_queue_flow_stream(
queue=self.mass.player_queues.get(media.source_id),
start_queue_item=self.mass.player_queues.get_item(
media.source_id, media.queue_item_id
media.source_id,
media.queue_item_id,
),
pcm_format=master_audio_format,
)
Expand All @@ -292,11 +315,14 @@ async def play_media(self, media: PlayerMedia) -> None:
f"{self.mass.streams.base_url}/slimproto/multi?player_id={self.player_id}&fmt=flac"
)

# Count how many clients will connect
expected_clients = len(list(self._get_sync_clients()))
stream.expected_clients = expected_clients

# forward to downstream play_media commands
async with TaskManager(self.mass) as tg:
for slimplayer in self._get_sync_clients():
url = f"{base_url}&child_player_id={slimplayer.player_id}"
stream.expected_clients += 1
tg.create_task(
self._handle_play_url_for_slimplayer(
slimplayer,
Expand Down Expand Up @@ -361,7 +387,11 @@ async def set_members(
# always update the state after modifying group members
self.update_state()

if players_added and self.current_media and self.playback_state == PlaybackState.PLAYING:
if (
(players_added or player_ids_to_remove)
and self.current_media
and self.playback_state == PlaybackState.PLAYING
):
# restart stream session if it was already playing
# for now, we dont support late joining into an existing stream
self.mass.create_task(self.mass.players.cmd_resume(self.player_id))
Expand Down Expand Up @@ -681,12 +711,16 @@ def _get_sync_clients(self) -> Iterator[SlimClient]:
"""Get all sync clients for a player."""
yield self.client
for member_id in self.group_members:
if slimplayer := self.provider.slimproto.get_player(member_id):
if member_id == self.player_id: # ← Skip if it's the leader itself
continue
if self._provider.slimproto and (
slimplayer := self._provider.slimproto.get_player(member_id)
):
yield slimplayer


async def _patched_send_strm( # noqa: PLR0913
self,
self: SlimClient,
command: bytes = b"q",
autostart: bytes = b"0",
codec_details: bytes = b"p1321",
Expand Down
28 changes: 19 additions & 9 deletions music_assistant/providers/squeezelite/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
from aioslimproto.models import EventType as SlimEventType
from aioslimproto.models import SlimEvent
from aioslimproto.server import SlimServer
from music_assistant_models.enums import ContentType
from music_assistant_models.errors import SetupFailedError
from music_assistant_models.media_items import AudioFormat

from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
from music_assistant.helpers.audio import get_player_filter_params
Expand Down Expand Up @@ -147,10 +145,9 @@ def _handle_slimproto_event(
self.mass.create_task(player.setup())
return

if not (player := self.mass.players.get(event.player_id)):
if not (mass_player := self.mass.players.get(event.player_id)):
return # guard for unknown player
if TYPE_CHECKING:
player = cast("SqueezelitePlayer", player)
player = cast("SqueezelitePlayer", mass_player)

# Handle player disconnect
if event.type == SlimEventType.PLAYER_DISCONNECTED:
Expand All @@ -160,12 +157,19 @@ def _handle_slimproto_event(
# forward all other events to the player itself
player.handle_slim_event(event)

async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
async def _serve_multi_client_stream(self, request: web.Request) -> web.StreamResponse:
"""Serve the multi-client flow stream audio to a player."""
player_id = request.query.get("player_id")
fmt = request.query.get("fmt")
child_player_id = request.query.get("child_player_id")

if not player_id:
raise web.HTTPNotFound(reason="Missing player_id parameter")
if not fmt:
raise web.HTTPNotFound(reason="Missing fmt parameter")
if not child_player_id:
raise web.HTTPNotFound(reason="Missing child_player_id parameter")

if not (sync_parent := self.mass.players.get(player_id)):
raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
sync_parent = cast("SqueezelitePlayer", sync_parent)
Expand All @@ -174,7 +178,7 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response
raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")

if not (stream := sync_parent.multi_client_stream) or stream.done:
raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
raise web.HTTPNotFound(reason=f"There is no active stream for {player_id}!")

resp = web.StreamResponse(
status=200,
Expand All @@ -194,7 +198,14 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response
"Start serving multi-client flow audio stream to %s",
child_player.display_name,
)
output_format = AudioFormat(content_type=ContentType.try_parse(fmt))

output_format = await self.mass.streams.get_output_format(
output_format_str=fmt,
player=child_player,
content_sample_rate=stream.audio_format.sample_rate, # Flow PCM sample rate
content_bit_depth=stream.audio_format.bit_depth, # Flow PCM bit depth (32)
)

async for chunk in stream.get_stream(
output_format=output_format,
filter_params=get_player_filter_params(
Expand All @@ -208,5 +219,4 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response
except (BrokenPipeError, ConnectionResetError, ConnectionError):
# race condition
break

return resp