From 5e6378aa4e4d8468b6c23a44814a6e8b36b660b6 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Thu, 26 Oct 2023 11:59:00 -0700 Subject: [PATCH 1/2] Updated Adtran Driver - Fixed global_cmd_verify default value and enable mode issues. (#3329) Co-authored-by: Joshua Robinson <51381009+Gatorjosh14@users.noreply.github.com> --- netmiko/adtran/adtran.py | 59 ++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/netmiko/adtran/adtran.py b/netmiko/adtran/adtran.py index 83a6c8403..bf993b260 100644 --- a/netmiko/adtran/adtran.py +++ b/netmiko/adtran/adtran.py @@ -1,16 +1,12 @@ from typing import Any, Optional import re from netmiko.cisco_base_connection import CiscoBaseConnection +from netmiko.exceptions import NetmikoTimeoutException class AdtranOSBase(CiscoBaseConnection): prompt_pattern = r"[>#]" - def __init__(self, *args: Any, **kwargs: Any) -> None: - if kwargs.get("global_cmd_verify") is None: - kwargs["global_cmd_verify"] = False - return super().__init__(*args, **kwargs) - def session_preparation(self) -> None: """Prepare the session after the connection has been established.""" self.ansi_escape_codes = True @@ -31,14 +27,55 @@ def enable( check_state: bool = True, re_flags: int = re.IGNORECASE, ) -> str: - return super().enable( - cmd=cmd, - pattern=pattern, - enable_pattern=enable_pattern, - check_state=check_state, - re_flags=re_flags, + output = "" + msg = ( + "Failed to enter enable mode. Please ensure you pass " + "the 'secret' argument to ConnectHandler." ) + # Check if in enable mode already. + if check_state and self.check_enable_mode(): + return output + + # Send "enable" mode command + self.write_channel(self.normalize_cmd(cmd)) + try: + # Read the command echo + if self.global_cmd_verify is not False: + output += self.read_until_pattern(pattern=re.escape(cmd.strip())) + + # Search for trailing prompt or password pattern + output += self.read_until_prompt_or_pattern( + pattern=pattern, re_flags=re_flags + ) + + # Send the "secret" in response to password pattern + if re.search(pattern, output): + self.write_channel(self.normalize_cmd(self.secret)) + + # Handle the fallback to local authentication case + fallback_pattern = r"Falling back" + new_output = self.read_until_prompt_or_pattern( + pattern=fallback_pattern, re_flags=re_flags + ) + output += new_output + + if "Falling back" in new_output: + self.write_channel(self.normalize_cmd(self.secret)) + output += self.read_until_prompt() + + # Search for terminating pattern if defined + if enable_pattern and not re.search(enable_pattern, output): + output += self.read_until_pattern(pattern=enable_pattern) + else: + if not self.check_enable_mode(): + raise ValueError(msg) + + except NetmikoTimeoutException: + raise ValueError(msg) + + return output + def exit_enable_mode(self, exit_command: str = "disable") -> str: return super().exit_enable_mode(exit_command=exit_command) From 56d2309ffe8f1782c565a0a91522c456c47377d2 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Tue, 31 Oct 2023 09:52:29 -0700 Subject: [PATCH 2/2] Fix session_log failure to hide "no_log" data. (#3331) --- .gitignore | 1 + netmiko/base_connection.py | 18 ++ netmiko/session_log.py | 39 +++- tests/SLOG/cisco881_slog.log | 1 - tests/SLOG/cisco881_slog_append.log | 17 +- tests/SLOG/cisco881_slog_wr.log | 23 ++- tests/SLOG/netmiko.log | 77 +++++++- ...ession_log_append-cisco881_slog_append.log | 17 ++ ...og_append-cisco881_slog_append_compare.log | 15 ++ ...sion_log_bytesio-cisco881_slog_compare.log | 22 +++ ...n_log_custom_session_log-cisco881_slog.log | 15 ++ .../test_session_log_no_log-cisco881_slog.log | 23 +++ ...t_session_log_no_log_cfg-cisco881_slog.log | 36 ++++ ...test_session_log_secrets-cisco881_slog.log | 10 + tests/SLOG/test_unicode-cisco881_slog.log | 8 + tests/conftest.py | 19 +- tests/test_netmiko_session_log.py | 172 ++++++++++++++++-- 17 files changed, 465 insertions(+), 48 deletions(-) create mode 100644 tests/SLOG/test_session_log_append-cisco881_slog_append.log create mode 100644 tests/SLOG/test_session_log_append-cisco881_slog_append_compare.log create mode 100644 tests/SLOG/test_session_log_bytesio-cisco881_slog_compare.log create mode 100644 tests/SLOG/test_session_log_custom_session_log-cisco881_slog.log create mode 100644 tests/SLOG/test_session_log_no_log-cisco881_slog.log create mode 100644 tests/SLOG/test_session_log_no_log_cfg-cisco881_slog.log create mode 100644 tests/SLOG/test_session_log_secrets-cisco881_slog.log create mode 100644 tests/SLOG/test_unicode-cisco881_slog.log diff --git a/.gitignore b/.gitignore index 1980c4777..5399c9ef7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ tests/etc/test_devices.yml tests/etc/commands.yml tests/etc/responses.yml tests/etc/test_devices_exc.yml +tests/SLOG/test_logging_filter_secrets-netmiko.log examples/SECRET_DEVICE_CREDS.py ./build diff --git a/netmiko/base_connection.py b/netmiko/base_connection.py index 12191c391..878b70702 100644 --- a/netmiko/base_connection.py +++ b/netmiko/base_connection.py @@ -102,6 +102,20 @@ def wrapper_decorator(self: "BaseConnection", *args: Any, **kwargs: Any) -> Any: return cast(F, wrapper_decorator) +def flush_session_log(func: F) -> F: + @functools.wraps(func) + def wrapper_decorator(self: "BaseConnection", *args: Any, **kwargs: Any) -> Any: + try: + return_val = func(self, *args, **kwargs) + finally: + # Always flush the session_log + if self.session_log: + self.session_log.flush() + return return_val + + return cast(F, wrapper_decorator) + + def log_writes(func: F) -> F: """Handle both session_log and log of writes.""" @@ -391,6 +405,7 @@ def __init__( elif isinstance(session_log, SessionLog): # SessionLog object self.session_log = session_log + self.session_log.open() else: raise ValueError( "session_log must be a path to a file, a file handle, " @@ -1467,6 +1482,7 @@ def command_echo_read(self, cmd: str, read_timeout: float) -> str: pass return new_data + @flush_session_log @select_cmd_verify def send_command_timing( self, @@ -1615,6 +1631,7 @@ def _prompt_handler(self, auto_find_prompt: bool) -> str: prompt = self.base_prompt return re.escape(prompt.strip()) + @flush_session_log @select_cmd_verify def send_command( self, @@ -2138,6 +2155,7 @@ def send_config_from_file( commands = cfg_file.readlines() return self.send_config_set(commands, **kwargs) + @flush_session_log def send_config_set( self, config_commands: Union[str, Sequence[str], Iterator[str], TextIO, None] = None, diff --git a/netmiko/session_log.py b/netmiko/session_log.py index 9ead1316c..80adea31c 100644 --- a/netmiko/session_log.py +++ b/netmiko/session_log.py @@ -12,6 +12,7 @@ def __init__( file_encoding: str = "utf-8", no_log: Optional[Dict[str, Any]] = None, record_writes: bool = False, + slog_buffer: Optional[io.StringIO] = None, ) -> None: if no_log is None: self.no_log = {} @@ -30,6 +31,13 @@ def __init__( else: self.session_log = None + # In order to ensure all the no_log entries get hidden properly, + # we must first store everying in memory and then write out to file. + # Otherwise, we might miss the data we are supposed to hide (since + # the no_log data potentially spans multiple reads). + if slog_buffer is None: + self.slog_buffer = io.StringIO() + # Ensures last write operations prior to disconnect are recorded. self.fin = False @@ -49,15 +57,30 @@ def open(self) -> None: def close(self) -> None: """Close the session_log file (if it is a file that we opened).""" + self.flush() if self.session_log and self._session_log_close: self.session_log.close() self.session_log = None - def write(self, data: str) -> None: - if self.session_log is not None and len(data) > 0: - # Hide the password and secret in the session_log - for hidden_data in self.no_log.values(): - data = data.replace(hidden_data, "********") + def no_log_filter(self, data: str) -> str: + """Filter content from the session_log.""" + for hidden_data in self.no_log.values(): + data = data.replace(hidden_data, "********") + return data + + def _read_buffer(self) -> str: + self.slog_buffer.seek(0) + data = self.slog_buffer.read() + # Once read, create a new buffer + self.slog_buffer = io.StringIO() + return data + + def flush(self) -> None: + """Force the slog_buffer to be written out to the actual file""" + + if self.session_log is not None: + data = self._read_buffer() + data = self.no_log_filter(data) if isinstance(self.session_log, io.BufferedIOBase): self.session_log.write(write_bytes(data, encoding=self.file_encoding)) @@ -67,4 +90,10 @@ def write(self, data: str) -> None: assert isinstance(self.session_log, io.BufferedIOBase) or isinstance( self.session_log, io.TextIOBase ) + + # Flush the underlying file self.session_log.flush() + + def write(self, data: str) -> None: + if len(data) > 0: + self.slog_buffer.write(data) diff --git a/tests/SLOG/cisco881_slog.log b/tests/SLOG/cisco881_slog.log index 1e00df96e..00bb16745 100644 --- a/tests/SLOG/cisco881_slog.log +++ b/tests/SLOG/cisco881_slog.log @@ -1,4 +1,3 @@ - cisco1> cisco1>terminal width 511 cisco1>terminal length 0 diff --git a/tests/SLOG/cisco881_slog_append.log b/tests/SLOG/cisco881_slog_append.log index 213ca5b50..aaecd375c 100644 --- a/tests/SLOG/cisco881_slog_append.log +++ b/tests/SLOG/cisco881_slog_append.log @@ -1,5 +1,6 @@ Initial file contents + cisco1> cisco1>terminal width 511 cisco1>terminal length 0 @@ -15,19 +16,3 @@ FastEthernet4 10.220.88.20 YES NVRAM up up Vlan1 unassigned YES unset down down cisco1> cisco1>exit -cisco1> -cisco1>terminal width 511 -cisco1>terminal length 0 -cisco1> -cisco1> -Testing password and secret replacement -This is my password ******** -This is my secret ******** - -cisco1> -cisco1>terminal width 511 -cisco1>terminal length 0 -cisco1> -cisco1> -Testing unicode -😁😁 diff --git a/tests/SLOG/cisco881_slog_wr.log b/tests/SLOG/cisco881_slog_wr.log index 71a29c32b..5d735e3f0 100644 --- a/tests/SLOG/cisco881_slog_wr.log +++ b/tests/SLOG/cisco881_slog_wr.log @@ -1,5 +1,6 @@ terminal width 511 + cisco1> cisco1>terminal width 511 cisco1>terminal length 0 @@ -8,12 +9,22 @@ cisco1> cisco1> -cisco1>enable -enable -Password: ******** +cisco1>show foooooooo +show foooooooo + ^ +% Invalid input detected at '^' marker. -cisco1# +cisco1> -cisco1# +cisco1>show ip interface brief +show ip interface brief +Interface IP-Address OK? Method Status Protocol +FastEthernet0 unassigned YES unset down down +FastEthernet1 unassigned YES unset down down +FastEthernet2 unassigned YES unset down down +FastEthernet3 unassigned YES unset down down +FastEthernet4 10.220.88.20 YES NVRAM up up +Vlan1 unassigned YES unset down down +cisco1> -cisco1#exit +cisco1>exit diff --git a/tests/SLOG/netmiko.log b/tests/SLOG/netmiko.log index 3c3c8b275..92b448c2d 100644 --- a/tests/SLOG/netmiko.log +++ b/tests/SLOG/netmiko.log @@ -38,8 +38,8 @@ write_channel: b'terminal width 511\n' read_channel: cisco1> cisco1> -read_channel: terminal widt -read_channel: h 511 +read_channel: terminal wid +read_channel: th 511 cisco1> Pattern found: (terminal width 511) cisco1> @@ -64,6 +64,16 @@ write_channel: b'\n' read_channel: read_channel: cisco1> + +Parenthesis found in pattern. + +pattern: (\#|>) + + +This can be problemtic when used in read_until_pattern(). + +You should ensure that you use either non-capture groups i.e. '(?:' or that the +parenthesis completely wrap the pattern '(pattern)' Pattern found: (\#|>) cisco1> read_channel: @@ -72,8 +82,8 @@ write_channel: b'\n' write_channel: b'terminal width 511\n' read_channel: cisco1> cisco1> -read_channel: terminal wid -read_channel: th 511 +read_channel: terminal widt +read_channel: h 511 cisco1> Pattern found: (terminal width 511) cisco1> cisco1>terminal width 511 @@ -97,6 +107,16 @@ write_channel: b'\n' read_channel: read_channel: cisco1> + +Parenthesis found in pattern. + +pattern: (\#|>) + + +This can be problemtic when used in read_until_pattern(). + +You should ensure that you use either non-capture groups i.e. '(?:' or that the +parenthesis completely wrap the pattern '(pattern)' Pattern found: (\#|>) cisco1> read_channel: @@ -110,8 +130,8 @@ read_channel: [find_prompt()]: prompt is cisco1> write_channel: b'show ip interface brief\n' read_channel: -read_channel: show ip inter -read_channel: face brief +read_channel: show ip interf +read_channel: ace brief Interface IP-Address OK? Method Status Protocol FastEthernet0 unassigned YES unset down down FastEthernet1 unassigned YES unset down down @@ -129,3 +149,48 @@ cisco1> Pattern found: ([>#]) cisco1> write_channel: b'exit\n' +write_channel: b'\n' +write_channel: b'terminal width 511\n' +read_channel: +cisco1> +cisco1> +read_channel: terminal wid +read_channel: th 511 +cisco1> +Pattern found: (terminal width 511) +cisco1> +cisco1>terminal width 511 +In disable_paging +Command: terminal length 0 + +write_channel: b'terminal length 0\n' +read_channel: +read_channel: terminal lengt +read_channel: h 0 +cisco1> +Pattern found: (terminal\ length\ 0) +cisco1>terminal length 0 + +cisco1>terminal length 0 +Exiting disable_paging +read_channel: +Clear buffer detects data in the channel +read_channel: +write_channel: b'\n' +read_channel: +read_channel: +cisco1> + +Parenthesis found in pattern. + +pattern: (\#|>) + + +This can be problemtic when used in read_until_pattern(). + +You should ensure that you use either non-capture groups i.e. '(?:' or that the +parenthesis completely wrap the pattern '(pattern)' +Pattern found: (\#|>) +cisco1> +read_channel: +[find_prompt()]: prompt is cisco1> diff --git a/tests/SLOG/test_session_log_append-cisco881_slog_append.log b/tests/SLOG/test_session_log_append-cisco881_slog_append.log new file mode 100644 index 000000000..40fa67d00 --- /dev/null +++ b/tests/SLOG/test_session_log_append-cisco881_slog_append.log @@ -0,0 +1,17 @@ +Initial file contents + +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +cisco1> +cisco1> +cisco1>show ip interface brief +Interface IP-Address OK? Method Status Protocol +FastEthernet0 unassigned YES unset down down +FastEthernet1 unassigned YES unset down down +FastEthernet2 unassigned YES unset down down +FastEthernet3 unassigned YES unset down down +FastEthernet4 10.220.88.20 YES NVRAM up up +Vlan1 unassigned YES unset down down +cisco1> +cisco1>exit diff --git a/tests/SLOG/test_session_log_append-cisco881_slog_append_compare.log b/tests/SLOG/test_session_log_append-cisco881_slog_append_compare.log new file mode 100644 index 000000000..00bb16745 --- /dev/null +++ b/tests/SLOG/test_session_log_append-cisco881_slog_append_compare.log @@ -0,0 +1,15 @@ +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +cisco1> +cisco1> +cisco1>show ip interface brief +Interface IP-Address OK? Method Status Protocol +FastEthernet0 unassigned YES unset down down +FastEthernet1 unassigned YES unset down down +FastEthernet2 unassigned YES unset down down +FastEthernet3 unassigned YES unset down down +FastEthernet4 10.220.88.20 YES NVRAM up up +Vlan1 unassigned YES unset down down +cisco1> +cisco1>exit diff --git a/tests/SLOG/test_session_log_bytesio-cisco881_slog_compare.log b/tests/SLOG/test_session_log_bytesio-cisco881_slog_compare.log new file mode 100644 index 000000000..ad78858dd --- /dev/null +++ b/tests/SLOG/test_session_log_bytesio-cisco881_slog_compare.log @@ -0,0 +1,22 @@ + +terminal width 511 +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +terminal length 0 +cisco1> + +cisco1> + +cisco1>show ip interface brief +show ip interface brief +Interface IP-Address OK? Method Status Protocol +FastEthernet0 unassigned YES unset down down +FastEthernet1 unassigned YES unset down down +FastEthernet2 unassigned YES unset down down +FastEthernet3 unassigned YES unset down down +FastEthernet4 10.220.88.20 YES NVRAM up up +Vlan1 unassigned YES unset down down +cisco1> + +cisco1>exit diff --git a/tests/SLOG/test_session_log_custom_session_log-cisco881_slog.log b/tests/SLOG/test_session_log_custom_session_log-cisco881_slog.log new file mode 100644 index 000000000..29a7e2fc1 --- /dev/null +++ b/tests/SLOG/test_session_log_custom_session_log-cisco881_slog.log @@ -0,0 +1,15 @@ +cisco1> +cisco1>terminal width 511 +cisco1>******** +cisco1> +cisco1> +Testing password and secret replacement +This is my first secret ******** +This is my second secret ******** +This is my third secret ******** +This is my super secret ******** + +!Testing send_command() and send_command_timing() filtering +cisco1>******** +cisco1>******** +cisco1> \ No newline at end of file diff --git a/tests/SLOG/test_session_log_no_log-cisco881_slog.log b/tests/SLOG/test_session_log_no_log-cisco881_slog.log new file mode 100644 index 000000000..fe1a573f4 --- /dev/null +++ b/tests/SLOG/test_session_log_no_log-cisco881_slog.log @@ -0,0 +1,23 @@ + +terminal width 511 +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +terminal length 0 +cisco1> + +cisco1> + +cisco1>******** +******** +Translating "********" + +% Bad IP address or host name +% Unknown command or computer name, or unable to find computer address +cisco1>******** +******** +% Bad IP address or host name +% Unknown command or computer name, or unable to find computer address +cisco1> + +cisco1>exit diff --git a/tests/SLOG/test_session_log_no_log_cfg-cisco881_slog.log b/tests/SLOG/test_session_log_no_log_cfg-cisco881_slog.log new file mode 100644 index 000000000..1cea6d55a --- /dev/null +++ b/tests/SLOG/test_session_log_no_log_cfg-cisco881_slog.log @@ -0,0 +1,36 @@ + +terminal width 511 +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +terminal length 0 +cisco1> + +cisco1> + +cisco1>enable +enable +Password: 88newclass + +cisco1# + +cisco1# + +cisco1#configure terminal +configure terminal +Enter configuration commands, one per line. End with CNTL/Z. +cisco1(config)# + +cisco1(config)#******** +******** +cisco1(config)#no logging console +no logging console +cisco1(config)# + +cisco1(config)#end +end +cisco1# + +cisco1# + +cisco1#exit diff --git a/tests/SLOG/test_session_log_secrets-cisco881_slog.log b/tests/SLOG/test_session_log_secrets-cisco881_slog.log new file mode 100644 index 000000000..1725a6e5f --- /dev/null +++ b/tests/SLOG/test_session_log_secrets-cisco881_slog.log @@ -0,0 +1,10 @@ +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +cisco1> +cisco1> +Testing password and secret replacement +This is my password ******** +This is my secret ******** + +cisco1>exit diff --git a/tests/SLOG/test_unicode-cisco881_slog.log b/tests/SLOG/test_unicode-cisco881_slog.log new file mode 100644 index 000000000..5fa74b4c6 --- /dev/null +++ b/tests/SLOG/test_unicode-cisco881_slog.log @@ -0,0 +1,8 @@ +cisco1> +cisco1>terminal width 511 +cisco1>terminal length 0 +cisco1> +cisco1> +Testing unicode +😁😁 +cisco1>exit diff --git a/tests/conftest.py b/tests/conftest.py index b5931febb..c8feff91b 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,7 +117,7 @@ def net_connect_slog_wr(request): @pytest.fixture(scope="module") def device_slog(request): """ - Create the SSH connection to the remote device. Modify session_log init arguments. + Modify session_log init arguments. Return the netmiko device (not connected) """ @@ -131,6 +131,23 @@ def device_slog(request): return device +@pytest.fixture(scope="function") +def device_slog_test_name(request): + """ + Modify session_log init arguments. + + Return the netmiko device (not connected) + """ + device_under_test = request.config.getoption("test_device") + test_devices = parse_yaml(PWD + "/etc/test_devices.yml") + device = test_devices[device_under_test] + # Fictional secret + device["secret"] = "invalid" + device["verbose"] = False + test_name = request.node.name + return (device, test_name) + + @pytest.fixture(scope="module") def device_failed_key(request): """ diff --git a/tests/test_netmiko_session_log.py b/tests/test_netmiko_session_log.py index bc41398ab..39b13c6bd 100644 --- a/tests/test_netmiko_session_log.py +++ b/tests/test_netmiko_session_log.py @@ -7,6 +7,12 @@ from netmiko.session_log import SessionLog +def add_test_name_to_file_name(initial_fname, test_name): + dir_name, f_name = initial_fname.split("/") + new_file_name = f"{dir_name}/{test_name}-{f_name}" + return new_file_name + + def calc_md5(file_name=None, contents=None): """Compute MD5 hash of file.""" if contents is not None: @@ -98,30 +104,48 @@ def test_session_log_write(net_connect_slog_wr, commands, expected_responses): assert session_log_md5 == compare_log_md5 -def test_session_log_append(device_slog, commands, expected_responses): +def test_session_log_append(device_slog_test_name, commands, expected_responses): """Verify session_log matches expected content, but when channel writes are also logged.""" - session_file = expected_responses["session_log_append"] + + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + slog_file = expected_responses["session_log_append"] + session_file = add_test_name_to_file_name(slog_file, test_name) + # Create a starting file with open(session_file, "wb") as f: f.write(b"Initial file contents\n\n") # The netmiko connection has not been established yet. device_slog["session_log"] = session_file + device_slog["session_log_file_mode"] = "append" conn = ConnectHandler(**device_slog) command = commands["basic"] session_action(conn, command) - compare_file = expected_responses["compare_log_append"] + compare_file_base = expected_responses["compare_log_append"] + dir_name, f_name = compare_file_base.split("/") + compare_file = f"{dir_name}/{test_name}-{f_name}" session_log_md5_append(session_file, compare_file) -def test_session_log_secrets(device_slog): +def test_session_log_secrets(device_slog_test_name): """Verify session_log does not contain password or secret.""" + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + slog_file = device_slog["session_log"] + + new_slog_file = add_test_name_to_file_name(slog_file, test_name) + device_slog["session_log"] = new_slog_file + conn = ConnectHandler(**device_slog) conn.session_log.write("\nTesting password and secret replacement\n") conn.session_log.write("This is my password {}\n".format(conn.password)) conn.session_log.write("This is my secret {}\n".format(conn.secret)) + time.sleep(1) + conn.session_log.flush() + conn.disconnect() file_name = device_slog["session_log"] with open(file_name, "r") as f: @@ -130,15 +154,25 @@ def test_session_log_secrets(device_slog): assert conn.password not in session_log if conn.secret: assert conn.secret not in session_log + assert "terminal width" in session_log -def test_logging_filter_secrets(net_connect_slog_wr): +def test_logging_filter_secrets(device_slog_test_name): """Verify logging DEBUG output does not contain password or secret.""" - nc = net_connect_slog_wr + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + + # No session_log for this test + del device_slog["session_log"] + + # Set the secret to be correct + device_slog["secret"] = device_slog["password"] + + nc = ConnectHandler(**device_slog) # setup logger to output to file - file_name = "SLOG/netmiko.log" + file_name = f"SLOG/{test_name}-netmiko.log" netmikologger = logging.getLogger("netmiko") netmikologger.setLevel(logging.DEBUG) file_handler = logging.FileHandler(file_name) @@ -163,8 +197,15 @@ def test_logging_filter_secrets(net_connect_slog_wr): assert nc.secret not in netmiko_log -def test_unicode(device_slog): +def test_unicode(device_slog_test_name): """Verify that you can write unicode characters into the session_log.""" + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + slog_file = device_slog["session_log"] + + new_slog_file = add_test_name_to_file_name(slog_file, test_name) + device_slog["session_log"] = new_slog_file + conn = ConnectHandler(**device_slog) smiley_face = "\N{grinning face with smiling eyes}" @@ -172,25 +213,33 @@ def test_unicode(device_slog): conn.session_log.write(smiley_face) conn.session_log.write(smiley_face) + conn.disconnect() + file_name = device_slog["session_log"] with open(file_name, "r") as f: session_log = f.read() assert smiley_face in session_log -def test_session_log_bytesio(device_slog, commands, expected_responses): +def test_session_log_bytesio(device_slog_test_name, commands, expected_responses): """Verify session_log matches expected content, but when channel writes are also logged.""" + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + s_log = io.BytesIO() # The netmiko connection has not been established yet. device_slog["session_log"] = s_log device_slog["session_log_file_mode"] = "write" + device_slog["session_log_record_writes"] = True conn = ConnectHandler(**device_slog) command = commands["basic"] session_action(conn, command) + conn.disconnect() compare_file = expected_responses["compare_log"] + compare_file = add_test_name_to_file_name(compare_file, test_name) compare_log_md5 = calc_md5(file_name=compare_file) log_content = s_log.getvalue() @@ -198,18 +247,98 @@ def test_session_log_bytesio(device_slog, commands, expected_responses): assert session_log_md5 == compare_log_md5 -def test_session_log_custom_secrets(device_slog): - """Verify session_log does not contain custom words.""" +def test_session_log_no_log(device_slog_test_name): + """Test no_log works properly for show commands.""" + + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + slog_file = device_slog["session_log"] + new_slog_file = add_test_name_to_file_name(slog_file, test_name) + + # record_writes as well to make sure both the write and read-echo don't get logged. + device_slog["session_log"] = new_slog_file + device_slog["session_log_record_writes"] = True + + conn = ConnectHandler(**device_slog) + + # After connection change the password to "invalid" + fake_password = "invalid" + conn.password = fake_password + + # Now try to actually send the fake password as a show command + conn.send_command(fake_password) + time.sleep(1) + conn.send_command_timing(fake_password) + + with open(new_slog_file, "r") as f: + session_log = f.read() + + assert fake_password not in session_log + # One for read, one for write, one for Cisco "translating" (send_command) + # Plus one for read, one for write (send_command_timing) + assert session_log.count("********") == 5 + + # Do disconnect after test (to make sure send_command() actually flushes session_log buffer) + conn.disconnect() + + +def test_session_log_no_log_cfg(device_slog_test_name, commands): + """Test no_log works properly for config commands.""" + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + slog_file = device_slog["session_log"] + new_slog_file = add_test_name_to_file_name(slog_file, test_name) + + # Likely "logging buffered 20000" (hide/obscure this command) + config_command1 = commands["config"][0] + + # Likely "no logging console" (this command should show up in the log) + config_command2 = commands["config"][1] + + # Dict of strings that should be sanitized + no_log = { + "cfg1": config_command1, + } + # Create custom session log (use 'record_writes' just to test that situation) + custom_log = SessionLog(file_name=new_slog_file, no_log=no_log, record_writes=True) + + # Pass in custom SessionLog obj to session_log attribute + device_slog["session_log"] = custom_log + device_slog["secret"] = device_slog["password"] + + conn = ConnectHandler(**device_slog) + conn.enable() + conn.send_config_set([config_command1, config_command2]) + + # Check the results + with open(new_slog_file, "r") as f: + session_log = f.read() + + assert config_command1 not in session_log + assert session_log.count("********") == 2 + assert config_command2 in session_log + + # Make sure send_config_set flushes the session_log (so disconnect after the asserts) + conn.disconnect() + + +def test_session_log_custom_session_log(device_slog_test_name): + """Verify session_log does not contain custom words (use SessionLog obj).""" + device_slog = device_slog_test_name[0] + test_name = device_slog_test_name[1] + slog_file = device_slog["session_log"] + new_slog_file = add_test_name_to_file_name(slog_file, test_name) + # Dict of words that should be sanitized in session log sanitize_secrets = { "secret1": "admin_username", "secret2": "snmp_auth_secret", "secret3": "snmp_priv_secret", "supersecret": "supersecret", + "data1": "terminal length 0", # Hide something that Netmiko sends } # Create custom session log - custom_log = SessionLog(file_name="SLOG/device_log.log", no_log=sanitize_secrets) - custom_log.open() + custom_log = SessionLog(file_name=new_slog_file, no_log=sanitize_secrets) # Pass in custom SessionLog obj to session_log attribute device_slog["session_log"] = custom_log @@ -228,11 +357,28 @@ def test_session_log_custom_secrets(device_slog): conn.session_log.write( "This is my super secret {}\n".format(sanitize_secrets["supersecret"]) ) + time.sleep(1) + conn.session_log.flush() + + # Use send_command and send_command_timing to send something that should be filtered + conn.session_log.write( + "\n!Testing send_command() and send_command_timing() filtering" + ) + conn.send_command(sanitize_secrets["data1"]) + conn.send_command_timing(sanitize_secrets["data1"]) # Retrieve the file name. file_name = custom_log.file_name with open(file_name, "r") as f: session_log = f.read() + + # Ensure file exists and has logging content + assert "terminal width" in session_log + + # 'terminal length 0' should be hidden in the session_log + assert "terminal length" not in session_log + assert ">********" in session_log + if sanitize_secrets.get("secret1") is not None: assert sanitize_secrets["secret1"] not in session_log if sanitize_secrets.get("secret2") is not None: