Skip to content

Commit

Permalink
Fix announcements to (universal) group players
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Oct 22, 2024
1 parent 87e6d2d commit d212ea8
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 82 deletions.
54 changes: 23 additions & 31 deletions music_assistant/server/controllers/players.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,30 +492,13 @@ async def play_announcement(
player_id,
CONF_TTS_PRE_ANNOUNCE,
)
if not native_announce_support and player.active_group:
for group_member in self.iter_group_members(player, True, True):
if PlayerFeature.PLAY_ANNOUNCEMENT in group_member.supported_features:
native_announce_support = True
break
# redirect to group player if playergroup is active
self.logger.warning(
"Detected announcement request to a player which has a group active, "
"this will be redirected to the group."
)
await self.play_announcement(
player.active_group, url, use_pre_announce, volume_level
)
return

# if player type is group with all members supporting announcements
# or if the groupplayer is not powered, we forward the request to each individual player
# if player type is group with all members supporting announcements,
# we forward the request to each individual player
if player.type == PlayerType.GROUP and (
all(
x
PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
for x in self.iter_group_members(player)
if PlayerFeature.PLAY_ANNOUNCEMENT in x.supported_features
)
or not player.powered
):
# forward the request to each individual player
async with TaskManager(self.mass) as tg:
Expand All @@ -529,7 +512,6 @@ async def play_announcement(
)
)
return

self.logger.info(
"Playback announcement to player %s (with pre-announce: %s): %s",
player.display_name,
Expand Down Expand Up @@ -1095,14 +1077,14 @@ async def _play_announcement(
- restore the previous power and volume
- restore playback (if needed and if possible)
This default implementation will only be used if the player's
provider has no native support for the PLAY_ANNOUNCEMENT feature.
This default implementation will only be used if the player
(provider) has no native support for the PLAY_ANNOUNCEMENT feature.
"""
prev_power = player.powered
prev_state = player.state
prev_synced_to = player.synced_to
queue = self.mass.player_queues.get_active_queue(player.player_id)
prev_queue_active = queue.active
queue = self.mass.player_queues.get(player.active_source)
prev_queue_active = queue and queue.active
prev_item_id = player.current_item_id
# unsync player if its currently synced
if prev_synced_to:
Expand All @@ -1128,13 +1110,23 @@ async def _play_announcement(
for volume_player_id in player.group_childs or (player.player_id,):
if not (volume_player := self.get(volume_player_id)):
continue
# filter out players that have a different source active
if volume_player.active_source not in (
player.active_source,
volume_player.player_id,
None,
# catch any players that have a different source active
if (
volume_player.active_source
not in (
player.active_source,
volume_player.player_id,
None,
)
and volume_player.state == PlayerState.PLAYING
):
continue
self.logger.warning(
"Detected announcement to playergroup %s while group member %s is playing "
"other content, this may lead to unexpected behavior.",
player.display_name,
volume_player.display_name,
)
tg.create_task(self.cmd_stop(volume_player.player_id))
prev_volume = volume_player.volume_level
announcement_volume = self.get_announcement_volume(volume_player_id, volume_level)
temp_volume = announcement_volume or player.volume_level
Expand Down
2 changes: 2 additions & 0 deletions music_assistant/server/models/player_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
This will NOT be called if the end of the queue is reached (and repeat disabled).
This will NOT be called if the player is using flow mode to playback the queue.
"""
# will only be called for players with ENQUEUE NEXT feature set.
raise NotImplementedError

async def play_announcement(
self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
Expand Down
15 changes: 8 additions & 7 deletions music_assistant/server/providers/player_group/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ async def cmd_power(self, player_id: str, powered: bool) -> None:
member.active_source = group_player.active_source
else:
# handle TURN_OFF of the group player by turning off all members
# optimistically set the group state to prevent race conditions
# with the unsync command
group_player.powered = False
for member in self.mass.players.iter_group_members(
group_player, only_powered=True, active_only=True
):
Expand Down Expand Up @@ -461,7 +464,7 @@ async def play_media(

# start the stream task
self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac"
base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.mp3"

# set the state optimistically
group_player.current_media = media
Expand Down Expand Up @@ -595,7 +598,7 @@ async def cmd_unsync_member(self, player_id: str, target_player: str) -> None:
CONFIG_ENTRY_DYNAMIC_MEMBERS.key,
CONFIG_ENTRY_DYNAMIC_MEMBERS.default_value,
)
if not dynamic_members_enabled:
if group_player.powered and not dynamic_members_enabled:
raise UnsupportedFeaturedException(
f"Adjusting group members is not allowed for group {group_player.display_name}"
)
Expand Down Expand Up @@ -645,7 +648,7 @@ async def _register_group_player(
model_name = "Universal Group"
manufacturer = self.name
# register dynamic route for the ugp stream
route_path = f"/ugp/{group_player_id}.aac"
route_path = f"/ugp/{group_player_id}.mp3"
self._on_unload.append(
self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
)
Expand All @@ -659,7 +662,7 @@ async def _register_group_player(
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_MUTE,
):
if all(x for x in player_provider.players if feature in x.supported_features):
if all(feature in x.supported_features for x in player_provider.players):
player_features.add(feature)
else:
raise PlayerUnavailableError(f"Provider for syncgroup {group_type} is not available!")
Expand Down Expand Up @@ -758,8 +761,6 @@ def _update_attributes(self, player: Player) -> None:
"""Update attributes of a player."""
for child_player in self.mass.players.iter_group_members(player, active_only=True):
# just grab the first active player
if child_player.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
continue
if child_player.synced_to:
continue
player.state = child_player.state
Expand Down Expand Up @@ -789,7 +790,7 @@ async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
)
headers = {
**DEFAULT_STREAM_HEADERS,
"Content-Type": "audio/aac",
"Content-Type": "audio/mp3",
"Accept-Ranges": "none",
"Cache-Control": "no-cache",
"Connection": "close",
Expand Down
29 changes: 17 additions & 12 deletions music_assistant/server/providers/player_group/ugp_stream.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""
Implementation of a Stream for the Universal Group Player.
Basically this is like a fake radio radio stream (AAC) format with multiple subscribers.
The AAC format is chosen because it is widely supported and has a good balance between
quality and bandwidth and also allows for mid-stream joining of (extra) players.
Basically this is like a fake radio radio stream (MP3) format with multiple subscribers.
The MP3 format is chosen because it is widely supported.
"""

from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator, Awaitable, Callable

from music_assistant.common.helpers.util import empty_queue
from music_assistant.common.models.enums import ContentType
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.server.helpers.audio import get_ffmpeg_stream
Expand All @@ -19,7 +19,7 @@

UGP_FORMAT = AudioFormat(
content_type=ContentType.PCM_F32LE,
sample_rate=44100,
sample_rate=48000,
bit_depth=32,
)

Expand All @@ -28,9 +28,8 @@ class UGPStream:
"""
Implementation of a Stream for the Universal Group Player.
Basically this is like a fake radio radio stream (AAC) format with multiple subscribers.
The AAC format is chosen because it is widely supported and has a good balance between
quality and bandwidth and also allows for mid-stream joining of (extra) players.
Basically this is like a fake radio radio stream (MP3) format with multiple subscribers.
The MP3 format is chosen because it is widely supported.
"""

def __init__(
Expand All @@ -41,7 +40,7 @@ def __init__(
"""Initialize UGP Stream."""
self.audio_source = audio_source
self.input_format = audio_format
self.output_format = AudioFormat(content_type=ContentType.AAC)
self.output_format = AudioFormat(content_type=ContentType.MP3)
self.subscribers: list[Callable[[bytes], Awaitable]] = []
self._task: asyncio.Task | None = None
self._done: asyncio.Event = asyncio.Event()
Expand All @@ -64,7 +63,7 @@ async def subscribe(self) -> AsyncGenerator[bytes, None]:
# start the runner as soon as the (first) client connects
if not self._task:
self._task = asyncio.create_task(self._runner())
queue = asyncio.Queue(1)
queue = asyncio.Queue(10)
try:
self.subscribers.append(queue.put)
while True:
Expand All @@ -74,6 +73,8 @@ async def subscribe(self) -> AsyncGenerator[bytes, None]:
yield chunk
finally:
self.subscribers.remove(queue.put)
empty_queue(queue)
del queue

async def _runner(self) -> None:
"""Run the stream for the given audio source."""
Expand All @@ -82,10 +83,14 @@ async def _runner(self) -> None:
audio_input=self.audio_source,
input_format=self.input_format,
output_format=self.output_format,
# TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version
# extra_input_args=["-readrate", "1.15"],
# enable realtime to prevent too much buffering ahead
# TODO: enable initial burst once we have a newer ffmpeg version
extra_input_args=["-re"],
):
await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True)
await asyncio.gather(
*[sub(chunk) for sub in self.subscribers],
return_exceptions=True,
)
# empty chunk when done
await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
self._done.set()
41 changes: 38 additions & 3 deletions music_assistant/server/providers/sonos/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@
from aiosonos.const import SonosEvent
from aiosonos.exceptions import ConnectionFailed, FailedCommand

from music_assistant.common.models.enums import EventType, PlayerFeature, PlayerState, PlayerType
from music_assistant.common.models.enums import (
EventType,
PlayerFeature,
PlayerState,
PlayerType,
RepeatMode,
)
from music_assistant.common.models.event import MassEvent
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import CONF_CROSSFADE

from .const import (
CONF_AIRPLAY_MODE,
Expand Down Expand Up @@ -145,7 +152,7 @@ async def setup(self) -> None:
# register callback for playerqueue state changes
self._on_cleanup_callbacks.append(
self.mass.subscribe(
self._on_mass_queue_event,
self._on_mass_queue_items_event,
EventType.QUEUE_ITEMS_UPDATED,
self.player_id,
)
Expand Down Expand Up @@ -416,11 +423,39 @@ def _on_airplay_player_event(self, event: MassEvent) -> None:
self.update_attributes()
self.mass.players.update(self.player_id)

async def _on_mass_queue_event(self, event: MassEvent) -> None:
async def _on_mass_queue_items_event(self, event: MassEvent) -> None:
"""Handle incoming event from linked MA playerqueue."""
# If the queue items changed and we have an active sonos queue,
# we need to inform the sonos queue to refresh the items.
if self.mass_player.active_source != event.object_id:
return
if not self.connected:
return
queue = self.mass.player_queues.get(event.object_id)
if not queue or queue.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
return
if session_id := self.client.player.group.active_session_id:
await self.client.api.playback_session.refresh_cloud_queue(session_id)

async def _on_mass_queue_event(self, event: MassEvent) -> None:
"""Handle incoming event from linked MA playerqueue."""
if self.mass_player.active_source != event.object_id:
return
if not self.connected:
return
# sync crossfade and repeat modes
queue = self.mass.player_queues.get(event.object_id)
if not queue or queue.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
return
crossfade = await self.mass.config.get_player_config_value(queue.queue_id, CONF_CROSSFADE)
repeat_single_enabled = queue.repeat_mode == RepeatMode.ONE
repeat_all_enabled = queue.repeat_mode == RepeatMode.ALL
play_modes = self.client.player.group.play_modes
if (
play_modes.crossfade != crossfade
or play_modes.repeat != repeat_all_enabled
or play_modes.repeat_one != repeat_single_enabled
):
await self.client.player.group.set_play_modes(
crossfade=crossfade, repeat=repeat_all_enabled, repeat_one=repeat_single_enabled
)
35 changes: 6 additions & 29 deletions music_assistant/server/providers/sonos/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@
ConfigEntry,
create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
ConfigEntryType,
ContentType,
ProviderFeature,
RepeatMode,
)
from music_assistant.common.models.enums import ConfigEntryType, ContentType, ProviderFeature
from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.player import DeviceInfo, PlayerMedia
from music_assistant.constants import CONF_CROSSFADE, MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
from music_assistant.constants import MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
from music_assistant.server.models.player_provider import PlayerProvider

from .const import CONF_AIRPLAY_MODE
Expand Down Expand Up @@ -262,6 +257,7 @@ async def play_media(
return

# play a single uri/url
# note that this most probably will only work for (long running) radio streams
if self.mass.config.get_raw_player_config_value(
player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value
):
Expand All @@ -282,28 +278,9 @@ async def cmd_previous(self, player_id: str) -> None:

async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of the next queue item on the player."""
sonos_player = self.sonos_players[player_id]
if sonos_player.get_linked_airplay_player(True):
# linked airplay player is active, ignore this command
return
if session_id := sonos_player.client.player.group.active_session_id:
await sonos_player.client.api.playback_session.refresh_cloud_queue(session_id)
# sync play modes from player queue --> sonos
mass_queue = self.mass.player_queues.get(media.queue_id)
crossfade = await self.mass.config.get_player_config_value(
mass_queue.queue_id, CONF_CROSSFADE
)
repeat_single_enabled = mass_queue.repeat_mode == RepeatMode.ONE
repeat_all_enabled = mass_queue.repeat_mode == RepeatMode.ALL
play_modes = sonos_player.client.player.group.play_modes
if (
play_modes.crossfade != crossfade
or play_modes.repeat != repeat_all_enabled
or play_modes.repeat_one != repeat_single_enabled
):
await sonos_player.client.player.group.set_play_modes(
crossfade=crossfade, repeat=repeat_all_enabled, repeat_one=repeat_single_enabled
)
# We do nothing here as we handle the queue in the cloud queue endpoint.
# For sonos s2, instead of enqueuing tracks one by one, the sonos player itself
# can interact with our queue directly through the cloud queue endpoint.

async def play_announcement(
self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
Expand Down

0 comments on commit d212ea8

Please sign in to comment.