Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions music_assistant/providers/squeezelite/multi_client_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from music_assistant_models.media_items import AudioFormat

from music_assistant.helpers.audio import get_ffmpeg_stream
from music_assistant.helpers.audio import get_ffmpeg_stream # type: ignore[attr-defined]
from music_assistant.helpers.util import empty_queue

LOGGER = logging.getLogger(__name__)
Expand All @@ -20,12 +20,14 @@ def __init__(
self,
audio_source: AsyncGenerator[bytes, None],
audio_format: AudioFormat,
content_format: AudioFormat,
expected_clients: int = 0,
) -> None:
"""Initialize MultiClientStream."""
self.audio_source = audio_source
self.audio_format = audio_format
self.subscribers: list[asyncio.Queue] = []
self.content_format = content_format
self.subscribers: list[asyncio.Queue[bytes]] = []
self.expected_clients = expected_clients
self.task = asyncio.create_task(self._runner())

Expand Down Expand Up @@ -61,7 +63,7 @@ async def get_stream(
async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
"""Subscribe to the raw/unaltered audio stream."""
try:
queue = asyncio.Queue(2)
queue: asyncio.Queue[bytes] = asyncio.Queue(2)
self.subscribers.append(queue)
while True:
chunk = await queue.get()
Expand Down
19 changes: 13 additions & 6 deletions music_assistant/providers/squeezelite/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,14 @@ async def play_media(self, media: PlayerMedia) -> None:
return

# this is a syncgroup, we need to handle this with a multi client stream
flow_format = await self.mass.streams._select_flow_format(self)
master_audio_format = AudioFormat(
content_type=INTERNAL_PCM_FORMAT.content_type,
sample_rate=INTERNAL_PCM_FORMAT.sample_rate,
sample_rate=flow_format.sample_rate,
bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
channels=2,
)
content_format = master_audio_format
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
audio_source = self.mass.streams.get_announcement_stream(
Expand All @@ -265,24 +268,28 @@ async def play_media(self, media: PlayerMedia) -> None:
audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
elif media.source_id and media.queue_item_id:
# regular queue stream request
queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
audio_source = self.mass.streams.get_queue_flow_stream(
queue=self.mass.player_queues.get(media.source_id),
start_queue_item=self.mass.player_queues.get_item(
media.source_id, media.queue_item_id
),
start_queue_item=queue_item,
pcm_format=master_audio_format,
)
content_format = queue_item.streamdetails.audio_format
else:
# assume url or some other direct path
# NOTE: this will fail if its an uri not playable by ffmpeg
audio_source = get_ffmpeg_stream(
audio_input=media.uri,
input_format=AudioFormat(ContentType.try_parse(media.uri)),
input_format=AudioFormat(
content_type=ContentType.try_parse(media.uri) or ContentType.UNKNOWN
),
output_format=master_audio_format,
)
# start the stream task
self.multi_client_stream = stream = MultiClientStream(
audio_source=audio_source, audio_format=master_audio_format
audio_source=audio_source,
audio_format=master_audio_format,
content_format=content_format,
)
base_url = (
f"{self.mass.streams.base_url}/slimproto/multi?player_id={self.player_id}&fmt=flac"
Expand Down
32 changes: 20 additions & 12 deletions music_assistant/providers/squeezelite/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
from aioslimproto.models import EventType as SlimEventType
from aioslimproto.models import SlimEvent
from aioslimproto.server import SlimServer
from music_assistant_models.enums import ContentType
from music_assistant_models.errors import SetupFailedError
from music_assistant_models.media_items import AudioFormat

from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
from music_assistant.helpers.audio import get_player_filter_params
Expand Down Expand Up @@ -147,10 +145,9 @@ def _handle_slimproto_event(
self.mass.create_task(player.setup())
return

if not (player := self.mass.players.get(event.player_id)):
if not (mass_player := self.mass.players.get(event.player_id)):
return # guard for unknown player
if TYPE_CHECKING:
player = cast("SqueezelitePlayer", player)
player = cast("SqueezelitePlayer", mass_player)

# Handle player disconnect
if event.type == SlimEventType.PLAYER_DISCONNECTED:
Expand All @@ -160,12 +157,19 @@ def _handle_slimproto_event(
# forward all other events to the player itself
player.handle_slim_event(event)

async def _serve_multi_client_stream(self, request: web.Request) -> web.Response:
async def _serve_multi_client_stream(self, request: web.Request) -> web.StreamResponse:
"""Serve the multi-client flow stream audio to a player."""
player_id = request.query.get("player_id")
fmt = request.query.get("fmt")
child_player_id = request.query.get("child_player_id")

if not player_id:
raise web.HTTPNotFound(reason="Missing player_id parameter")
if not fmt:
raise web.HTTPNotFound(reason="Missing fmt parameter")
if not child_player_id:
raise web.HTTPNotFound(reason="Missing child_player_id parameter")

if not (sync_parent := self.mass.players.get(player_id)):
raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
sync_parent = cast("SqueezelitePlayer", sync_parent)
Expand All @@ -174,7 +178,7 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response
raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")

if not (stream := sync_parent.multi_client_stream) or stream.done:
raise web.HTTPNotFound(f"There is no active stream for {player_id}!")
raise web.HTTPNotFound(reason=f"There is no active stream for {player_id}!")

resp = web.StreamResponse(
status=200,
Expand All @@ -194,19 +198,23 @@ async def _serve_multi_client_stream(self, request: web.Request) -> web.Response
"Start serving multi-client flow audio stream to %s",
child_player.display_name,
)
output_format = AudioFormat(content_type=ContentType.try_parse(fmt))

output_format = await self.mass.streams.get_output_format(
output_format_str=fmt,
player=child_player,
content_sample_rate=stream.content_format.sample_rate, # ← CHANGE
content_bit_depth=stream.content_format.bit_depth,
)

async for chunk in stream.get_stream(
output_format=output_format,
filter_params=get_player_filter_params(
self.mass, child_player_id, stream.audio_format, output_format
)
if child_player_id
else None,
),
):
try:
await resp.write(chunk)
except (BrokenPipeError, ConnectionResetError, ConnectionError):
# race condition
break

return resp
Loading