From 57b584630edf3e50e79c7ad8e8fe91ea2ac61b4c Mon Sep 17 00:00:00 2001 From: Ashley Gittins Date: Tue, 16 Jul 2024 02:52:46 +0000 Subject: [PATCH] feat: Add full bt-manager data to diagnostics, with redactions --- custom_components/bermuda/coordinator.py | 144 +++++++++++++---------- custom_components/bermuda/diagnostics.py | 8 ++ 2 files changed, 91 insertions(+), 61 deletions(-) diff --git a/custom_components/bermuda/coordinator.py b/custom_components/bermuda/coordinator.py index b31afca..e65634d 100644 --- a/custom_components/bermuda/coordinator.py +++ b/custom_components/bermuda/coordinator.py @@ -131,6 +131,15 @@ def __init__( CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL ) + # match/replacement pairs for redacting addresses + self.redactions: dict[str, str] = {} + # Any remaining MAC addresses will be replaced with this. We define it here + # so we can compile it once. + self._redact_generic_re = re.compile( + r"(?P[0-9A-Fa-f]{2}):([0-9A-Fa-f]{2}:){4}(?P[0-9A-Fa-f]{2})" + ) + self._redact_generic_sub = r"\g:xx:xx:xx:xx:\g" + self.stamp_last_prune: float = 0 # When we last pruned device list super().__init__( @@ -1182,71 +1191,84 @@ async def service_dump_devices( # lowercase all the addresses for matching addresses = list(map(str.lower, addresses)) - # set up redaction lookups. To make troubleshooting easier, we assign - # labels and identifiers to each unique MAC/address. - redactions = {} - if redact: - i = 0 - for address in self.scanner_list: - i += 1 - redactions[address.upper()] = ( - f"{address[:2]}::SCANNER_{i}::{address[-2:]}" - ) - i = 0 - for address in self.options.get(CONF_DEVICES, []): - if address.upper() not in redactions: - i += 1 - if address.count("_") == 2: - redactions[address] = ( - f"{address[:4]}::CFG_iBea_{i}::{address[32:]}" - ) - elif len(address) == 17: - redactions[address] = ( - f"{address[:2]}::CFG_MAC_{i}::{address[-2:]}" - ) - else: - # Don't know what it is, but not a mac. - redactions[address] = f"CFG_OTHER_{1}_{address}" - - # don't reset i, just continue on for all the other devices. - for address, device in self.devices.items(): - if address.upper() not in redactions: - # Only add if they are not already there. - i += 1 - if device.address_type == ADDR_TYPE_PRIVATE_BLE_DEVICE: - redactions[address] = f"{address[:2]}::IRK_DEV_{i}" - elif address.count("_") == 2: - redactions[address] = ( - f"{address[:4]}::OTHER_iBea_{i}::{address[32:]}" - ) - elif len(address) == 17: # a MAC - redactions[address] = ( - f"{address[:2]}::OTHER_MAC_{i}::{address[-2:]}" - ) - else: - # Don't know what it is. - redactions[address] = f"OTHER_{1}_{address}" - # Build the dict of devices for address, device in self.devices.items(): if len(addresses) == 0 or address.lower() in addresses: out[address] = device.to_dict() - def redact_data(data, redactions): - if isinstance(data, str): - for find, fix in redactions.items(): - data = re.sub(find, fix, data, flags=re.I) - return data - elif isinstance(data, dict): - return { - redact_data(k, redactions): redact_data(v, redactions) - for k, v in data.items() - } - elif isinstance(data, list): - return [redact_data(v, redactions) for v in data] - else: - return data - if redact: - out = cast(ServiceResponse, redact_data(out, redactions)) + self.redaction_list_update() + out = cast(ServiceResponse, self.redact_data(out)) return out + + def redaction_list_update(self): + """Freshen or create the list of match/replace pairs that we use to + redact MAC addresses. This gives a set of helpful address replacements + that still allows identifying device entries without disclosing MAC + addresses.""" + i = len(self.redactions) # not entirely accurate but we don't care. + + # SCANNERS + for address in self.scanner_list: + if address.upper() not in self.redactions: + i += 1 + self.redactions[address.upper()] = ( + f"{address[:2]}::SCANNER_{i}::{address[-2:]}" + ) + # CONFIGURED DEVICES + for address in self.options.get(CONF_DEVICES, []): + if address.upper() not in self.redactions: + i += 1 + if address.count("_") == 2: + self.redactions[address] = ( + f"{address[:4]}::CFG_iBea_{i}::{address[32:]}" + ) + elif len(address) == 17: + self.redactions[address] = ( + f"{address[:2]}::CFG_MAC_{i}::{address[-2:]}" + ) + else: + # Don't know what it is, but not a mac. + self.redactions[address] = f"CFG_OTHER_{1}_{address}" + # EVERYTHING ELSE + for address, device in self.devices.items(): + if address.upper() not in self.redactions: + # Only add if they are not already there. + i += 1 + if device.address_type == ADDR_TYPE_PRIVATE_BLE_DEVICE: + self.redactions[address] = f"{address[:2]}::IRK_DEV_{i}" + elif address.count("_") == 2: + self.redactions[address] = ( + f"{address[:4]}::OTHER_iBea_{i}::{address[32:]}" + ) + elif len(address) == 17: # a MAC + self.redactions[address] = ( + f"{address[:2]}::OTHER_MAC_{i}::{address[-2:]}" + ) + else: + # Don't know what it is. + self.redactions[address] = f"OTHER_{1}_{address}" + + def redact_data(self, data): + """Wash any collection of data of any MAC addresses. + + Uses the redaction list of substitutions if already created, then + washes any remaining mac-like addresses. This routine is recursive, + so if you're changing it bear that in mind!""" + if len(self.redactions) == 0: + # Initialise the list of addresses if not already done. + self.redaction_list_update() + if isinstance(data, str): + # the end of the recursive wormhole, do the actual work: + for find, fix in self.redactions.items(): + data = re.sub(find, fix, data, flags=re.I) + # redactions done, now replace any remaining MAC addresses + # We are only looking for xx:xx:xx... format. + data = self._redact_generic_re.sub(self._redact_generic_sub, data) + return data + elif isinstance(data, dict): + return {self.redact_data(k): self.redact_data(v) for k, v in data.items()} + elif isinstance(data, list): + return [self.redact_data(v) for v in data] + else: + return data diff --git a/custom_components/bermuda/diagnostics.py b/custom_components/bermuda/diagnostics.py index 19df269..f73f8ce 100644 --- a/custom_components/bermuda/diagnostics.py +++ b/custom_components/bermuda/diagnostics.py @@ -4,6 +4,7 @@ from typing import Any +from homeassistant.components.bluetooth.api import _get_manager from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.core import ServiceCall @@ -18,6 +19,12 @@ async def async_get_config_entry_diagnostics( """Return diagnostics for a config entry.""" coordinator: BermudaDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id] + # We can call this with our own config_entry because the diags step doesn't + # actually use it. + + bt_manager = _get_manager(hass) + bt_diags = await bt_manager.async_diagnostics() + # Param structure for service call call = ServiceCall(DOMAIN, "dump_devices", {"redact": True}) @@ -25,5 +32,6 @@ async def async_get_config_entry_diagnostics( "active_devices": f"{coordinator.count_active_devices()}/{len(coordinator.devices)}", "active_scanners": f"{coordinator.count_active_scanners()}/{len(coordinator.scanner_list)}", "devices": await coordinator.service_dump_devices(call), + "bt_manager": coordinator.redact_data(bt_diags), } return data