diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 35854ac5e..7aed50f41 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -310,12 +310,12 @@ label="Enable Smart Fades", options=[ ConfigValueOption("Disabled", "disabled"), - ConfigValueOption("Smart Fades", "smart_fades"), + ConfigValueOption("Smart Crossfade", "smart_crossfade"), ConfigValueOption("Standard Crossfade", "standard_crossfade"), ], default_value="disabled", description="Select the crossfade mode to use when transitioning between tracks.\n\n" - "- 'Smart Fades': Uses beat matching and DJ-like EQ filters to create smooth transitions" + "- 'Smart Crossfade': Uses beat matching and EQ filters to create smooth transitions" " between tracks.\n" "- 'Standard Crossfade': Regular crossfade that crossfades the last/first x-seconds of a " "track.", diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index fc3d81db3..ec3bef6be 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -986,7 +986,7 @@ async def get_queue_flow_stream( # calculate crossfade buffer size crossfade_buffer_duration = ( SMART_CROSSFADE_DURATION - if smart_fades_mode == SmartFadesMode.SMART_FADES + if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE else standard_crossfade_duration ) crossfade_buffer_duration = min( @@ -1362,7 +1362,7 @@ async def get_queue_item_stream_with_smartfade( self, queue_item: QueueItem, pcm_format: AudioFormat, - smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_FADES, + smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE, standard_crossfade_duration: int = 10, ) -> AsyncGenerator[bytes, None]: """Get the audio stream for a single queue item with (smart) crossfade to the next item.""" @@ -1397,7 +1397,7 @@ async def get_queue_item_stream_with_smartfade( # calculate crossfade buffer size crossfade_buffer_duration = ( SMART_CROSSFADE_DURATION - if 
smart_fades_mode == SmartFadesMode.SMART_FADES + if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE else standard_crossfade_duration ) crossfade_buffer_duration = min( diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 0d42d357b..39ae376d1 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -80,105 +80,6 @@ STREAMDETAILS_EXPIRATION: Final[int] = 60 * 15 # 15 minutes -async def crossfade_pcm_parts( - fade_in_part: bytes, - fade_out_part: bytes, - pcm_format: AudioFormat, - fade_out_pcm_format: AudioFormat | None = None, -) -> bytes: - """Crossfade two chunks of pcm/raw audio using ffmpeg.""" - if fade_out_pcm_format is None: - fade_out_pcm_format = pcm_format - - # calculate the fade_length from the smallest chunk - fade_length = min( - len(fade_in_part) / pcm_format.pcm_sample_size, - len(fade_out_part) / fade_out_pcm_format.pcm_sample_size, - ) - # write the fade_out_part to a temporary file - fadeout_filename = f"/tmp/{shortuuid.random(20)}.pcm" # noqa: S108 - async with aiofiles.open(fadeout_filename, "wb") as outfile: - await outfile.write(fade_out_part) - - args = [ - # generic args - "ffmpeg", - "-hide_banner", - "-loglevel", - "quiet", - # fadeout part (as file) - "-acodec", - fade_out_pcm_format.content_type.name.lower(), - "-ac", - str(fade_out_pcm_format.channels), - "-ar", - str(fade_out_pcm_format.sample_rate), - "-channel_layout", - "mono" if fade_out_pcm_format.channels == 1 else "stereo", - "-f", - fade_out_pcm_format.content_type.value, - "-i", - fadeout_filename, - # fade_in part (stdin) - "-acodec", - pcm_format.content_type.name.lower(), - "-ac", - str(pcm_format.channels), - "-channel_layout", - "mono" if pcm_format.channels == 1 else "stereo", - "-ar", - str(pcm_format.sample_rate), - "-f", - pcm_format.content_type.value, - "-i", - "-", - # filter args - "-filter_complex", - f"[0][1]acrossfade=d={fade_length}", - # output args - "-acodec", - 
pcm_format.content_type.name.lower(), - "-ac", - str(pcm_format.channels), - "-channel_layout", - "mono" if pcm_format.channels == 1 else "stereo", - "-ar", - str(pcm_format.sample_rate), - "-f", - pcm_format.content_type.value, - "-", - ] - _, crossfaded_audio, _ = await communicate(args, fade_in_part) - await remove_file(fadeout_filename) - if crossfaded_audio: - LOGGER.log( - VERBOSE_LOG_LEVEL, - "crossfaded 2 pcm chunks. fade_in_part: %s - " - "fade_out_part: %s - fade_length: %s seconds", - len(fade_in_part), - len(fade_out_part), - fade_length, - ) - return crossfaded_audio - # no crossfade_data, return original data instead - LOGGER.debug( - "crossfade of pcm chunks failed: not enough data? - fade_in_part: %s - fade_out_part: %s", - len(fade_in_part), - len(fade_out_part), - ) - if fade_out_pcm_format.sample_rate != pcm_format.sample_rate: - # Edge case: the sample rates are different, - # we need to resample the fade_out part to the same sample rate as the fade_in part - async with FFMpeg( - audio_input="-", - input_format=fade_out_pcm_format, - output_format=pcm_format, - ) as ffmpeg: - res = await ffmpeg.communicate(fade_out_part) - return res[0] + fade_in_part - return fade_out_part + fade_in_part - - async def strip_silence( mass: MusicAssistant, # noqa: ARG001 audio_data: bytes, diff --git a/music_assistant/helpers/smart_fades.py b/music_assistant/helpers/smart_fades.py index 32b76dfa1..0c2973102 100644 --- a/music_assistant/helpers/smart_fades.py +++ b/music_assistant/helpers/smart_fades.py @@ -1,14 +1,11 @@ """Smart Fades - Object-oriented implementation with intelligent fades and adaptive filtering.""" -# TODO: Figure out if we can achieve shared buffer with StreamController on full -# current and next track for more EQ options. 
-# TODO: Refactor the Analyzer into a metadata controller after we have split the controllers -# TODO: Refactor the Mixer into a stream controller after we have split the controllers from __future__ import annotations import asyncio import logging import time +from abc import ABC, abstractmethod from typing import TYPE_CHECKING import aiofiles @@ -18,7 +15,7 @@ import shortuuid from music_assistant.constants import VERBOSE_LOG_LEVEL -from music_assistant.helpers.audio import crossfade_pcm_parts, strip_silence +from music_assistant.helpers.audio import strip_silence from music_assistant.helpers.process import communicate from music_assistant.helpers.util import remove_file from music_assistant.models.smart_fades import ( @@ -35,8 +32,6 @@ SMART_CROSSFADE_DURATION = 45 ANALYSIS_FPS = 100 -# Only apply time stretching if BPM difference is < this % -TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD = 5.0 class SmartFadesAnalyzer: @@ -69,8 +64,6 @@ async def analyze( fragment_duration, len(audio_data), ) - # Perform beat analysis - # Convert PCM bytes to numpy array and then to mono for analysis audio_array = np.frombuffer(audio_data, dtype=np.float32) if pcm_format.channels > 1: @@ -232,123 +225,258 @@ async def _analyze_track_beats( return None -class SmartFadesMixer: - """Smart fades mixer class that mixes tracks based on analysis data.""" +############################# +# SMART FADES EQ LOGIC +############################# - def __init__(self, mass: MusicAssistant) -> None: - """Initialize smart fades mixer.""" - self.mass = mass - self.logger = logging.getLogger(__name__) - # TODO: Refactor into stream (or metadata) controller after we have split the controllers - self.analyzer = SmartFadesAnalyzer(mass) - async def mix( +class Filter(ABC): + """Abstract base class for audio filters.""" + + output_fadeout_label: str + output_fadein_label: str + + @abstractmethod + def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]: + """Apply the filter and return 
the FFmpeg filter strings.""" + + +class TimeStretchFilter(Filter): + """Filter that applies time stretching to match BPM using rubberband.""" + + output_fadeout_label: str = "fadeout_stretched" + output_fadein_label: str = "fadein_unchanged" + + def __init__( self, - fade_in_part: bytes, - fade_out_part: bytes, - fade_in_streamdetails: StreamDetails, - fade_out_streamdetails: StreamDetails, - pcm_format: AudioFormat, - standard_crossfade_duration: int = 10, - mode: SmartFadesMode = SmartFadesMode.SMART_FADES, - ) -> bytes: - """Apply crossfade with internal state management and smart/standard fallback logic.""" - if mode == SmartFadesMode.DISABLED: - # No crossfade, just concatenate - # Note that this should not happen since we check this before calling mix() - # but just to be sure... - return fade_out_part + fade_in_part + stretch_ratio: float, + ): + """Initialize time stretch filter.""" + self.stretch_ratio = stretch_ratio - # strip silence from end of audio of fade_out_part - fade_out_part = await strip_silence( - self.mass, - fade_out_part, - pcm_format=pcm_format, - reverse=True, - ) - # strip silence from begin of audio of fade_in_part - fade_in_part = await strip_silence( - self.mass, - fade_in_part, - pcm_format=pcm_format, - reverse=False, - ) - if mode == SmartFadesMode.STANDARD_CROSSFADE: - # crossfade with standard crossfade - return await self._default_crossfade( - fade_in_part, - fade_out_part, - pcm_format, - standard_crossfade_duration, - ) - # Attempt smart crossfade with analysis data - fade_out_analysis: SmartFadesAnalysis | None - if stored_analysis := await self.mass.music.get_smart_fades_analysis( - fade_out_streamdetails.item_id, - fade_out_streamdetails.provider, - SmartFadesAnalysisFragment.OUTRO, - ): - fade_out_analysis = stored_analysis + def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]: + """Create FFmpeg filters to gradually adjust tempo from original BPM to target BPM.""" + return [ + 
f"{input_fadeout_label}rubberband=tempo={self.stretch_ratio:.6f}:transients=mixed:detector=soft:pitchq=quality" + f"[{self.output_fadeout_label}]", + f"{input_fadein_label}anull[{self.output_fadein_label}]", # codespell:ignore anull + ] + + def __repr__(self) -> str: + """Return string representation of TimeStretchFilter.""" + return f"TimeStretch(ratio={self.stretch_ratio:.2f})" + + +class TrimFilter(Filter): + """Filter that trims incoming track to align with downbeats.""" + + output_fadeout_label: str = "fadeout_beatalign" + output_fadein_label: str = "fadein_beatalign" + + def __init__(self, fadein_start_pos: float): + """Initialize beat align filter. + + Args: + fadein_start_pos: Position in seconds to trim the incoming track to + """ + self.fadein_start_pos = fadein_start_pos + + def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]: + """Trim the incoming track to align with downbeats.""" + return [ + f"{input_fadeout_label}anull[{self.output_fadeout_label}]", # codespell:ignore anull + f"{input_fadein_label}atrim=start={self.fadein_start_pos},asetpts=PTS-STARTPTS[{self.output_fadein_label}]", + ] + + def __repr__(self) -> str: + """Return string representation of TrimFilter.""" + return f"Trim(trim={self.fadein_start_pos:.2f}s)" + + +class FrequencySweepFilter(Filter): + """Filter that creates frequency sweep effects (lowpass/highpass transitions).""" + + output_fadeout_label: str = "frequency_sweep" + output_fadein_label: str = "frequency_sweep" + + def __init__( + self, + sweep_type: str, + target_freq: int, + duration: float, + start_time: float, + sweep_direction: str, + poles: int, + curve_type: str, + stream_type: str = "fadeout", + ): + """Initialize frequency sweep filter. 
+ + Args: + sweep_type: 'lowpass' or 'highpass' + target_freq: Target frequency for the filter + duration: Duration of the sweep in seconds + start_time: When to start the sweep + sweep_direction: 'fade_in' (unfiltered->filtered) or 'fade_out' (filtered->unfiltered) + poles: Number of poles for the filter + curve_type: 'linear', 'exponential', or 'logarithmic' + stream_type: 'fadeout' or 'fadein' - which stream to process + """ + self.sweep_type = sweep_type + self.target_freq = target_freq + self.duration = duration + self.start_time = start_time + self.sweep_direction = sweep_direction + self.poles = poles + self.curve_type = curve_type + self.stream_type = stream_type + + # Set output labels based on stream type + if stream_type == "fadeout": + self.output_fadeout_label = f"fadeout_{sweep_type}" + self.output_fadein_label = "fadein_passthrough" else: - fade_out_analysis = await self.analyzer.analyze( - fade_out_streamdetails.item_id, - fade_out_streamdetails.provider, - SmartFadesAnalysisFragment.OUTRO, - fade_out_part, - pcm_format, - ) + self.output_fadeout_label = "fadeout_passthrough" + self.output_fadein_label = f"fadein_{sweep_type}" - fade_in_analysis: SmartFadesAnalysis | None - if stored_analysis := await self.mass.music.get_smart_fades_analysis( - fade_in_streamdetails.item_id, - fade_in_streamdetails.provider, - SmartFadesAnalysisFragment.INTRO, - ): - fade_in_analysis = stored_analysis + def _generate_volume_expr(self, start: float, dur: float, direction: str, curve: str) -> str: + t_expr = f"t-{start}" # Time relative to start + norm_t = f"min(max({t_expr},0),{dur})/{dur}" # Normalized 0-1 + + if curve == "exponential": + # Exponential curve for smoother transitions + if direction == "up": + return f"'pow({norm_t},2)':eval=frame" + else: + return f"'1-pow({norm_t},2)':eval=frame" + elif curve == "logarithmic": + # Logarithmic curve for more aggressive initial change + if direction == "up": + return f"'sqrt({norm_t})':eval=frame" + else: + return 
f"'1-sqrt({norm_t})':eval=frame" + elif direction == "up": + return f"'{norm_t}':eval=frame" else: - fade_in_analysis = await self.analyzer.analyze( - fade_in_streamdetails.item_id, - fade_in_streamdetails.provider, - SmartFadesAnalysisFragment.INTRO, - fade_in_part, - pcm_format, - ) - if ( - fade_out_analysis - and fade_in_analysis - and fade_out_analysis.confidence > 0.3 - and fade_in_analysis.confidence > 0.3 - and mode == SmartFadesMode.SMART_FADES - ): - try: - return await self._apply_smart_crossfade( - fade_out_analysis, - fade_in_analysis, - fade_out_part, - fade_in_part, - pcm_format, - ) - except Exception as e: - self.logger.warning( - "Smart crossfade failed: %s, falling back to standard crossfade", e - ) + return f"'1-{norm_t}':eval=frame" - return await self._default_crossfade( - fade_in_part, - fade_out_part, - pcm_format, - standard_crossfade_duration, + def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]: + """Generate FFmpeg filters for frequency sweep effect.""" + # Select the correct input based on stream type + if self.stream_type == "fadeout": + input_label = input_fadeout_label + output_label = self.output_fadeout_label + passthrough_label = self.output_fadein_label + passthrough_input = input_fadein_label + else: + input_label = input_fadein_label + output_label = self.output_fadein_label + passthrough_label = self.output_fadeout_label + passthrough_input = input_fadeout_label + + orig_label = f"{output_label}_orig" + filter_label = f"{output_label}_to{self.sweep_type[:2]}" + filtered_label = f"{output_label}_filtered" + orig_faded_label = f"{output_label}_orig_faded" + filtered_faded_label = f"{output_label}_filtered_faded" + + # Determine volume ramp directions based on sweep direction + if self.sweep_direction == "fade_in": + # Fade from dry to wet (unfiltered to filtered) + orig_direction = "down" + filter_direction = "up" + else: # fade_out + # Fade from wet to dry (filtered to unfiltered) + orig_direction = 
"up" + filter_direction = "down" + + # Build filter chain + orig_volume_expr = self._generate_volume_expr( + self.start_time, self.duration, orig_direction, self.curve_type + ) + filtered_volume_expr = self._generate_volume_expr( + self.start_time, self.duration, filter_direction, self.curve_type ) - async def _apply_smart_crossfade( + return [ + # Pass through the other stream unchanged + f"{passthrough_input}anull[{passthrough_label}]", # codespell:ignore anull + # Split input into two paths + f"{input_label}asplit=2[{orig_label}][{filter_label}]", + # Apply frequency filter to one path + f"[{filter_label}]{self.sweep_type}=f={self.target_freq}:poles={self.poles}[{filtered_label}]", + # Apply time-varying volume to original path + f"[{orig_label}]volume={orig_volume_expr}[{orig_faded_label}]", + # Apply time-varying volume to filtered path + f"[{filtered_label}]volume={filtered_volume_expr}[{filtered_faded_label}]", + # Mix the two paths together + f"[{orig_faded_label}][{filtered_faded_label}]amix=inputs=2:duration=longest:normalize=0[{output_label}]", + ] + + def __repr__(self) -> str: + """Return string representation of FrequencySweepFilter.""" + return f"FreqSweep({self.sweep_type}@{self.target_freq}Hz)" + + +class CrossfadeFilter(Filter): + """Filter that applies the final crossfade between fadeout and fadein streams.""" + + output_fadeout_label: str = "crossfade" + output_fadein_label: str = "crossfade" + + def __init__(self, crossfade_duration: float): + """Initialize crossfade filter.""" + self.crossfade_duration = crossfade_duration + + def apply(self, input_fadein_label: str, input_fadeout_label: str) -> list[str]: + """Apply the acrossfade filter.""" + return [f"{input_fadeout_label}{input_fadein_label}acrossfade=d={self.crossfade_duration}"] + + def __repr__(self) -> str: + """Return string representation of CrossfadeFilter.""" + return f"Crossfade(d={self.crossfade_duration:.1f}s)" + + +class SmartFade(ABC): + """Abstract base class for Smart 
Fades.""" + + filters: list[Filter] + + def __init__(self) -> None: + """Initialize SmartFade base class.""" + self.logger = logging.getLogger(__name__) + self.filters = [] + + @abstractmethod + def _build(self) -> None: + """Build the smart fades filter chain.""" + ... + + def _get_ffmpeg_filters( + self, + input_fadein_label: str = "[1]", + input_fadeout_label: str = "[0]", + ) -> list[str]: + """Get FFmpeg filters for smart fades.""" + if not self.filters: + self._build() + filters = [] + _cur_fadein_label = input_fadein_label + _cur_fadeout_label = input_fadeout_label + for audio_filter in self.filters: + filter_strings = audio_filter.apply(_cur_fadein_label, _cur_fadeout_label) + filters.extend(filter_strings) + _cur_fadein_label = f"[{audio_filter.output_fadein_label}]" + _cur_fadeout_label = f"[{audio_filter.output_fadeout_label}]" + return filters + + async def apply( self, - fade_out_analysis: SmartFadesAnalysis, - fade_in_analysis: SmartFadesAnalysis, fade_out_part: bytes, fade_in_part: bytes, pcm_format: AudioFormat, ) -> bytes: - """Apply smart crossfade with beat-perfect timing and adaptive filtering.""" + """Apply the smart fade to the given PCM audio parts.""" # Write the fade_out_part to a temporary file fadeout_filename = f"/tmp/{shortuuid.random(20)}.pcm" # noqa: S108 async with aiofiles.open(fadeout_filename, "wb") as outfile: @@ -385,10 +513,10 @@ async def _apply_smart_crossfade( "-i", "-", ] - - smart_fade_filters = self._create_enhanced_smart_fade_filters( - fade_out_analysis, - fade_in_analysis, + smart_fade_filters = self._get_ffmpeg_filters() + self.logger.debug( + "Applying smartfade: %s", + self, ) args.extend( [ @@ -408,7 +536,7 @@ async def _apply_smart_crossfade( "-", ] ) - + self.logger.debug("FFmpeg smartfade args: %s", " ".join(args)) self.logger.log(VERBOSE_LOG_LEVEL, "FFmpeg command args: %s", " ".join(args)) # Execute the enhanced smart fade with full buffer @@ -421,113 +549,147 @@ async def _apply_smart_crossfade( stderr_msg = 
stderr.decode() if stderr else "(no stderr output)" raise RuntimeError(f"Smart crossfade failed. FFmpeg stderr: {stderr_msg}") - # SMART FADE HELPER METHODS - def _create_enhanced_smart_fade_filters( - self, - fade_out_analysis: SmartFadesAnalysis, - fade_in_analysis: SmartFadesAnalysis, - ) -> list[str]: - """Create smart fade filters with perfect timing and adaptive filtering.""" - # Calculate optimal crossfade bars that fit in available buffer - crossfade_bars = self._calculate_optimal_crossfade_bars(fade_out_analysis, fade_in_analysis) + def __repr__(self) -> str: + """Return string representation of SmartFade showing the filter chain.""" + if not self.filters: + return f"<{self.__class__.__name__}: 0 filters>" - # Calculate beat positions for the selected bar count - fadeout_start_pos, fadein_start_pos = self._calculate_optimal_fade_timing( - fade_out_analysis, fade_in_analysis, crossfade_bars - ) + chain = " → ".join(repr(f) for f in self.filters) + return f"<{self.__class__.__name__}: {len(self.filters)} filters> {chain}" - # Log the final selected timing - if fadeout_start_pos is not None and fadein_start_pos is not None: - self.logger.debug( - "Beat timing selected: fadeout=%.2fs, fadein=%.2fs (%d bars)", - fadeout_start_pos, - fadein_start_pos, - crossfade_bars, - ) - filters: list[str] = [] +class SmartCrossFade(SmartFade): + """Smart fades class that implements a Smart Fade mode.""" - # Calculate initial crossfade duration (may be adjusted later for downbeat alignment) - initial_crossfade_duration = self._calculate_crossfade_duration( - crossfade_bars=crossfade_bars, - fade_in_analysis=fade_in_analysis, - ) + # Only apply time stretching if BPM difference is < this % + time_stretch_bpm_percentage_threshold: float = 5.0 + + def __init__( + self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis + ) -> None: + """Initialize SmartFades with analysis data. 
+ + Args: + fade_out_analysis: Analysis data for the outgoing track + fade_in_analysis: Analysis data for the incoming track + logger: Optional logger for debug output + """ + self.fade_out_analysis = fade_out_analysis + self.fade_in_analysis = fade_in_analysis + super().__init__() + + def _build(self) -> None: + """Build the smart fades filter chain.""" + # Calculate tempo factor for time stretching + bpm_ratio = self.fade_in_analysis.bpm / self.fade_out_analysis.bpm + bpm_diff_percent = abs(1.0 - bpm_ratio) * 100 - # Create time stretch filters - needs to know crossfade duration to complete - # tempo ramping before the crossfade starts - time_stretch_filters, tempo_factor = self._create_time_stretch_filters( - fade_out_analysis=fade_out_analysis, - fade_in_analysis=fade_in_analysis, - crossfade_bars=crossfade_bars, - crossfade_duration=initial_crossfade_duration, + # Extrapolate downbeats for better bar calculation + self.extrapolated_fadeout_downbeats = extrapolate_downbeats( + self.fade_out_analysis.downbeats, + tempo_factor=1.0, + bpm=self.fade_out_analysis.bpm, ) - filters.extend(time_stretch_filters) - crossfade_duration = initial_crossfade_duration + # Calculate optimal crossfade bars that fit in available buffer + crossfade_bars = self._calculate_optimal_crossfade_bars() - # Check if we would have enough audio after beat alignment for the crossfade + # Calculate beat positions for the selected bar count + fadein_start_pos = self._calculate_optimal_fade_timing(crossfade_bars) + + # Calculate initial crossfade duration (may be adjusted later for downbeat alignment) + crossfade_duration = self._calculate_crossfade_duration(crossfade_bars=crossfade_bars) + + # Add time stretch filter if needed if ( - fadein_start_pos is not None - and fadein_start_pos + crossfade_duration > SMART_CROSSFADE_DURATION + 0.1 < bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold + and crossfade_bars > 4 ): + self.filters.append(TimeStretchFilter(stretch_ratio=bpm_ratio)) 
+ # Re-extrapolate downbeats with actual tempo factor for time-stretched audio + self.extrapolated_fadeout_downbeats = extrapolate_downbeats( + self.fade_out_analysis.downbeats, + tempo_factor=bpm_ratio, + bpm=self.fade_out_analysis.bpm, + ) + + # Check if we would have enough audio after beat alignment for the crossfade + if fadein_start_pos and fadein_start_pos + crossfade_duration <= SMART_CROSSFADE_DURATION: + self.filters.append(TrimFilter(fadein_start_pos=fadein_start_pos)) + else: self.logger.debug( "Skipping beat alignment: not enough audio after trim (%.1fs + %.1fs > %.1fs)", fadein_start_pos, crossfade_duration, SMART_CROSSFADE_DURATION, ) - # Skip beat alignment - fadein_start_pos = None # Adjust crossfade duration to align with outgoing track's downbeats - # This prevents echo-ey sounds when both tracks have kicks during the crossfade crossfade_duration = self._adjust_crossfade_to_downbeats( - fade_out_analysis=fade_out_analysis, crossfade_duration=crossfade_duration, fadein_start_pos=fadein_start_pos, - tempo_factor=tempo_factor, ) - beat_align_filters = self._trim_incoming_track_to_downbeat( - fadein_start_pos=fadein_start_pos, - fadeout_input_label="[fadeout_stretched]", - fadein_input_label="[1]", - ) - filters.extend(beat_align_filters) + # 90 BPM -> 1500Hz, 140 BPM -> 2500Hz + avg_bpm = (self.fade_out_analysis.bpm + self.fade_in_analysis.bpm) / 2 + crossover_freq = int(np.clip(1500 + (avg_bpm - 90) * 20, 1500, 2500)) - self.logger.debug( - "Smart fade: out_bpm=%.1f, in_bpm=%.1f, %d bars, crossfade: %.2fs%s", - fade_out_analysis.bpm, - fade_in_analysis.bpm, - crossfade_bars, - crossfade_duration, - ", beat-aligned" if fadein_start_pos else "", - ) - frequency_filters = self._apply_eq_filters( - fade_out_analysis=fade_out_analysis, - fade_in_analysis=fade_in_analysis, - fade_out_label="[fadeout_beatalign]", - fade_in_label="[fadein_beatalign]", - crossfade_duration=crossfade_duration, - crossfade_bars=crossfade_bars, + # Adjust for BPM mismatch + if 
abs(bpm_ratio - 1.0) > 0.3: + crossover_freq = int(crossover_freq * 0.85) + + # For shorter fades, use exp/exp curves to avoid abruptness + if crossfade_bars < 8: + fadeout_curve = "exponential" + fadein_curve = "exponential" + # For long fades, use log/linear curves + else: + # Use logarithmic curve to give the next track more space + fadeout_curve = "logarithmic" + # Use linear curve for transition, predictable and not too abrupt + fadein_curve = "linear" + + # Create lowpass filter on the outgoing track (unfiltered → low-pass) + # Extended lowpass effect to gradually remove bass frequencies + fadeout_eq_duration = min(max(crossfade_duration * 2.5, 8.0), SMART_CROSSFADE_DURATION) + # The crossfade always happens at the END of the buffer + fadeout_eq_start = max(0, SMART_CROSSFADE_DURATION - fadeout_eq_duration) + fadeout_sweep = FrequencySweepFilter( + sweep_type="lowpass", + target_freq=crossover_freq, + duration=fadeout_eq_duration, + start_time=fadeout_eq_start, + sweep_direction="fade_in", + poles=1, + curve_type=fadeout_curve, + stream_type="fadeout", ) - filters.extend(frequency_filters) + self.filters.append(fadeout_sweep) - # Apply linear crossfade for now since we already use EQ sweeps for smoothness - filters.append(f"[fadeout_eq][fadein_eq]acrossfade=d={crossfade_duration}") + # Create high pass filter on the incoming track (high-pass → unfiltered) + # Quicker highpass removal to avoid lingering vocals after crossfade + fadein_eq_duration = crossfade_duration / 1.5 + fadein_sweep = FrequencySweepFilter( + sweep_type="highpass", + target_freq=crossover_freq, + duration=fadein_eq_duration, + start_time=0, + sweep_direction="fade_out", + poles=1, + curve_type=fadein_curve, + stream_type="fadein", + ) + self.filters.append(fadein_sweep) - return filters + # Add final crossfade filter + crossfade_filter = CrossfadeFilter(crossfade_duration=crossfade_duration) + self.filters.append(crossfade_filter) - def _calculate_crossfade_duration( - self, - 
crossfade_bars: int, - fade_in_analysis: SmartFadesAnalysis, - ) -> float: + def _calculate_crossfade_duration(self, crossfade_bars: int) -> float: """Calculate final crossfade duration based on musical bars and BPM.""" # Calculate crossfade duration based on incoming track's BPM - # This ensures a musically consistent crossfade length regardless of beat positions beats_per_bar = 4 - seconds_per_beat = 60.0 / fade_in_analysis.bpm + seconds_per_beat = 60.0 / self.fade_in_analysis.bpm musical_duration = crossfade_bars * beats_per_bar * seconds_per_beat # Apply buffer constraint @@ -543,117 +705,106 @@ def _calculate_crossfade_duration( return actual_duration - def _extrapolate_downbeats( - self, - downbeats: npt.NDArray[np.float64], - tempo_factor: float, - buffer_size: float = SMART_CROSSFADE_DURATION, - ) -> npt.NDArray[np.float64]: - """Extrapolate downbeats based on actual intervals when detection is incomplete. - - This is needed when we want to perform beat alignment in an 'atmospheric' outro - that does not have any detected downbeats. 
- """ - if len(downbeats) < 3: - # Need at least 3 downbeats to reliably calculate interval - return downbeats / tempo_factor - - # Adjust detected downbeats for time stretching first - adjusted_downbeats = downbeats / tempo_factor - last_downbeat = adjusted_downbeats[-1] - - # If the last downbeat is close to the buffer end, no extrapolation needed - if last_downbeat >= buffer_size - 5: - return adjusted_downbeats + def _calculate_optimal_crossfade_bars(self) -> int: + """Calculate optimal crossfade bars that fit in available buffer.""" + bpm_in = self.fade_in_analysis.bpm + bpm_out = self.fade_out_analysis.bpm + bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100 - # Calculate intervals from ORIGINAL downbeats (before time stretching) - intervals = np.diff(downbeats) - median_interval = float(np.median(intervals)) - std_interval = float(np.std(intervals)) + # Calculate ideal bars based on BPM compatibility + ideal_bars = 10 if bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold else 6 - # Only extrapolate if intervals are consistent (low standard deviation) - if std_interval > 0.2: - self.logger.debug( - "Downbeat intervals too inconsistent (std=%.3fs) for extrapolation", - std_interval, - ) - return adjusted_downbeats + # Reduce bars until it fits in the fadein buffer + for bars in [ideal_bars, 8, 6, 4, 2, 1]: + if bars > ideal_bars: + continue - # Adjust the interval for time stretching - # When slowing down (tempo_factor < 1.0), intervals get longer - adjusted_interval = median_interval / tempo_factor + fadein_start_pos = self._calculate_optimal_fade_timing(bars) + if fadein_start_pos is None: + continue - # Extrapolate forward from last adjusted downbeat using adjusted interval - extrapolated = [] - current_pos = last_downbeat + adjusted_interval - max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s + # Calculate what the duration would be + test_duration = self._calculate_crossfade_duration(crossfade_bars=bars) - while ( - 
current_pos < buffer_size - and (current_pos - last_downbeat) <= max_extrapolation_distance - ): - extrapolated.append(current_pos) - current_pos += adjusted_interval + # Check if it fits in fadein buffer + fadein_buffer = SMART_CROSSFADE_DURATION - fadein_start_pos + if test_duration <= fadein_buffer: + if bars < ideal_bars: + self.logger.debug( + "Reduced crossfade from %d to %d bars (fadein buffer=%.1fs, needed=%.1fs)", + ideal_bars, + bars, + fadein_buffer, + test_duration, + ) + return bars - if extrapolated: - self.logger.debug( - "Extrapolated %d downbeats (adjusted_interval=%.3fs, original=%.3fs) " - "from %.2fs to %.2fs", - len(extrapolated), - adjusted_interval, - median_interval, - last_downbeat, - extrapolated[-1], - ) - # Combine adjusted detected downbeats and extrapolated downbeats - return np.concatenate([adjusted_downbeats, np.array(extrapolated)]) + # Fall back to 1 bar if nothing else fits + return 1 - return adjusted_downbeats + def _calculate_optimal_fade_timing(self, crossfade_bars: int) -> float | None: + """Calculate beat positions for alignment.""" + beats_per_bar = 4 + + def calculate_beat_positions( + fade_out_beats: npt.NDArray[np.float64], + fade_in_beats: npt.NDArray[np.float64], + num_beats: int, + ) -> float | None: + """Calculate start positions from beat arrays.""" + if len(fade_out_beats) < num_beats or len(fade_in_beats) < num_beats: + return None + + fade_in_slice = fade_in_beats[:num_beats] + return float(fade_in_slice[0]) + + # Try downbeats first for most musical timing + downbeat_positions = calculate_beat_positions( + self.extrapolated_fadeout_downbeats, self.fade_in_analysis.downbeats, crossfade_bars + ) + if downbeat_positions: + return downbeat_positions + + # Try regular beats if downbeats insufficient + required_beats = crossfade_bars * beats_per_bar + beat_positions = calculate_beat_positions( + self.fade_out_analysis.beats, self.fade_in_analysis.beats, required_beats + ) + if beat_positions: + return beat_positions + 
+ # Fallback: No beat alignment possible + self.logger.debug("No beat alignment possible (insufficient beats)") + return None def _adjust_crossfade_to_downbeats( self, - fade_out_analysis: SmartFadesAnalysis, crossfade_duration: float, fadein_start_pos: float | None, - tempo_factor: float, ) -> float: - """Adjust crossfade duration to align with outgoing track's downbeats. - - This ensures the crossfade starts on a downbeat of the outgoing track, - preventing echo-ey sounds when both tracks have kicks during the crossfade. - - The downbeat positions are adjusted for time stretching - when tempo_factor < 1.0 - (slowing down), beats take longer to reach their position in the stretched audio. - """ + """Adjust crossfade duration to align with outgoing track's downbeats.""" # If we don't have downbeats or beat alignment is disabled, return original duration - if len(fade_out_analysis.downbeats) == 0 or fadein_start_pos is None: + if len(self.extrapolated_fadeout_downbeats) == 0 or fadein_start_pos is None: return crossfade_duration - # Extrapolate downbeats if needed (e.g., when beat detection is incomplete) - # This returns downbeats already adjusted for time stretching - adjusted_downbeats = self._extrapolate_downbeats( - fade_out_analysis.downbeats, tempo_factor=tempo_factor - ) - # Calculate where the crossfade would start in the buffer ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration - # Debug: Show all downbeats and the ideal position + # Debug logging self.logger.debug( "Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), " - "fadein_start=%.2fs, tempo_factor=%.4f", + "fadein_start=%.2fs", ideal_start_pos, SMART_CROSSFADE_DURATION, crossfade_duration, fadein_start_pos, - tempo_factor, ) # Find the closest downbeats (earlier and later) earlier_downbeat = None later_downbeat = None - for downbeat in adjusted_downbeats: + for downbeat in self.extrapolated_fadeout_downbeats: if downbeat <= ideal_start_pos: earlier_downbeat = 
downbeat elif downbeat > ideal_start_pos and later_downbeat is None: @@ -663,7 +814,6 @@ def _adjust_crossfade_to_downbeats( # Try earlier downbeat first (longer crossfade) if earlier_downbeat is not None: adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat) - # Check if this fits in the buffer if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION: if abs(adjusted_duration - crossfade_duration) > 0.1: self.logger.debug( @@ -678,7 +828,6 @@ def _adjust_crossfade_to_downbeats( # Try later downbeat (shorter crossfade) if later_downbeat is not None: adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat) - # Check if this fits in the buffer if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION: if abs(adjusted_duration - crossfade_duration) > 0.1: self.logger.debug( @@ -697,351 +846,259 @@ def _adjust_crossfade_to_downbeats( ) return crossfade_duration - def _calculate_optimal_crossfade_bars( - self, fade_out_analysis: SmartFadesAnalysis, fade_in_analysis: SmartFadesAnalysis - ) -> int: - """Calculate optimal crossfade bars that fit in available buffer.""" - bpm_in = fade_in_analysis.bpm - bpm_out = fade_out_analysis.bpm - bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100 - - # Calculate ideal bars based on BPM compatibility. We link this to time stretching - # so we avoid extreme tempo changes over short fades. - ideal_bars = 10 if bpm_diff_percent <= TIME_STRETCH_BPM_PERCENTAGE_THRESHOLD else 6 - - # We could encounter songs that have a long athmospheric intro without any downbeats - # In those cases, we need to reduce the bars until it fits in the fadein buffer. 
class StandardCrossFade(SmartFade):
    """Standard crossfade that overlaps the end of one track with the start of the next."""

    def __init__(self, crossfade_duration: float = 10.0) -> None:
        """Initialize StandardCrossFade with crossfade duration.

        Args:
            crossfade_duration: Length of the overlapping section in seconds.
        """
        # Set before calling the base initializer, as in the original ordering.
        # NOTE(review): presumably the base class may call _build() during init,
        # which reads this attribute - confirm against SmartFade.
        self.crossfade_duration = crossfade_duration
        super().__init__()

    def _build(self) -> None:
        """Build the standard crossfade filter chain."""
        self.filters = [
            CrossfadeFilter(crossfade_duration=self.crossfade_duration),
        ]

    async def apply(
        self, fade_out_part: bytes, fade_in_part: bytes, pcm_format: AudioFormat
    ) -> bytes:
        """Apply the standard crossfade to the given PCM audio parts.

        Only the overlapping tail/head of the two buffers is passed to the base
        implementation; the remainder of both buffers is passed through as-is.

        Args:
            fade_out_part: Raw PCM audio of the outgoing track.
            fade_in_part: Raw PCM audio of the incoming track.
            pcm_format: Format of both buffers; ``pcm_sample_size`` is used as
                the number of bytes per second of audio.

        Returns:
            The outgoing audio, the crossfaded section and the incoming audio
            concatenated into a single buffer.
        """
        # We need to override the default apply here, since standard crossfade only needs to be
        # applied to the overlapping parts, not the full buffers.
        crossfade_size = int(pcm_format.pcm_sample_size * self.crossfade_duration)
        if crossfade_size <= 0 or not fade_out_part or not fade_in_part:
            # Nothing to overlap. Slicing with a size of 0 would also misbehave:
            # ``data[:-0]`` is empty and ``data[-0:]`` is the whole buffer, so the
            # complete outgoing buffer would end up in a zero-length crossfade.
            return fade_out_part + fade_in_part
        # Pre-crossfade: outgoing track minus the crossfaded portion
        pre_crossfade = fade_out_part[:-crossfade_size]
        # Post-crossfade: incoming track minus the crossfaded portion
        post_crossfade = fade_in_part[crossfade_size:]
        # Adjust portions to exact crossfade size
        adjusted_fade_in_part = fade_in_part[:crossfade_size]
        adjusted_fade_out_part = fade_out_part[-crossfade_size:]
        # Adjust the duration to match actual sizes (a buffer may be shorter
        # than the configured crossfade duration)
        self.crossfade_duration = min(
            len(adjusted_fade_in_part) / pcm_format.pcm_sample_size,
            len(adjusted_fade_out_part) / pcm_format.pcm_sample_size,
        )
        # Crossfaded portion: user's configured duration
        crossfaded_section = await super().apply(
            adjusted_fade_out_part, adjusted_fade_in_part, pcm_format
        )
        # Full result: everything concatenated
        return pre_crossfade + crossfaded_section + post_crossfade
#############################
# SMART FADES MIXER LOGIC
#############################
class SmartFadesMixer:
    """Smart fades mixer class that mixes tracks based on analysis data."""

    def __init__(self, mass: MusicAssistant) -> None:
        """Initialize smart fades mixer."""
        self.mass = mass
        self.logger = logging.getLogger(__name__)
        # TODO: Refactor into stream (or metadata) controller after we have split the controllers
        self.analyzer = SmartFadesAnalyzer(mass)

    async def mix(
        self,
        fade_in_part: bytes,
        fade_out_part: bytes,
        fade_in_streamdetails: StreamDetails,
        fade_out_streamdetails: StreamDetails,
        pcm_format: AudioFormat,
        standard_crossfade_duration: int = 10,
        mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
    ) -> bytes:
        """Apply crossfade with internal state management and smart/standard fallback logic.

        Args:
            fade_in_part: Raw PCM audio of the start of the incoming track.
            fade_out_part: Raw PCM audio of the end of the outgoing track.
            fade_in_streamdetails: Stream details of the incoming track.
            fade_out_streamdetails: Stream details of the outgoing track.
            pcm_format: PCM format of both audio parts.
            standard_crossfade_duration: Duration (seconds) used for the standard
                crossfade, also when falling back from a smart crossfade.
            mode: Requested crossfade mode.

        Returns:
            The mixed PCM audio of both parts.
        """
        if mode == SmartFadesMode.DISABLED:
            # No crossfade, just concatenate
            # Note that this should not happen since we check this before calling mix()
            # but just to be sure...
            return fade_out_part + fade_in_part

        # strip silence from end of audio of fade_out_part
        fade_out_part = await strip_silence(
            self.mass,
            fade_out_part,
            pcm_format=pcm_format,
            reverse=True,
        )
        # strip silence from begin of audio of fade_in_part
        fade_in_part = await strip_silence(
            self.mass,
            fade_in_part,
            pcm_format=pcm_format,
            reverse=False,
        )
        if mode == SmartFadesMode.STANDARD_CROSSFADE:
            smart_fade: SmartFade = StandardCrossFade(
                crossfade_duration=standard_crossfade_duration
            )
            return await smart_fade.apply(
                fade_out_part,
                fade_in_part,
                pcm_format,
            )

        # Attempt smart crossfade with analysis data
        fade_out_analysis = await self._get_analysis(
            fade_out_streamdetails, SmartFadesAnalysisFragment.OUTRO, fade_out_part, pcm_format
        )
        fade_in_analysis = await self._get_analysis(
            fade_in_streamdetails, SmartFadesAnalysisFragment.INTRO, fade_in_part, pcm_format
        )
        if (
            fade_out_analysis
            and fade_in_analysis
            and fade_out_analysis.confidence > 0.3
            and fade_in_analysis.confidence > 0.3
            and mode == SmartFadesMode.SMART_CROSSFADE
        ):
            try:
                smart_fade = SmartCrossFade(fade_out_analysis, fade_in_analysis)
                return await smart_fade.apply(
                    fade_out_part,
                    fade_in_part,
                    pcm_format,
                )
            except Exception as e:
                # Deliberately broad: any failure in the smart crossfade must not
                # break playback, the standard crossfade below is used instead.
                self.logger.warning(
                    "Smart crossfade failed: %s, falling back to standard crossfade", e
                )

        # Always fallback to Standard Crossfade in case something goes wrong
        smart_fade = StandardCrossFade(crossfade_duration=standard_crossfade_duration)
        return await smart_fade.apply(
            fade_out_part,
            fade_in_part,
            pcm_format,
        )

    async def _get_analysis(
        self,
        streamdetails: StreamDetails,
        fragment: SmartFadesAnalysisFragment,
        audio: bytes,
        pcm_format: AudioFormat,
    ) -> SmartFadesAnalysis | None:
        """Return the stored analysis for a track fragment, analyzing the audio if none exists."""
        if stored_analysis := await self.mass.music.get_smart_fades_analysis(
            streamdetails.item_id,
            streamdetails.provider,
            fragment,
        ):
            return stored_analysis
        return await self.analyzer.analyze(
            streamdetails.item_id,
            streamdetails.provider,
            fragment,
            audio,
            pcm_format,
        )


# HELPER METHODS
def get_bpm_diff_percentage(bpm1: float, bpm2: float) -> float:
    """Calculate BPM difference percentage between two BPM values.

    The difference is expressed relative to ``bpm2``; a ``bpm2`` of zero raises
    ``ZeroDivisionError``.
    """
    return abs(1.0 - bpm1 / bpm2) * 100


def extrapolate_downbeats(
    downbeats: npt.NDArray[np.float64],
    tempo_factor: float,
    buffer_size: float = SMART_CROSSFADE_DURATION,
    bpm: float | None = None,
    max_extrapolation_distance: float = 25.0,
) -> npt.NDArray[np.float64]:
    """Extrapolate downbeats based on actual intervals when detection is incomplete.

    This is needed when we want to perform beat alignment in an 'atmospheric' outro
    that does not have any detected downbeats.

    All returned positions are adjusted for time stretching (divided by
    ``tempo_factor``). No extrapolation is done (only the adjusted detected
    downbeats are returned) when:

    - fewer than 2 downbeats were detected,
    - the last downbeat is within 5 seconds of the buffer end,
    - the detected intervals are inconsistent (standard deviation above 0.2s),
    - exactly 2 downbeats were detected and their interval deviates 15% or more
      from the bar length expected for ``bpm`` (assuming 4/4 time),
    - the interval is not positive.

    Args:
        downbeats: Array of detected downbeat positions in seconds
        tempo_factor: Tempo adjustment factor for time stretching
        buffer_size: Maximum buffer size in seconds
        bpm: Optional BPM for validation when extrapolating with only 2 downbeats
        max_extrapolation_distance: Maximum distance in seconds (after time
            stretching) to extrapolate beyond the last detected downbeat

    Returns:
        The adjusted detected downbeats, followed by any extrapolated downbeats.
    """
    # Adjust detected downbeats for time stretching first
    adjusted_downbeats = downbeats / tempo_factor

    if len(downbeats) < 2:
        # Need at least 2 downbeats to extrapolate
        return adjusted_downbeats

    last_downbeat = float(adjusted_downbeats[-1])

    # If the last downbeat is close to the buffer end, no extrapolation needed
    if last_downbeat >= buffer_size - 5:
        return adjusted_downbeats

    # Calculate intervals from ORIGINAL downbeats (before time stretching)
    intervals = np.diff(downbeats)
    median_interval = float(np.median(intervals))

    # Only extrapolate if intervals are consistent (low standard deviation)
    if float(np.std(intervals)) > 0.2:
        return adjusted_downbeats

    # With exactly 2 downbeats there is only a single interval, so consistency
    # cannot be judged; validate it against the BPM instead when one is given.
    if len(downbeats) == 2 and bpm is not None and bpm > 0:
        # Expected interval for this BPM (assuming 4/4 time signature)
        expected_interval = (60.0 / bpm) * 4
        # Only extrapolate if interval matches BPM within 15% tolerance
        if abs(median_interval - expected_interval) / expected_interval >= 0.15:
            return adjusted_downbeats

    # Adjust the interval for time stretching
    # When slowing down (tempo_factor < 1.0), intervals get longer
    adjusted_interval = median_interval / tempo_factor
    if adjusted_interval <= 0:
        # Duplicate or unsorted downbeats: stepping by a non-positive interval
        # would never reach the buffer end and loop forever.
        return adjusted_downbeats

    # Extrapolate forward from last adjusted downbeat using adjusted interval
    extrapolated: list[float] = []
    current_pos = last_downbeat + adjusted_interval
    while current_pos < buffer_size and (current_pos - last_downbeat) <= max_extrapolation_distance:
        extrapolated.append(current_pos)
        current_pos += adjusted_interval

    if extrapolated:
        # Combine adjusted detected downbeats and extrapolated downbeats
        return np.concatenate([adjusted_downbeats, np.array(extrapolated)])

    return adjusted_downbeats