From 934f51293ee763fea002552ccd22317d0da57c22 Mon Sep 17 00:00:00 2001 From: Dermot Bradley Date: Sun, 2 Jun 2024 02:12:49 +0100 Subject: [PATCH] Currently cc_user_groups assumes that "useradd" never locks the password field of newly created users. This is an incorrect assumption. From the useradd manpage: '-p, --password PASSWORD The encrypted password, as returned by crypt(3). The default is to disable the password.' That is, if cloud-init runs 'useradd' but does not pass it the "-p" option (with an encrypted password) then the new user's password field will be locked by "useradd". cloud-init only passes the "-p" option when calling "useradd" when user-data specifies the "passwd" option for a new user. For user-data that specifies either the "hashed_passwd" or "plain_text_passwd" options instead then cloud-init calls "useradd" without the "-p" option and so the password field of such a user will be locked by "useradd". For user-data that specifies "hash_passwd" for a new user then "useradd" is called with no "-p" option, so causing "useradd" to lock the password field, however then cloud-init calls "chpasswd -e" to set the encrypted password which also results in the password field being unlocked. For user-data that specifies "plain_text_passwd" for a new user then "useradd" is called with no "-p" option, so causing "useradd" to lock the password. cloud-init then calls "chpasswd" to set the password which also results in the password field being unlocked. For user-data that specifies no password at all for a new user then "useradd" is called with no "-p" option, so causing "useradd" to lock the password. The password field is left locked. In all the above scenarios "passwd -l" may be called later by cloud-init to enforce "lock_passwd: true"). Conversely where "lock_passwd: false" applies the above "usermod" situation (for "hash_passwd", "plain_text_passwd" or no password) means that newly created users may have password fields locked when they should be unlocked. For Alpine, "adduser" does not support any form of password being passed and it always locks the password field. Therefore the password needs to be unlocked if "lock_passwd: false". This PR changes the add_user function to explicitly call either lock_passwd or unlock_passwd to achieve the desired final result. As a "safety" feature when "lock_passwd: false" is defined for a (either new or existing) user without any password value then it will *not* unlock the passsword. This "safety" feature can be overriden by specifying a blank password in the user-data (i.e. passwd: ""). For DragonflyBSD/FreeBSD add a stub unlock_passwd function that does nothing except generate a debug log message as their lock method is not reversible. For OpenBSD modify the existing stub unlock_passwd function to generate a debug log message as their lock method is not reversible. --- cloudinit/distros/__init__.py | 218 +++++++++++++++++-- cloudinit/distros/alpine.py | 39 +++- cloudinit/distros/bsd.py | 1 + cloudinit/distros/freebsd.py | 36 ++- cloudinit/distros/netbsd.py | 34 ++- cloudinit/distros/openbsd.py | 25 ++- tests/unittests/distros/test_create_users.py | 104 ++++++++- tests/unittests/distros/test_dragonflybsd.py | 58 +++++ tests/unittests/distros/test_freebsd.py | 30 +++ tests/unittests/distros/test_openbsd.py | 51 +++++ 10 files changed, 574 insertions(+), 22 deletions(-) create mode 100644 tests/unittests/distros/test_openbsd.py diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index 4312d8038055..72309a81c04b 100644 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -133,6 +133,7 @@ class Distro(persistence.CloudInitPickleMixin, metaclass=abc.ABCMeta): doas_fn = "/etc/doas.conf" ci_sudoers_fn = "/etc/sudoers.d/90-cloud-init-users" hostname_conf_fn = "/etc/hostname" + shadow_fn = "/etc/shadow" tz_zone_dir = "/usr/share/zoneinfo" default_owner = "root:root" init_cmd = ["service"] # systemctl, service etc @@ -649,19 +650,21 @@ def preferred_ntp_clients(self): def get_default_user(self): return self.get_option("default_user") - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: """ Add a user to the system using standard GNU tools This should be overridden on distros where useradd is not desirable or not available. + + Returns False if user already exists, otherwise True. """ # XXX need to make add_user idempotent somehow as we # still want to add groups or modify SSH keys on pre-existing # users in the image. if util.is_user(name): LOG.info("User %s already exists, skipping.", name) - return + return False if "create_groups" in kwargs: create_groups = kwargs.pop("create_groups") @@ -765,6 +768,9 @@ def add_user(self, name, **kwargs): util.logexc(LOG, "Failed to create user %s", name) raise e + # Indicate that a new user was created + return True + def add_snap_user(self, name, **kwargs): """ Add a snappy user to the system using snappy tools @@ -792,6 +798,62 @@ def add_snap_user(self, name, **kwargs): return username + def _check_if_password_field_matches( + self, username, pattern1, pattern2, pattern3=None, check_file=None + ) -> bool: + """ + Check whether ``username`` user has a hashed password matching + either pattern. + + FreeBSD, NetBSD, and OpenBSD use 3 patterns, others only use + 2 patterns. + + Returns either 'True' to indicate a match, otherwise 'False'. + """ + + if not check_file: + check_file = self.shadow_fn + + cmd = [ + "grep", + "-q", + "-e", + "^%s%s" % (username, pattern1), + "-e", + "^%s%s" % (username, pattern2), + ] + if pattern3 is not None: + cmd.extend(["-e", "^%s%s" % (username, pattern3)]) + cmd.append(check_file) + try: + subp.subp(cmd) + except subp.ProcessExecutionError as e: + if e.exit_code == 1: + # Exit code 1 means 'grep' didn't find empty password + return True + else: + util.logexc( + LOG, + "Failed to check the status of password for user %s", + username, + ) + raise e + return False + + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, "::", ":!:", check_file=shadow_file + ) + return status + def create_user(self, name, **kwargs): """ Creates or partially updates the ``name`` user in the system. @@ -818,20 +880,103 @@ def create_user(self, name, **kwargs): return self.add_snap_user(name, **kwargs) # Add the user - self.add_user(name, **kwargs) - - # Set password if plain-text password provided and non-empty - if "plain_text_passwd" in kwargs and kwargs["plain_text_passwd"]: - self.set_passwd(name, kwargs["plain_text_passwd"]) - - # Set password if hashed password is provided and non-empty - if "hashed_passwd" in kwargs and kwargs["hashed_passwd"]: - self.set_passwd(name, kwargs["hashed_passwd"], hashed=True) + pre_existing_user = not self.add_user(name, **kwargs) + + has_existing_password = False + ud_blank_password_specified = False + ud_password_specified = False + password_key = None + + if "plain_text_passwd" in kwargs: + ud_password_specified = True + password_key = "plain_text_passwd" + if kwargs["plain_text_passwd"]: + # Set password if plain-text password provided and non-empty + self.set_passwd(name, kwargs["plain_text_passwd"]) + else: + ud_blank_password_specified = True + + if "hashed_passwd" in kwargs: + ud_password_specified = True + password_key = "hashed_passwd" + if kwargs["hashed_passwd"]: + # Set password if hashed password is provided and non-empty + self.set_passwd(name, kwargs["hashed_passwd"], hashed=True) + else: + ud_blank_password_specified = True + + if pre_existing_user: + if not ud_password_specified: + if "passwd" in kwargs: + password_key = "passwd" + # Only "plain_text_passwd" and "hashed_passwd" + # are valid for an existing user. + LOG.warning( + "'passwd' in user-data is ignored for existing " + "user %s", + name, + ) - # Default locking down the account. 'lock_passwd' defaults to True. - # lock account unless lock_password is False. + # As no password specified for the existing user in user-data + # then check if the existing user's hashed password value is + # blank (whether locked or not). + if util.system_is_snappy(): + has_existing_password = self._check_if_existing_password( + name, "/var/lib/extrausers/shadow" + ) + if not has_existing_password: + # Check /etc/shadow also + has_existing_password = ( + self._check_if_existing_password(name) + ) + else: + has_existing_password = self._check_if_existing_password( + name + ) + else: + if "passwd" in kwargs: + ud_password_specified = True + password_key = "passwd" + if not kwargs["passwd"]: + ud_blank_password_specified = True + + # Default locking down the account. 'lock_passwd' defaults to True. + # Lock account unless lock_password is False in which case unlock + # account as long as a password (blank or otherwise) was specified. if kwargs.get("lock_passwd", True): self.lock_passwd(name) + elif has_existing_password or ud_password_specified: + # 'lock_passwd: False' and either existing account already with + # non-blank password or else existing/new account with password + # explicitly set in user-data. + if ud_blank_password_specified: + LOG.debug( + "Allowing unlocking empty password for %s based on empty" + " '%s' in user-data", + name, + password_key, + ) + + # Unlock the existing/new account + self.unlock_passwd(name) + elif pre_existing_user: + # Pre-existing user with no existing password and none + # explicitly set in user-data. + LOG.warning( + "Not unlocking blank password for existing user %s." + " 'lock_passwd: false' present in user-data but no existing" + " password set and no 'plain_text_passwd'/'hashed_passwd'" + " provided in user-data", + name, + ) + else: + # No password (whether blank or otherwise) explicitly set + LOG.warning( + "Not unlocking password for user %s. 'lock_passwd: false'" + " present in user-data but no 'passwd'/'plain_text_passwd'/" + "'hashed_passwd' provided in user-data", + name, + ) # Configure doas access if "doas" in kwargs: @@ -908,6 +1053,50 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to disable password for user %s", name) raise e + def unlock_passwd(self, name: str): + """ + Unlock the password of a user, i.e., enable password logins + """ + # passwd must use short '-u' due to SLES11 lacking long form '--unlock' + unlock_tools = (["passwd", "-u", name], ["usermod", "--unlock", name]) + try: + cmd = next(tool for tool in unlock_tools if subp.which(tool[0])) + except StopIteration as e: + raise RuntimeError( + "Unable to unlock user account '%s'. No tools available. " + " Tried: %s." % (name, [c[0] for c in unlock_tools]) + ) from e + try: + _, err = subp.subp(cmd, rcs=[0, 3]) + except Exception as e: + util.logexc(LOG, "Failed to enable password for user %s", name) + raise e + if err: + # if "passwd" or "usermod" are unable to unlock an account with + # an empty password then they display a message on stdout. In + # that case then instead set a blank password. + passwd_set_tools = ( + ["passwd", "-d", name], + ["usermod", "--password", "''", name], + ) + try: + cmd = next( + tool for tool in passwd_set_tools if subp.which(tool[0]) + ) + except StopIteration as e: + raise RuntimeError( + "Unable to set blank password for user account '%s'. " + "No tools available. " + " Tried: %s." % (name, [c[0] for c in unlock_tools]) + ) from e + try: + subp.subp(cmd) + except Exception as e: + util.logexc( + LOG, "Failed to set blank password for user %s", name + ) + raise e + def expire_passwd(self, user): try: subp.subp(["passwd", "--expire", user]) @@ -942,6 +1131,9 @@ def chpasswd(self, plist_in: list, hashed: bool): ) + "\n" ) + # Need to use the short option name '-e' instead of '--encrypted' + # (which would be more descriptive) since Busybox and SLES 11 + # chpasswd don't know about long names. cmd = ["chpasswd"] + (["-e"] if hashed else []) subp.subp(cmd, data=payload) diff --git a/cloudinit/distros/alpine.py b/cloudinit/distros/alpine.py index 867963f2c78f..cd1323b404da 100644 --- a/cloudinit/distros/alpine.py +++ b/cloudinit/distros/alpine.py @@ -205,16 +205,18 @@ def preferred_ntp_clients(self): return self._preferred_ntp_clients - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: """ Add a user to the system using standard tools On Alpine this may use either 'useradd' or 'adduser' depending on whether the 'shadow' package is installed. + + Returns False if user already exists, otherwise True. """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) - return + return False if "selinux_user" in kwargs: LOG.warning("Ignoring selinux_user parameter for Alpine Linux") @@ -418,6 +420,9 @@ def add_user(self, name, **kwargs): LOG, "Failed to update %s for user %s", shadow_file, name ) + # Indicate that a new user was created + return True + def lock_passwd(self, name): """ Lock the password of a user, i.e., disable password logins @@ -446,6 +451,36 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to disable password for user %s", name) raise e + def unlock_passwd(self, name: str): + """ + Unlock the password of a user, i.e., enable password logins + """ + + # Check whether Shadow's or Busybox's version of 'passwd'. + # If Shadow's 'passwd' is available then use the generic + # lock_passwd function from __init__.py instead. + if not os.path.islink( + "/usr/bin/passwd" + ) or "bbsuid" not in os.readlink("/usr/bin/passwd"): + return super().unlock_passwd(name) + + cmd = ["passwd", "-u", name] + # Busybox's 'passwd', unlike Shadow's 'passwd', errors + # if password is already unlocked: + # + # "passwd: password for user2 is already unlocked" + # + # with exit code 1 + # + # and also does *not* error if no password is set. + try: + _, err = subp.subp(cmd, rcs=[0, 1]) + if re.search(r"is already unlocked", err): + return True + except subp.ProcessExecutionError as e: + util.logexc(LOG, "Failed to unlock password for user %s", name) + raise e + def expire_passwd(self, user): # Check whether Shadow's or Busybox's version of 'passwd'. # If Shadow's 'passwd' is available then use the generic diff --git a/cloudinit/distros/bsd.py b/cloudinit/distros/bsd.py index 5bef9203c3d1..4e9fa1f7f2bd 100644 --- a/cloudinit/distros/bsd.py +++ b/cloudinit/distros/bsd.py @@ -15,6 +15,7 @@ class BSD(distros.Distro): networking_cls = BSDNetworking hostname_conf_fn = "/etc/rc.conf" rc_conf_fn = "/etc/rc.conf" + shadow_fn = "/etc/master.passwd" default_owner = "root:wheel" # This differs from the parent Distro class, which has -P for diff --git a/cloudinit/distros/freebsd.py b/cloudinit/distros/freebsd.py index 2d8fa02fea6d..ccd961159278 100644 --- a/cloudinit/distros/freebsd.py +++ b/cloudinit/distros/freebsd.py @@ -86,7 +86,12 @@ def manage_service( def _get_add_member_to_group_cmd(self, member_name, group_name): return ["pw", "usermod", "-n", member_name, "-G", group_name] - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: + """ + Add a user to the system using standard tools + + Returns False if user already exists, otherwise True. + """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) return False @@ -140,6 +145,28 @@ def add_user(self, name, **kwargs): if passwd_val is not None: self.set_passwd(name, passwd_val, hashed=True) + # Indicate that a new user was created + return True + + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + For FreeBSD (from https://man.freebsd.org/cgi/man.cgi?passwd(5)) a + password field of "" indicates no password, and a password + field value of either "*" or "*LOCKED*" indicate differing forms of + "locked" but with no password defined. + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, "::", ":*:", ":*LOCKED*:", check_file=shadow_file + ) + return status + def expire_passwd(self, user): try: subp.subp(["pw", "usermod", user, "-p", "01-Jan-1970"]) @@ -170,6 +197,13 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to lock password login for user %s", name) raise + def unlock_passwd(self, name): + LOG.debug( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user %s", + name, + ) + def apply_locale(self, locale, out_fn=None): # Adjust the locales value to the new value newconf = StringIO() diff --git a/cloudinit/distros/netbsd.py b/cloudinit/distros/netbsd.py index 972528c6df00..b9e038e895d6 100644 --- a/cloudinit/distros/netbsd.py +++ b/cloudinit/distros/netbsd.py @@ -63,7 +63,12 @@ def __init__(self, name, cfg, paths): def _get_add_member_to_group_cmd(self, member_name, group_name): return ["usermod", "-G", group_name, member_name] - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: + """ + Add a user to the system using standard tools + + Returns False if user already exists, otherwise True. + """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) return False @@ -112,6 +117,33 @@ def add_user(self, name, **kwargs): if passwd_val is not None: self.set_passwd(name, passwd_val, hashed=True) + # Indicate that a new user was created + return True + + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + For NetBSD (from https://man.netbsd.org/passwd.5) a password field + value of either "" or "*************" (13 "*") indicates no password, + a password field prefixed with "*LOCKED*" indicates a locked + password, and a password field of "*LOCKED*" followed by 13 "*" + indicates a locked and blank password. + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, + "::", + ":*************:", + ":*LOCKED**************:", + check_file=shadow_file, + ) + return status + def set_passwd(self, user, passwd, hashed=False): if hashed: hashed_pw = passwd diff --git a/cloudinit/distros/openbsd.py b/cloudinit/distros/openbsd.py index a701580deb15..418e99bc495a 100644 --- a/cloudinit/distros/openbsd.py +++ b/cloudinit/distros/openbsd.py @@ -45,6 +45,25 @@ def manage_service(cls, action: str, service: str, *extra_args, rcs=None): cmd = list(init_cmd) + list(cmds[action]) return subp.subp(cmd, capture=True, rcs=rcs) + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + For OpenBSD (from https://man.openbsd.org/passwd.5) a password field + of "" indicates no password, and password field values of either + "*" or "*************" (13 "*") indicate differing forms of "locked" + but with no password defined. + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, "::", ":*:", ":*************:", check_file=shadow_file + ) + return status + def lock_passwd(self, name): try: subp.subp(["usermod", "-p", "*", name]) @@ -53,7 +72,11 @@ def lock_passwd(self, name): raise def unlock_passwd(self, name): - pass + LOG.debug( + "OpenBSD password lock is not reversible, " + "ignoring unlock for user %s", + name, + ) def _get_pkg_cmd_environ(self): """Return env vars used in OpenBSD package_command operations""" diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py index 039723aaad2b..df52c2c3d2e8 100644 --- a/tests/unittests/distros/test_create_users.py +++ b/tests/unittests/distros/test_create_users.py @@ -16,6 +16,21 @@ def common_mocks(mocker): mocker.patch("cloudinit.distros.util.system_is_snappy", return_value=False) +def _existing_shadow_grep(name: str): + """Return a mock of grep of /etc/shadow call based on username.""" + return mock.call( + ["grep", "-q", "-e", f"^{name}::", "-e", f"^{name}:!:", "/etc/shadow"] + ) + + +def _chpasswdmock(name: str, password: str, hashed: bool = False): + """Return a mock of chpasswd call based on args""" + cmd = ["chpasswd", "-e"] if hashed else ["chpasswd"] + return mock.call( + cmd, data=f"{name}:{password}", logstring=f"chpasswd for {name}" + ) + + def _useradd2call(args: List[str]): # return a mock call for the useradd command in args # with expected 'logstring'. @@ -76,17 +91,98 @@ def dist(self): id="unlocked", ), pytest.param( - {"passwd": "passfoo"}, + {"passwd": "$6$rounds=..."}, + [ + _useradd2call([USER, "--password", "$6$rounds=...", "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_implicit_encrypted_password", + ), + pytest.param( + {"passwd": ""}, + [ + _useradd2call([USER, "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_empty_passwd_new_user", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + [ + _useradd2call([USER, "-m"]), + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + id="set_plain_text_password", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + [ + _useradd2call([USER, "-m"]), + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + id="set_explicitly_hashed_password", + ), + ], + ) + @mock.patch("cloudinit.distros.util.is_user", return_value=False) + def test_create_options( + self, _is_user, m_subp, dist, create_kwargs, expected + ): + dist.create_user(name=USER, **create_kwargs) + assert m_subp.call_args_list == expected + + @pytest.mark.parametrize( + "create_kwargs,expected,expected_logs", + [ + pytest.param( + {"passwd": "$6$rounds=..."}, [ - _useradd2call([USER, "--password", "passfoo", "-m"]), + _existing_shadow_grep(USER), mock.call(["passwd", "-l", USER]), ], - id="set_password", + [ + "'passwd' in user-data is ignored for existing user " + "foo_user" + ], + id="skip_passwd_set_on_existing_user", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + [ + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + [], + id="set_plain_text_password_on_existing_user", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + [ + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + [], + id="set_explicitly_hashed_password", ), ], ) - def test_create_options(self, m_subp, dist, create_kwargs, expected): + @mock.patch("cloudinit.distros.util.is_user", return_value=True) + def test_create_passwd_existing_user( + self, + m_is_user, + m_subp, + dist, + create_kwargs, + expected, + expected_logs, + caplog, + ): + """When user exists, don't unlock on empty or locked passwords.""" dist.create_user(name=USER, **create_kwargs) + for log in expected_logs: + assert log in caplog.text assert m_subp.call_args_list == expected @mock.patch("cloudinit.distros.util.is_group") diff --git a/tests/unittests/distros/test_dragonflybsd.py b/tests/unittests/distros/test_dragonflybsd.py index 8a240ea5fa91..8fe8a11aa0df 100644 --- a/tests/unittests/distros/test_dragonflybsd.py +++ b/tests/unittests/distros/test_dragonflybsd.py @@ -1,8 +1,66 @@ # This file is part of cloud-init. See LICENSE file for license information. import cloudinit.util +from cloudinit.distros.dragonflybsd import Distro +from cloudinit.distros.freebsd import FreeBSDNetworking +from tests.unittests.distros import _get_distro from tests.unittests.helpers import mock +M_PATH = "cloudinit.distros." + + +class TestDragonFlyBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("dragonflybsd") + distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + [ + "pw", + "useradd", + "-n", + "me2", + "-u", + "1234", + "-d/home/me2", + "-m", + ], + logstring=["pw", "useradd", "-n", "me2", "-d/home/me2", "-m"], + ) + ] == m_subp.call_args_list + + @mock.patch(M_PATH + "subp.subp") + def test_check_existing_password_for_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("dragonflybsd") + distro._check_if_existing_password("me2") + assert [ + mock.call( + [ + "grep", + "-q", + "-e", + "^me2::", + "-e", + "^me2:*:", + "-e", + "^me2:*LOCKED*:", + "/etc/master.passwd", + ] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, mocker, caplog): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("dragonflybsd") + distro.unlock_passwd("me2") + assert ( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + ) + def test_find_dragonflybsd_part(): assert cloudinit.util.find_freebsd_part("/dev/vbd0s3") == "vbd0s3" diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py index c4c067ead713..6cf962dab54c 100644 --- a/tests/unittests/distros/test_freebsd.py +++ b/tests/unittests/distros/test_freebsd.py @@ -39,6 +39,36 @@ def test_add_user(self, m_subp, mocker): ) ] == m_subp.call_args_list + @mock.patch(M_PATH + "subp.subp") + def test_check_existing_password_for_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("freebsd") + distro._check_if_existing_password("me2") + assert [ + mock.call( + [ + "grep", + "-q", + "-e", + "^me2::", + "-e", + "^me2:*:", + "-e", + "^me2:*LOCKED*:", + "/etc/master.passwd", + ] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, mocker, caplog): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("freebsd") + distro.unlock_passwd("me2") + assert ( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + ) + class TestDeviceLookUp(CiTestCase): @mock.patch("cloudinit.subp.subp") diff --git a/tests/unittests/distros/test_openbsd.py b/tests/unittests/distros/test_openbsd.py new file mode 100644 index 000000000000..0e28ed5dc00e --- /dev/null +++ b/tests/unittests/distros/test_openbsd.py @@ -0,0 +1,51 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.distros.bsd import BSDNetworking +from cloudinit.distros.openbsd import Distro +from tests.unittests.distros import _get_distro +from tests.unittests.helpers import mock + +M_PATH = "cloudinit.distros.openbsd." + + +class TestOpenBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=BSDNetworking) + distro = _get_distro("openbsd") + distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + ["useradd", "-m", "me2"], logstring=["useradd", "-m", "me2"] + ) + ] == m_subp.call_args_list + + @mock.patch(M_PATH + "subp.subp") + def test_check_existing_password_for_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=BSDNetworking) + distro = _get_distro("openbsd") + distro._check_if_existing_password("me2") + assert [ + mock.call( + [ + "grep", + "-q", + "-e", + "^me2::", + "-e", + "^me2:*:", + "-e", + "^me2:*************:", + "/etc/master.passwd", + ] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, mocker, caplog): + mocker.patch.object(Distro, "networking_cls", spec=BSDNetworking) + distro = _get_distro("openbsd") + distro.unlock_passwd("me2") + assert ( + "OpenBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + )