Skip to content

Commit

Permalink
Formatting for linter
Browse files Browse the repository at this point in the history
Signed-off-by: paulober <[email protected]>
  • Loading branch information
paulober committed Oct 16, 2024
1 parent bf9ab5e commit ee038ab
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 131 deletions.
15 changes: 6 additions & 9 deletions cloudinit/config/cc_rpi_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,17 @@
"id": "cc_rpi_connect",
"distros": ["raspberry-pi-os"],
"frequency": PER_INSTANCE,
"activate_by_schema_keys": [
ENABLE_RPI_CONNECT_KEY
]
"activate_by_schema_keys": [ENABLE_RPI_CONNECT_KEY],
}


def configure_rpi_connect(enable: bool) -> None:
LOG.debug(f"Configuring rpi-connect: {enable}")

num = 0 if enable else 1

try:
subp.subp([
"/usr/bin/raspi-config",
"do_rpi_connect",
str(num)
])
subp.subp(["/usr/bin/raspi-config", "do_rpi_connect", str(num)])
except subp.ProcessExecutionError as e:
LOG.error(f"Failed to configure rpi-connect: {e}")

Expand All @@ -47,4 +42,6 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
if isinstance(enable, bool):
configure_rpi_connect(enable)
else:
LOG.warning(f"Invalid value for {ENABLE_RPI_CONNECT_KEY}: {enable}")
LOG.warning(
f"Invalid value for {ENABLE_RPI_CONNECT_KEY}: " + str(enable)
)
83 changes: 47 additions & 36 deletions cloudinit/config/cc_rpi_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"i2c": "do_i2c",
"serial": "do_serial",
"onewire": "do_onewire",
"remote_gpio": "do_rgpio"
"remote_gpio": "do_rgpio",
}
RASPI_CONFIG_SERIAL_CONS_FN = "do_serial_cons"
RASPI_CONFIG_SERIAL_HW_FN = "do_serial_hw"
Expand All @@ -28,28 +28,25 @@
"id": "cc_rpi_interfaces",
"distros": ["raspberry-pi-os"],
"frequency": PER_INSTANCE,
"activate_by_schema_keys": [
RPI_INTERFACES_KEY
]
"activate_by_schema_keys": [RPI_INTERFACES_KEY],
}


# TODO: test
def require_reboot(cfg: Config) -> None:
cfg["power_state"] = cfg.get("power_state", {})
cfg["power_state"]["mode"] = cfg["power_state"].get("mode", "reboot")
cfg["power_state"]["condition"] = True


def is_pifive() -> bool:
try:
subp.subp([
"/usr/bin/raspi-config",
"nonint",
"is_pifive"
])
subp.subp(["/usr/bin/raspi-config", "nonint", "is_pifive"])
return True
except subp.ProcessExecutionError:
return False


def configure_serial_interface(cfg: dict | bool, instCfg: Config) -> None:
enable_console = False
enable_hw = False
Expand All @@ -58,7 +55,7 @@ def configure_serial_interface(cfg: dict | bool, instCfg: Config) -> None:
enable_console = cfg.get("console", False)
enable_hw = cfg.get("hardware", False)
elif isinstance(cfg, bool):
# default to enabling console as if < pi5
# default to enabling console as if < pi5
# this will also enable the hardware
enable_console = cfg

Expand All @@ -69,48 +66,58 @@ def configure_serial_interface(cfg: dict | bool, instCfg: Config) -> None:
enable_hw = True

try:
subp.subp([
"/usr/bin/raspi-config",
"nonint",
RASPI_CONFIG_SERIAL_CONS_FN,
str(0 if enable_console else 1)
])

try:
subp.subp([
subp.subp(
[
"/usr/bin/raspi-config",
"nonint",
RASPI_CONFIG_SERIAL_HW_FN,
str(0 if enable_hw else 1)
])
RASPI_CONFIG_SERIAL_CONS_FN,
str(0 if enable_console else 1),
]
)

try:
subp.subp(
[
"/usr/bin/raspi-config",
"nonint",
RASPI_CONFIG_SERIAL_HW_FN,
str(0 if enable_hw else 1),
]
)
except subp.ProcessExecutionError as e:
LOG.error(f"Failed to configure serial hardware: {e}")

require_reboot(instCfg)
except subp.ProcessExecutionError as e:
LOG.error(f"Failed to configure serial console: {e}")


def configure_interface(iface: str, enable: bool) -> None:
assert iface in SUPPORTED_INTERFACES.keys() \
and iface != "serial", \
f"Unsupported interface: {iface}"
assert (
iface in SUPPORTED_INTERFACES.keys() and iface != "serial"
), f"Unsupported interface: {iface}"

try:
subp.subp([
"/usr/bin/raspi-config",
"nonint",
SUPPORTED_INTERFACES[iface],
str(0 if enable else 1)
])
subp.subp(
[
"/usr/bin/raspi-config",
"nonint",
SUPPORTED_INTERFACES[iface],
str(0 if enable else 1),
]
)
except subp.ProcessExecutionError as e:
LOG.error(f"Failed to configure {iface}: {e}")


def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
if RPI_INTERFACES_KEY not in cfg:
return
elif not isinstance(cfg[RPI_INTERFACES_KEY], dict):
LOG.warning(f"Invalid value for {RPI_INTERFACES_KEY}: {cfg[RPI_INTERFACES_KEY]}")
LOG.warning(
f"Invalid value for {RPI_INTERFACES_KEY}: "
+ cfg[RPI_INTERFACES_KEY]
)
return
elif not cfg[RPI_INTERFACES_KEY]:
LOG.debug(f"Empty value for {RPI_INTERFACES_KEY}. Skipping...")
Expand All @@ -126,11 +133,15 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:

if key == "serial":
if not isinstance(enable, dict) and not isinstance(enable, bool):
LOG.warning(f"Invalid value for {RPI_INTERFACES_KEY}.{key}: {enable}")
LOG.warning(
f"Invalid value for {RPI_INTERFACES_KEY}.{key}: {enable}"
)
continue
configure_serial_interface(enable, cfg)

if isinstance(enable, bool):
configure_interface(key, enable)
else:
LOG.warning(f"Invalid value for {RPI_INTERFACES_KEY}.{key}: {enable}")
LOG.warning(
f"Invalid value for {RPI_INTERFACES_KEY}.{key}: {enable}"
)
102 changes: 72 additions & 30 deletions cloudinit/config/cc_rpi_userdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
# This file is part of cloud-init. See LICENSE file for license information.

import logging
import os, time
import os
import time
import subprocess
from cloudinit import subp
from cloudinit.cloud import Cloud
Expand All @@ -23,46 +24,57 @@
meta: MetaSchema = {
"id": "cc_rpi_userdata",
"distros": ["raspberry-pi-os"],
"frequency": PER_ALWAYS, # Run every boot to trigger setup wizard even when no settings
"activate_by_schema_keys": [
DISABLE_PIWIZ_KEY,
RPI_USERCONF_KEY
]
# Run every boot to trigger setup wizard even when no settings
"frequency": PER_ALWAYS,
"activate_by_schema_keys": [DISABLE_PIWIZ_KEY, RPI_USERCONF_KEY],
}


def bool_to_str(value: bool | None) -> str:
return "Yes" if value else "No"


def get_fwloc_or_default() -> str:
fwloc = None
try:
# Run the command and capture the output
fwloc = subp.subp(
['/usr/lib/raspberrypi-sys-mods/get_fw_loc'], decode='strict') \
.stdout.strip()
["/usr/lib/raspberrypi-sys-mods/get_fw_loc"], decode="strict"
).stdout.strip()

# If the output is empty, set the default value
if not fwloc:
fwloc = '/boot'
fwloc = "/boot"
except subp.ProcessExecutionError:
# If the command fails, set the default value
fwloc = '/boot'
fwloc = "/boot"
return fwloc

def run_userconf_service(base: str | None, passwd_override: str | None = None) -> bool:

def run_userconf_service(
base: str | None, passwd_override: str | None = None
) -> bool:
try:
# reset the TTY device
os.system(f"echo 'reset\\r\\n' > {USERCONF_SERVICE_TTY}")

time.sleep(1)
# Execute the command on different tty
result = subp.subp([
"openvt",
"-s", "-f", "-w", "-c", USERCONF_SERVICE_TTY[-1],
"--", "/usr/lib/userconf-pi/userconf-service"],
result = subp.subp(
[
"openvt",
"-s",
"-f",
"-w",
"-c",
USERCONF_SERVICE_TTY[-1],
"--",
"/usr/lib/userconf-pi/userconf-service",
],
timeout=(None if not passwd_override else 10),
decode='strict')

decode="strict",
)

if base:
try:
os.remove(f"{base}/userconf.txt")
Expand Down Expand Up @@ -94,7 +106,10 @@ def run_userconf_service(base: str | None, passwd_override: str | None = None) -
pass
return False

def run_service(passwd_override: str | None = None, user_override: str | None = None) -> bool:

def run_service(
passwd_override: str | None = None, user_override: str | None = None
) -> bool:
# Ensure the TTY exists before trying to open it
if not os.path.exists(USERCONF_SERVICE_TTY):
if not passwd_override:
Expand All @@ -103,10 +118,13 @@ def run_service(passwd_override: str | None = None, user_override: str | None =
else:
LOG.debug(f"TTY device {USERCONF_SERVICE_TTY} does not exist.")

# should never happen and not solvable by the user
assert (passwd_override is None and user_override is None) or \
(passwd_override is not None and user_override is not None), \
"Internal error: User override is required when password override is provided."
# should never happen and not solvable by the user
assert (passwd_override is None and user_override is None) or (
passwd_override is not None and user_override is not None
), (
"Internal error: User override is required when password "
+ "override is provided."
)

base: str | None = None
if passwd_override:
Expand All @@ -129,11 +147,23 @@ def run_service(passwd_override: str | None = None, user_override: str | None =
LOG.debug("Userconf-pi service loop finished.")
return True

def configure_pizwiz(cfg: Config, disable: bool, passwd_override: str | None, user_override: str | None = None) -> None:
LOG.debug("Configuring piwiz with disable_piwiz=%s, passwd_override=%s, user_override=%s", disable, bool_to_str(passwd_override), bool_to_str(user_override))

def configure_pizwiz(
cfg: Config,
disable: bool,
passwd_override: str | None,
user_override: str | None = None,
) -> None:
LOG.debug(
"Configuring piwiz with disable_piwiz=%s, passwd_override=%s, "
+ "user_override=%s",
bool_to_str(disable),
bool_to_str(passwd_override != None),
bool_to_str(user_override != None),
)

if disable:
# execute cancel rename script to ensure
# execute cancel rename script to ensure
# piwiz isn't started (on desktop)
os.system("/usr/bin/cancel-rename pi")
else:
Expand All @@ -149,32 +179,44 @@ def configure_pizwiz(cfg: Config, disable: bool, passwd_override: str | None, us
elif "pi" not in cfg["users"]:
cfg["users"].append("pi")


def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
disable_piwiz: bool = False
password_override: str | None = None
user_override: str | None = None

if os.path.exists(MODULE_DEACTIVATION_FILE) or not os.path.exists("/usr/lib/userconf-pi"):
LOG.debug("Userconf-Pi: deactivation file detected or userconf-pi not installed. Skipping...")
if os.path.exists(MODULE_DEACTIVATION_FILE) or not os.path.exists(
"/usr/lib/userconf-pi"
):
LOG.debug(
"Userconf-Pi: deactivation file detected or userconf-pi "
+ "not installed. Skipping..."
)
return

if RPI_USERCONF_KEY in cfg:
# expect it to be a dictionary
userconf = cfg[RPI_USERCONF_KEY]

# look over miss configuration to
# look over miss configuration to
if isinstance(userconf, dict) and "password" in userconf:
password_override = userconf["password"]
# user key is optional with default to pi
user_override = userconf.get("user", "pi")
LOG.debug(f"Userconf override: user={user_override}, password=<REDACTED>")
LOG.debug(
f"Userconf override: user={user_override}, "
+ "password=<REDACTED>"
)
else:
LOG.error(f"Invalid userconf-pi configuration: {userconf}")

if not password_override and DISABLE_PIWIZ_KEY in cfg:
if isinstance(cfg[DISABLE_PIWIZ_KEY], bool):
disable_piwiz = cfg[DISABLE_PIWIZ_KEY]
else:
LOG.error(f"Invalid {DISABLE_PIWIZ_KEY} configuration: {cfg[DISABLE_PIWIZ_KEY]}")
LOG.error(
f"Invalid {DISABLE_PIWIZ_KEY} configuration: "
+ cfg[DISABLE_PIWIZ_KEY]
)

configure_pizwiz(cfg, disable_piwiz, password_override, user_override)
Loading

0 comments on commit ee038ab

Please sign in to comment.