Skip to content

Commit

Permalink
Merge branch 'develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
dasTor authored Aug 27, 2024
2 parents 9e739d1 + eefa4c4 commit 624c4fa
Show file tree
Hide file tree
Showing 10 changed files with 816 additions and 785 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Netmiko aims to accomplish both of these operations and to do it across a very b

## Installation

To install netmiko, simply us pip:
To install netmiko, simply use pip:

```
$ pip install netmiko
Expand Down
1 change: 1 addition & 0 deletions netmiko/arista/arista.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AristaBase(CiscoSSHConnection):

def session_preparation(self) -> None:
"""Prepare the session after the connection has been established."""
self.ansi_escape_codes = True
self._test_channel_read(pattern=self.prompt_pattern)
cmd = "terminal width 511"
self.set_terminal_width(command=cmd, pattern=r"Width set to")
Expand Down
8 changes: 6 additions & 2 deletions netmiko/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2364,6 +2364,8 @@ def strip_ansi_escape_codes(self, string_buffer: str) -> str:
ESC[9999B Move cursor down N-lines (very large value is attempt to move to the
very bottom of the screen)
ESC[c Query Device (used by MikroTik in 'Safe-Mode')
ESC[2004h Enable bracketed paste mode
ESC[2004l Disable bracketed paste mode
:param string_buffer: The string to be processed to remove ANSI escape codes
:type string_buffer: str
Expand Down Expand Up @@ -2397,7 +2399,8 @@ def strip_ansi_escape_codes(self, string_buffer: str) -> str:
code_cursor_up = chr(27) + r"\[\d*A"
code_cursor_down = chr(27) + r"\[\d*B"
code_wrap_around = chr(27) + r"\[\?7h"
code_bracketed_paste_mode = chr(27) + r"\[\?2004h"
code_enable_bracketed_paste_mode = chr(27) + r"\[\?2004h"
code_disable_bracketed_paste_mode = chr(27) + r"\[\?2004l"
code_underline = chr(27) + r"\[4m"
code_query_device = chr(27) + r"\[c"

Expand Down Expand Up @@ -2429,7 +2432,8 @@ def strip_ansi_escape_codes(self, string_buffer: str) -> str:
code_cursor_down,
code_cursor_forward,
code_wrap_around,
code_bracketed_paste_mode,
code_enable_bracketed_paste_mode,
code_disable_bracketed_paste_mode,
code_underline,
code_query_device,
]
Expand Down
2 changes: 1 addition & 1 deletion netmiko/linux/linux_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def remote_md5(
remote_file = self.source_file
remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}"
dest_md5 = self.ssh_ctl_chan._send_command_str(remote_md5_cmd, read_timeout=300)
dest_md5 = self.process_md5(dest_md5).strip()
dest_md5 = self.process_md5(dest_md5.strip()).strip()
return dest_md5

@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions netmiko/mikrotik/mikrotik_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def send_command_timing( # type: ignore
command_string=command_string, cmd_verify=cmd_verify, **kwargs
)

def cleanup(self, command: str = "quit") -> None:
"""MikroTik uses 'quit' command instead of 'exit'."""
return super().cleanup(command=command)


class MikrotikRouterOsSSH(MikrotikBase):
"""Mikrotik RouterOS SSH driver."""
Expand Down
127 changes: 98 additions & 29 deletions netmiko/snmp_autodetect.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@

from typing import Optional, Dict, List
from typing.re import Pattern
import asyncio
import re
import socket

try:
from pysnmp.entity.rfc3413.oneliner import cmdgen

SNMP_MODE = "legacy"
except ImportError:
from pysnmp.hlapi.asyncio import cmdgen

SNMP_MODE = "v6_async"
except ImportError:
raise ImportError("pysnmp not installed; please install it: 'pip install pysnmp'")

Expand Down Expand Up @@ -304,12 +311,14 @@ def __init__(
self.snmp_target, timeout=1.5, retries=2
)

def _get_snmpv3(self, oid: str) -> str:
async def _run_query(self, creds: object, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv3 for the specified OID.
Asynchronous getCmd query to the device.
Parameters
----------
creds : UsmUserData or CommunityData object
The authentication credentials.
oid : str
The SNMP OID that you want to get.
Expand All @@ -318,26 +327,82 @@ def _get_snmpv3(self, oid: str) -> str:
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
errorIndication, errorStatus, errorIndex, varBinds = await cmdgen.getCmd(
cmdgen.SnmpEngine(),
creds,
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
cmdgen.ContextData(),
cmdgen.ObjectType(cmdgen.ObjectIdentity(oid)),
)

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
if not errorIndication and varBinds[0][1]:
return str(varBinds[0][1])
return ""

def _get_snmpv3_asyncwr(self, oid: str) -> str:
"""
This is an asynchronous wrapper to call code in newer versions of the pysnmp library
(V6 and later).
"""
return asyncio.run(
self._run_query(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
oid,
)
)

def _get_snmpv3(self, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv3 for the specified OID.
Parameters
----------
oid : str
The SNMP OID that you want to get.
Returns
-------
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
if SNMP_MODE == "legacy":
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""
elif SNMP_MODE == "v6_async":
return self._get_snmpv3_asyncwr(oid=oid)
else:
raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

def _get_snmpv2c_asyncwr(self, oid: str) -> str:
"""
This is an asynchronous wrapper to call code in newer versions of the pysnmp library
(V6 and later).
"""
return asyncio.run(self._run_query(cmdgen.CommunityData(self.community), oid))

def _get_snmpv2c(self, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv2 for the specified OID.
Expand All @@ -352,19 +417,23 @@ def _get_snmpv2c(self, oid: str) -> str:
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.CommunityData(self.community),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)
if SNMP_MODE == "legacy":
cmd_gen = cmdgen.CommandGenerator()
(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.CommunityData(self.community),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)
if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""
elif SNMP_MODE == "v6_async":
return self._get_snmpv2c_asyncwr(oid=oid)
else:
raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

def _get_snmp(self, oid: str) -> str:
"""Wrapper for generic SNMP call."""
Expand Down
6 changes: 2 additions & 4 deletions netmiko/sophos/sophos_sfos_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class SophosSfosSSH(NoEnable, NoConfig, CiscoSSHConnection):
def session_preparation(self) -> None:
"""Prepare the session after the connection has been established."""
self._test_channel_read(pattern=r"Main Menu")
self._test_channel_read(pattern=r"Select Menu Number")
"""
Sophos Firmware Version SFOS 18.0.0 GA-Build339
Expand All @@ -32,9 +32,7 @@ def session_preparation(self) -> None:
Select Menu Number [0-7]:
"""
self.write_channel(SOPHOS_MENU_DEFAULT + self.RETURN)
self._test_channel_read(pattern=r"[#>]")
self.set_base_prompt()
self.send_command_expect("\r", expect_string=r"Select Menu Number")
# Clear the read buffer
time.sleep(0.3 * self.global_delay_factor)
self.clear_buffer()
Expand Down
Loading

0 comments on commit 624c4fa

Please sign in to comment.