Skip to content

Commit

Permalink
Adding support for pysnmp version6 (#3473)
Browse files Browse the repository at this point in the history
Co-authored-by: Tonygratta <[email protected]>
  • Loading branch information
ktbyers and Tonygratta authored Aug 8, 2024
1 parent 4841ecd commit f0041bb
Show file tree
Hide file tree
Showing 3 changed files with 799 additions and 777 deletions.
127 changes: 98 additions & 29 deletions netmiko/snmp_autodetect.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@

from typing import Optional, Dict, List
from typing.re import Pattern
import asyncio
import re
import socket

try:
from pysnmp.entity.rfc3413.oneliner import cmdgen

SNMP_MODE = "legacy"
except ImportError:
from pysnmp.hlapi.asyncio import cmdgen

SNMP_MODE = "v6_async"
except ImportError:
raise ImportError("pysnmp not installed; please install it: 'pip install pysnmp'")

Expand Down Expand Up @@ -304,12 +311,14 @@ def __init__(
self.snmp_target, timeout=1.5, retries=2
)

def _get_snmpv3(self, oid: str) -> str:
async def _run_query(self, creds: object, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv3 for the specified OID.
Asynchronous getCmd query to the device.
Parameters
----------
creds : UsmUserData or CommunityData object
The authentication credentials.
oid : str
The SNMP OID that you want to get.
Expand All @@ -318,26 +327,82 @@ def _get_snmpv3(self, oid: str) -> str:
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
errorIndication, errorStatus, errorIndex, varBinds = await cmdgen.getCmd(
cmdgen.SnmpEngine(),
creds,
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
cmdgen.ContextData(),
cmdgen.ObjectType(cmdgen.ObjectIdentity(oid)),
)

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
if not errorIndication and varBinds[0][1]:
return str(varBinds[0][1])
return ""

def _get_snmpv3_asyncwr(self, oid: str) -> str:
"""
This is an asynchronous wrapper to call code in newer versions of the pysnmp library
(V6 and later).
"""
return asyncio.run(
self._run_query(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
oid,
)
)

def _get_snmpv3(self, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv3 for the specified OID.
Parameters
----------
oid : str
The SNMP OID that you want to get.
Returns
-------
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
if SNMP_MODE == "legacy":
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.UsmUserData(
self.user,
self.auth_key,
self.encrypt_key,
authProtocol=self.auth_proto,
privProtocol=self.encryp_proto,
),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""
elif SNMP_MODE == "v6_async":
return self._get_snmpv3_asyncwr(oid=oid)
else:
raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

def _get_snmpv2c_asyncwr(self, oid: str) -> str:
"""
This is an asynchronous wrapper to call code in newer versions of the pysnmp library
(V6 and later).
"""
return asyncio.run(self._run_query(cmdgen.CommunityData(self.community), oid))

def _get_snmpv2c(self, oid: str) -> str:
"""
Try to send an SNMP GET operation using SNMPv2 for the specified OID.
Expand All @@ -352,19 +417,23 @@ def _get_snmpv2c(self, oid: str) -> str:
string : str
The string as part of the value from the OID you are trying to retrieve.
"""
cmd_gen = cmdgen.CommandGenerator()

(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.CommunityData(self.community),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)
if SNMP_MODE == "legacy":
cmd_gen = cmdgen.CommandGenerator()
(error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd(
cmdgen.CommunityData(self.community),
self.udp_transport_target,
oid,
lookupNames=True,
lookupValues=True,
)
if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""

if not error_detected and snmp_data[0][1]:
return str(snmp_data[0][1])
return ""
elif SNMP_MODE == "v6_async":
return self._get_snmpv2c_asyncwr(oid=oid)
else:
raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'")

def _get_snmp(self, oid: str) -> str:
"""Wrapper for generic SNMP call."""
Expand Down
Loading

0 comments on commit f0041bb

Please sign in to comment.