Skip to content

Commit

Permalink
feat: Add full bt-manager data to diagnostics, with redactions
Browse files Browse the repository at this point in the history
  • Loading branch information
agittins committed Jul 16, 2024
1 parent 010fa5c commit 57b5846
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 61 deletions.
144 changes: 83 additions & 61 deletions custom_components/bermuda/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ def __init__(
CONF_UPDATE_INTERVAL, DEFAULT_UPDATE_INTERVAL
)

# match/replacement pairs for redacting addresses
self.redactions: dict[str, str] = {}
# Any remaining MAC addresses will be replaced with this. We define it here
# so we can compile it once.
self._redact_generic_re = re.compile(
r"(?P<start>[0-9A-Fa-f]{2}):([0-9A-Fa-f]{2}:){4}(?P<end>[0-9A-Fa-f]{2})"
)
self._redact_generic_sub = r"\g<start>:xx:xx:xx:xx:\g<end>"

self.stamp_last_prune: float = 0 # When we last pruned device list

super().__init__(
Expand Down Expand Up @@ -1182,71 +1191,84 @@ async def service_dump_devices(
# lowercase all the addresses for matching
addresses = list(map(str.lower, addresses))

# set up redaction lookups. To make troubleshooting easier, we assign
# labels and identifiers to each unique MAC/address.
redactions = {}
if redact:
i = 0
for address in self.scanner_list:
i += 1
redactions[address.upper()] = (
f"{address[:2]}::SCANNER_{i}::{address[-2:]}"
)
i = 0
for address in self.options.get(CONF_DEVICES, []):
if address.upper() not in redactions:
i += 1
if address.count("_") == 2:
redactions[address] = (
f"{address[:4]}::CFG_iBea_{i}::{address[32:]}"
)
elif len(address) == 17:
redactions[address] = (
f"{address[:2]}::CFG_MAC_{i}::{address[-2:]}"
)
else:
# Don't know what it is, but not a mac.
redactions[address] = f"CFG_OTHER_{1}_{address}"

# don't reset i, just continue on for all the other devices.
for address, device in self.devices.items():
if address.upper() not in redactions:
# Only add if they are not already there.
i += 1
if device.address_type == ADDR_TYPE_PRIVATE_BLE_DEVICE:
redactions[address] = f"{address[:2]}::IRK_DEV_{i}"
elif address.count("_") == 2:
redactions[address] = (
f"{address[:4]}::OTHER_iBea_{i}::{address[32:]}"
)
elif len(address) == 17: # a MAC
redactions[address] = (
f"{address[:2]}::OTHER_MAC_{i}::{address[-2:]}"
)
else:
# Don't know what it is.
redactions[address] = f"OTHER_{1}_{address}"

# Build the dict of devices
for address, device in self.devices.items():
if len(addresses) == 0 or address.lower() in addresses:
out[address] = device.to_dict()

def redact_data(data, redactions):
if isinstance(data, str):
for find, fix in redactions.items():
data = re.sub(find, fix, data, flags=re.I)
return data
elif isinstance(data, dict):
return {
redact_data(k, redactions): redact_data(v, redactions)
for k, v in data.items()
}
elif isinstance(data, list):
return [redact_data(v, redactions) for v in data]
else:
return data

if redact:
out = cast(ServiceResponse, redact_data(out, redactions))
self.redaction_list_update()
out = cast(ServiceResponse, self.redact_data(out))
return out

def redaction_list_update(self):
"""Freshen or create the list of match/replace pairs that we use to
redact MAC addresses. This gives a set of helpful address replacements
that still allows identifying device entries without disclosing MAC
addresses."""
i = len(self.redactions) # not entirely accurate but we don't care.

# SCANNERS
for address in self.scanner_list:
if address.upper() not in self.redactions:
i += 1
self.redactions[address.upper()] = (
f"{address[:2]}::SCANNER_{i}::{address[-2:]}"
)
# CONFIGURED DEVICES
for address in self.options.get(CONF_DEVICES, []):
if address.upper() not in self.redactions:
i += 1
if address.count("_") == 2:
self.redactions[address] = (
f"{address[:4]}::CFG_iBea_{i}::{address[32:]}"
)
elif len(address) == 17:
self.redactions[address] = (
f"{address[:2]}::CFG_MAC_{i}::{address[-2:]}"
)
else:
# Don't know what it is, but not a mac.
self.redactions[address] = f"CFG_OTHER_{1}_{address}"
# EVERYTHING ELSE
for address, device in self.devices.items():
if address.upper() not in self.redactions:
# Only add if they are not already there.
i += 1
if device.address_type == ADDR_TYPE_PRIVATE_BLE_DEVICE:
self.redactions[address] = f"{address[:2]}::IRK_DEV_{i}"
elif address.count("_") == 2:
self.redactions[address] = (
f"{address[:4]}::OTHER_iBea_{i}::{address[32:]}"
)
elif len(address) == 17: # a MAC
self.redactions[address] = (
f"{address[:2]}::OTHER_MAC_{i}::{address[-2:]}"
)
else:
# Don't know what it is.
self.redactions[address] = f"OTHER_{1}_{address}"

def redact_data(self, data):
"""Wash any collection of data of any MAC addresses.
Uses the redaction list of substitutions if already created, then
washes any remaining mac-like addresses. This routine is recursive,
so if you're changing it bear that in mind!"""
if len(self.redactions) == 0:
# Initialise the list of addresses if not already done.
self.redaction_list_update()
if isinstance(data, str):
# the end of the recursive wormhole, do the actual work:
for find, fix in self.redactions.items():
data = re.sub(find, fix, data, flags=re.I)
# redactions done, now replace any remaining MAC addresses
# We are only looking for xx:xx:xx... format.
data = self._redact_generic_re.sub(self._redact_generic_sub, data)
return data
elif isinstance(data, dict):
return {self.redact_data(k): self.redact_data(v) for k, v in data.items()}
elif isinstance(data, list):
return [self.redact_data(v) for v in data]
else:
return data
8 changes: 8 additions & 0 deletions custom_components/bermuda/diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import Any

from homeassistant.components.bluetooth.api import _get_manager
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.core import ServiceCall
Expand All @@ -18,12 +19,19 @@ async def async_get_config_entry_diagnostics(
"""Return diagnostics for a config entry."""
coordinator: BermudaDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id]

# We can call this with our own config_entry because the diags step doesn't
# actually use it.

bt_manager = _get_manager(hass)
bt_diags = await bt_manager.async_diagnostics()

# Param structure for service call
call = ServiceCall(DOMAIN, "dump_devices", {"redact": True})

data: dict[str, Any] = {
"active_devices": f"{coordinator.count_active_devices()}/{len(coordinator.devices)}",
"active_scanners": f"{coordinator.count_active_scanners()}/{len(coordinator.scanner_list)}",
"devices": await coordinator.service_dump_devices(call),
"bt_manager": coordinator.redact_data(bt_diags),
}
return data

0 comments on commit 57b5846

Please sign in to comment.