From 365b9cdd2f8a531aaf8d8479b8bf1a6cc0d97661 Mon Sep 17 00:00:00 2001 From: Brian Madden Date: Tue, 17 Oct 2023 12:57:21 +0200 Subject: [PATCH] FAST: Fleshed out AUD communicator Still need to write tests --- mpf/platforms/fast/communicators/aud.py | 115 ++++++++++++++++++++---- mpf/platforms/fast/fast_audio.py | 9 +- 2 files changed, 103 insertions(+), 21 deletions(-) diff --git a/mpf/platforms/fast/communicators/aud.py b/mpf/platforms/fast/communicators/aud.py index fb336b9a2..e99c8b677 100644 --- a/mpf/platforms/fast/communicators/aud.py +++ b/mpf/platforms/fast/communicators/aud.py @@ -2,6 +2,7 @@ from mpf.platforms.fast.communicators.base import FastSerialCommunicator from mpf.platforms.fast.fast_audio import FASTAudioInterface +from mpf.core.utility_functions import Util class FastAudCommunicator(FastSerialCommunicator): @@ -18,58 +19,134 @@ class FastAudCommunicator(FastSerialCommunicator): MIN_FW = version.parse('0.10') IGNORED_MESSAGES = ['AV:', 'AS:', 'AH:', 'AM:'] - # __slots__ = [] + __slots__ = ["amps", "current_config_byte", "phones_level", "phones_mute", "_watchdog_ms"] def __init__(self, platform, processor, config): super().__init__(platform, processor, config) + self.amps = { + 'main': + {'cmd': 'AV', + 'volume': '00', + 'enabled': False, + }, + 'sub': + {'cmd': 'AS', + 'volume': '00', + 'enabled': False, + }, + 'headphones': + {'cmd': 'AH', + 'volume': '00', + 'enabled': False, + }, + } + self.current_config_byte = '00' + self.phones_level = True # False = line, True = phones + self.phones_mute = False # False = ignore phones insertion, True = mute main/sub when phones inserted + self._watchdog_ms = 0 async def init(self): self.platform.audio_interface = FASTAudioInterface(self.platform, self) - # set volumes - # send settings - - async def clear_board_serial_buffer(self): - pass + await self.soft_reset() async def soft_reset(self): - # rewrite current volumes and config - pass + for amp in self.amps: + self.set_volume(amp, self.amps[amp]['volume']) + self.update_config() + + def _volume_to_hw(self, volume): + volume = int(volume) + assert 0 <= volume <= 63, f"Invalid volume {volume}" + return f"{volume:02X}" + + def update_config(self, send_now=True): + byte = '00' + if self.amps['main']['enabled']: + byte = Util.set_bit(byte, 0) + if self.amps['sub']['enabled']: + byte = Util.set_bit(byte, 1) + if self.amps['headphones']['enabled']: + if not self.phones_level: # line level + byte = Util.set_bit(byte, 2) + byte = Util.set_bit(byte, 3) + else: # phones level + if self.phones_mute: + byte = Util.set_bit(byte, 3) + else: + byte = Util.set_bit(byte, 2) + + self.current_config_byte = byte + + if send_now: + self.send_config_to_board() def set_volume(self, amp, volume): - pass + if amp not in self.amps: + raise AssertionError(f"Invalid amp {amp}") + + volume = self._volume_to_hw(volume) + self.amps[amp]['volume'] = volume + self.send_and_forget(f"{self.amps[amp]['cmd']}:{volume}") def get_volume(self, amp): - pass + return int(self.amps[amp]['volume'], 16) def enable_amp(self, amp): - pass + if amp not in self.amps: + raise AssertionError(f"Invalid amp {amp}") + self.amps[amp]['enabled'] = True + self.update_config() def disable_amp(self, amp): - pass + if amp not in self.amps: + raise AssertionError(f"Invalid amp {amp}") + self.amps[amp]['enabled'] = False + self.update_config() def set_phones_level(self, mode, send_now=True): - # phones or line - pass + if mode == 'line': + self.phones_level = False + elif mode == 'headphones': + self.phones_level = True + else: + raise AssertionError(f"Invalid phones level {mode}") + + self.update_config(send_now) def set_phones_behavior(self, behavior, send_now=True): - # mute or ignore - pass + # Will return False if not possible, true if ok + if not self.phones_level: # phones are line level, this does not apply + return False + + if behavior == 'mute': + self.phones_mute = True + elif behavior == 'ignore': + self.phones_mute = False + else: + raise AssertionError(f"Invalid phones behavior {behavior}") + + self.update_config(send_now) + return True def send_config_to_board(self): - pass + self.send_and_forget(f"AM:{self.current_config_byte}") def save_settings_to_firmware(self): - pass + self.send_and_forget("AW:") def set_watchdog(self, timeout): pass + # TODO setup task to send watchdog def get_phones_status(self): pass + # TODO WD command processing def get_power_status(self): pass + # TODO WD command processing def pulse_output_pin(self, pin, ms): - pass + assert 0 < pin < 7, f"Invalid pin {pin}" + self.send_and_forget(f"XO:{pin:02X}:{hex(ms)[2:].upper()}") diff --git a/mpf/platforms/fast/fast_audio.py b/mpf/platforms/fast/fast_audio.py index e5cb517e8..4514c907d 100644 --- a/mpf/platforms/fast/fast_audio.py +++ b/mpf/platforms/fast/fast_audio.py @@ -1,5 +1,4 @@ from math import ceil -from mpf.core.utility_functions import Util from mpf.core.logging import LogMixin @@ -24,7 +23,13 @@ def __init__(self, platform, communicator): # TODO set amp enable settings self.communicator.set_phones_level(communicator.config['headphones_level'], False) - self.communicator.set_phones_behavior(communicator.config['mute_speakers_with_headphones'], False) + + if communicator.config['mute_speakers_with_headphones']: + phones = 'mute' + else: + phones = 'ignore' + + self.communicator.set_phones_behavior(phones, False) self.communicator.send_config_to_board() # TODO send initial volumes