Skip to content

Commit

Permalink
Fix parsing of HLS (sub)streams (#1727)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt committed Oct 19, 2024
1 parent 47ead7b commit 0f8c8fc
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 90 deletions.
118 changes: 36 additions & 82 deletions music_assistant/server/helpers/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import re
import struct
import time
from collections import deque
from collections.abc import AsyncGenerator
from io import BytesIO
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -44,7 +43,6 @@
from .ffmpeg import FFMpeg, get_ffmpeg_stream
from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from .process import AsyncProcess, check_output, communicate
from .tags import parse_tags
from .throttle_retry import BYPASS_THROTTLER
from .util import TimedAsyncGenerator, create_tempfile, detect_charset

Expand Down Expand Up @@ -294,7 +292,7 @@ async def get_media_stream(
try:
await ffmpeg_proc.start()
async for chunk in TimedAsyncGenerator(
ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 60
ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), 300
):
# for radio streams we just yield all chunks directly
if streamdetails.media_type == MediaType.RADIO:
Expand Down Expand Up @@ -580,87 +578,37 @@ async def get_hls_radio_stream(
"""Get radio audio stream from HTTP HLS playlist."""
logger = LOGGER.getChild("hls_stream")
logger.debug("Start streaming HLS stream for url %s", url)
timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
prev_chunks: deque[str] = deque(maxlen=50)
has_playlist_metadata: bool | None = None
has_id3_metadata: bool | None = None
# we simply select the best quality substream here
# if we ever want to support adaptive stream selection based on bandwidth
# we need to move the substream selection into the loop below and make it
# bandwidth aware. For now we just assume domestic high bandwidth where
# the user wants the best quality possible at all times.
playlist_item = await get_hls_substream(mass, url)
substream_url = playlist_item.path
empty_loops = 0
while True:
loops = 50 if streamdetails.media_type != MediaType.RADIO else 1
while loops:
logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
async with mass.http_session.get(
substream_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
resp.raise_for_status()
raw_data = await resp.read()
encoding = resp.charset or await detect_charset(raw_data)
substream_m3u_data = raw_data.decode(encoding)
# get chunk-parts from the substream
hls_chunks = parse_m3u(substream_m3u_data)
chunk_seconds = 0
time_start = time.time()
for chunk_item in hls_chunks:
if chunk_item.path in prev_chunks:
continue
chunk_length = int(chunk_item.length) if chunk_item.length else 6
chunk_item_url = chunk_item.path
if not chunk_item_url.startswith("http"):
# path is relative, stitch it together
base_path = substream_url.rsplit("/", 1)[0]
chunk_item_url = base_path + "/" + chunk_item.path
# handle (optional) in-playlist (timed) metadata
if has_playlist_metadata is None:
has_playlist_metadata = chunk_item.title not in (None, "")
logger.debug("Station support for in-playlist metadata: %s", has_playlist_metadata)
if has_playlist_metadata and chunk_item.title != "no desc":
# bbc (and maybe others?) set the title to 'no desc'
cleaned_stream_title = clean_stream_title(chunk_item.title)
if cleaned_stream_title != streamdetails.stream_title:
logger.log(
VERBOSE_LOG_LEVEL, "HLS Radio streamtitle original: %s", chunk_item.title
)
logger.log(
VERBOSE_LOG_LEVEL, "HLS Radio streamtitle cleaned: %s", cleaned_stream_title
)
streamdetails.stream_title = cleaned_stream_title
logger.log(VERBOSE_LOG_LEVEL, "playing chunk %s", chunk_item)
# prevent that we play this chunk again if we loop through
prev_chunks.append(chunk_item.path)
async with mass.http_session.get(
chunk_item_url, headers=HTTP_HEADERS, timeout=timeout
) as resp:
yield await resp.content.read()
chunk_seconds += chunk_length
# handle (optional) in-band (m3u) metadata
if has_id3_metadata is not None and has_playlist_metadata:
# We simply let ffmpeg deal with parsing the HLS playlist and stichting chunks together.
# However we do not feed the playlist URL to ffmpeg directly to give us the possibility
# to monitor the stream title and other metadata for radio streams in the future.
# Also, we've seen cases where ffmpeg sometimes chokes in a stream and aborts, which is not
# very useful for radio streams which you want to simply go on forever, so we need to loop
# and restart ffmpeg in case of an error.
input_format = AudioFormat(content_type=ContentType.UNKNOWN)
audio_format_detected = False
async for chunk in get_ffmpeg_stream(
audio_input=substream_url,
input_format=input_format,
output_format=AudioFormat(content_type=ContentType.WAV),
):
yield chunk
if audio_format_detected:
continue
if has_id3_metadata in (None, True):
tags = await parse_tags(chunk_item_url)
has_id3_metadata = tags.title and tags.title not in chunk_item.path
logger.debug("Station support for in-band (ID3) metadata: %s", has_id3_metadata)

# end of playlist reached - we loop around to get the next playlist with chunks
# safeguard for an endless loop
# this may happen if we're simply going too fast for the live stream
# we already throttle it a bit but we may end up in a situation where something is wrong
# and we want to break out of this loop, hence this check
if chunk_seconds == 0:
empty_loops += 1
await asyncio.sleep(1)
else:
empty_loops = 0
if empty_loops == 50:
logger.warning("breaking out of endless loop")
break
# ensure that we're not going to fast - otherwise we get the same substream playlist
while (time.time() - time_start) < (chunk_seconds - 1):
await asyncio.sleep(0.5)
if input_format.content_type not in (ContentType.UNKNOWN, ContentType.WAV):
# we need to determine the audio format from the first chunk
streamdetails.audio_format = input_format
audio_format_detected = True
loops -= 1


async def get_hls_substream(
Expand All @@ -679,15 +627,21 @@ async def get_hls_substream(
raw_data = await resp.read()
encoding = resp.charset or await detect_charset(raw_data)
master_m3u_data = raw_data.decode(encoding)
if not allow_encrypted and "EXT-X-KEY:METHOD=AES-128" in master_m3u_data:
# for now we don't support encrypted HLS streams
if not allow_encrypted and "EXT-X-KEY:METHOD=" in master_m3u_data:
# for now we do not (yet) support encrypted HLS streams
raise InvalidDataError("HLS stream is encrypted, not supported")
substreams = parse_m3u(master_m3u_data)
if any(x for x in substreams if x.length or x.key):
# this is already a substream!
return PlaylistItem(
path=url,
)
# There is a chance that we did not get a master playlist with subplaylists
# but just a single master/sub playlist with the actual audio stream(s)
# so we need to detect if the playlist child's contain audio streams or
# sub-playlists.
if any(
x
for x in substreams
if (x.length or x.path.endswith((".mp4", ".aac")))
and not x.path.endswith((".m3u", ".m3u8"))
):
return PlaylistItem(path=url, key=substreams[0].key)
# sort substreams on best quality (highest bandwidth) when available
if any(x for x in substreams if x.stream_info):
substreams.sort(key=lambda x: int(x.stream_info.get("BANDWIDTH", "0")), reverse=True)
Expand Down
4 changes: 2 additions & 2 deletions music_assistant/server/helpers/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def _feed_stdin(self) -> None:
generator_exhausted = False
audio_received = False
try:
async for chunk in TimedAsyncGenerator(self.audio_input, 30):
async for chunk in TimedAsyncGenerator(self.audio_input, 300):
audio_received = True
await self.write(chunk)
generator_exhausted = True
Expand Down Expand Up @@ -169,7 +169,7 @@ async def get_ffmpeg_stream(
) as ffmpeg_proc:
# read final chunks from stdout
iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
async for chunk in TimedAsyncGenerator(iterator, 60):
async for chunk in iterator:
yield chunk


Expand Down
10 changes: 7 additions & 3 deletions music_assistant/server/helpers/playlists.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ def parse_pls(pls_data: str) -> list[PlaylistItem]:
return playlist


async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]:
async def fetch_playlist(
mass: MusicAssistant, url: str, raise_on_hls: bool = True
) -> list[PlaylistItem]:
"""Parse an online m3u or pls playlist."""
try:
async with mass.http_session.get(url, allow_redirects=True, timeout=5) as resp:
Expand All @@ -164,8 +166,10 @@ async def fetch_playlist(mass: MusicAssistant, url: str) -> list[PlaylistItem]:
msg = f"Error while fetching playlist {url}"
raise InvalidDataError(msg) from err

if "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data:
raise IsHLSPlaylist(encrypted="#EXT-X-KEY:" in playlist_data)
if raise_on_hls and "#EXT-X-VERSION:" in playlist_data or "#EXT-X-STREAM-INF:" in playlist_data:
exc = IsHLSPlaylist()
exc.encrypted = "#EXT-X-KEY:" in playlist_data
raise exc

if url.endswith((".m3u", ".m3u8")):
playlist = parse_m3u(playlist_data)
Expand Down
12 changes: 9 additions & 3 deletions music_assistant/server/providers/apple_music/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_PASSWORD
from music_assistant.server.helpers.app_vars import app_var
from music_assistant.server.helpers.audio import get_hls_substream
from music_assistant.server.helpers.playlists import fetch_playlist
from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.music_provider import MusicProvider

Expand Down Expand Up @@ -721,8 +721,14 @@ async def _parse_stream_url_and_uri(self, stream_assets: list[dict]) -> str:
ctrp256_urls = [asset["URL"] for asset in stream_assets if asset["flavor"] == "28:ctrp256"]
if len(ctrp256_urls) == 0:
raise MediaNotFoundError("No ctrp256 URL found for song.")
playlist_item = await get_hls_substream(self.mass, ctrp256_urls[0])
track_url = playlist_item.path
playlist_url = ctrp256_urls[0]
playlist_items = await fetch_playlist(self.mass, ctrp256_urls[0], raise_on_hls=False)
# Apple returns a HLS (substream) playlist but instead of chunks,
# each item is just the whole file. So we simply grab the first playlist item.
playlist_item = playlist_items[0]
# path is relative, stitch it together
base_path = playlist_url.rsplit("/", 1)[0]
track_url = base_path + "/" + playlist_items[0].path
key = playlist_item.key
return (track_url, key)

Expand Down

0 comments on commit 0f8c8fc

Please sign in to comment.