Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass SessionLog obj to session_log arg #3308

Closed
wants to merge 10 commits into from
11 changes: 7 additions & 4 deletions netmiko/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ def __init__(
to select smallest of global and specific. Sets default global_delay_factor to .1
(default: True)

:param session_log: File path or BufferedIOBase subclass object to write the session log to.
:param session_log: File path, SessionLog object, or BufferedIOBase subclass object
to write the session log to.

:param session_log_record_writes: The session log generally only records channel reads due
to eliminate command duplication due to command echo. You can enable this if you
Expand Down Expand Up @@ -366,6 +367,7 @@ def __init__(
no_log["password"] = self.password
if self.secret:
no_log["secret"] = self.secret
# Always sanitize username and password
log.addFilter(SecretsFilter(no_log=no_log))

# Netmiko will close the session_log if we open the file
Expand All @@ -386,10 +388,13 @@ def __init__(
no_log=no_log,
record_writes=session_log_record_writes,
)
elif isinstance(session_log, SessionLog):
# SessionLog object
self.session_log = session_log
else:
raise ValueError(
"session_log must be a path to a file, a file handle, "
"or a BufferedIOBase subclass."
"SessionLog object, or a BufferedIOBase subclass."
)

# Default values
Expand Down Expand Up @@ -667,7 +672,6 @@ def read_until_pattern(
start_time = time.time()
# if read_timeout == 0 or 0.0 keep reading indefinitely
while (time.time() - start_time < read_timeout) or (not read_timeout):

output += self.read_channel()

if re.search(pattern, output, flags=re_flags):
Expand Down Expand Up @@ -1447,7 +1451,6 @@ def clear_buffer(
return output

def command_echo_read(self, cmd: str, read_timeout: float) -> str:

# Make sure you read until you detect the command echo (avoid getting out of sync)
new_data = self.read_until_pattern(
pattern=re.escape(cmd), read_timeout=read_timeout
Expand Down
43 changes: 43 additions & 0 deletions tests/test_netmiko_session_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io
import logging
from netmiko import ConnectHandler
from netmiko.session_log import SessionLog


def calc_md5(file_name=None, contents=None):
Expand Down Expand Up @@ -131,6 +132,48 @@ def test_session_log_secrets(device_slog):
assert conn.secret not in session_log


def test_session_log_custom_secrets(device_slog):
"""Verify session_log does not contain custom words."""
# Dict of words that should be sanitized in session log
sanitize_secrets = {
"secret1": "admin_username",
"secret2": "snmp_auth_secret",
"secret3": "snmp_priv_secret",
"supersecret": "supersecret",
}
# Create custom session log
custom_log = SessionLog(file_name="device_log.log", no_log=sanitize_secrets)
# Pass in custom SessionLog obj to session_log attribute
device_slog["session_log"] = custom_log

conn = ConnectHandler(**device_slog)
conn.session_log.write("\nTesting password and secret replacement\n")
conn.session_log.write(
"This is my first secret {}\n".format(sanitize_secrets["secret1"])
)
conn.session_log.write(
"This is my second secret {}\n".format(sanitize_secrets["secret2"])
)
conn.session_log.write(
"This is my third secret {}\n".format(sanitize_secrets["secret3"])
)
conn.session_log.write(
"This is my super secret {}\n".format(sanitize_secrets["supersecret"])
)

file_name = device_slog["session_log"]
with open(file_name, "r") as f:
session_log = f.read()
if sanitize_secrets.get("secret1") is not None:
assert sanitize_secrets["secret1"] not in session_log
if sanitize_secrets.get("secret2") is not None:
assert sanitize_secrets["secret2"] not in session_log
if sanitize_secrets.get("secret3") is not None:
assert sanitize_secrets["secret3"] not in session_log
if sanitize_secrets("supersecret") is not None:
assert sanitize_secrets["supersecret"] not in session_log


def test_logging_filter_secrets(net_connect_slog_wr):
"""Verify logging DEBUG output does not contain password or secret."""

Expand Down
Loading