Skip to content

Commit

Permalink
Add types for SNMP security fields.
Browse files Browse the repository at this point in the history
The changes made are to support clients specifying the configuration
without having to know about net-snmp's command line arguments.

ZEN-35109
  • Loading branch information
jpeacock-zenoss committed Oct 28, 2024
1 parent 871b36e commit d015ef7
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 114 deletions.
4 changes: 2 additions & 2 deletions pynetsnmp/SnmpSession.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import

"""Backwards compatible API for SnmpSession"""

from __future__ import absolute_import

from . import netsnmp


Expand Down
29 changes: 27 additions & 2 deletions pynetsnmp/conversions.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,36 @@
from __future__ import absolute_import

from ipaddr import IPAddress


def asOidStr(oid):
"""converts an oid int sequence to an oid string"""
return "." + ".".join([str(x) for x in oid])
return "." + ".".join(str(x) for x in oid)


def asOid(oidStr):
"""converts an OID string into a tuple of integers"""
return tuple([int(x) for x in oidStr.strip(".").split(".")])
return tuple(int(x) for x in oidStr.strip(".").split("."))


def asAgent(ip, port):
"""take a google ipaddr object and port number and produce a net-snmp
agent specification (see the snmpcmd manpage)"""
ip, interface = ip.split("%") if "%" in ip else (ip, None)
address = IPAddress(ip)

if address.version == 4:
return "udp:{}:{}".format(address.compressed, port)

if address.version == 6:
if address.is_link_local:
if interface is None:
raise RuntimeError(
"Cannot create agent specification from link local "
"IPv6 address without an interface"
)
else:
return "udp6:[{}%{}]:{}".format(
address.compressed, interface, port
)
return "udp6:[{}]:{}".format(address.compressed, port)
8 changes: 4 additions & 4 deletions pynetsnmp/netsnmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,17 +684,17 @@ def _doNothingProc(argc, argv, arg):


def parse_args(args, session):
args = [
sys.argv[0],
] + args
args = [sys.argv[0]] + args
argc = len(args)
argv = (c_char_p * argc)()
for i in range(argc):
# snmp_parse_args mutates argv, so create a copy
argv[i] = create_string_buffer(args[i]).raw
# WARNING: Usage of snmp_parse_args call causes memory leak.
if lib.snmp_parse_args(argc, argv, session, "", _doNothingProc) < 0:
raise ArgumentParseError("Unable to parse arguments", " ".join(argv))
raise ArgumentParseError(
"Unable to parse arguments arguments='{}'".format(" ".join(argv))
)
# keep a reference to the args for as long as sess is alive
return argv

Expand Down
111 changes: 111 additions & 0 deletions pynetsnmp/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import absolute_import

from .CONSTANTS import SNMP_VERSION_1, SNMP_VERSION_2c, SNMP_VERSION_3
from .usm import auth_protocols, priv_protocols


class Community(object):
"""
Provides the community based security model for SNMP v1/V2c.
"""

def __init__(self, name, version=SNMP_VERSION_2c):
version = _version_map.get(version)
if version is None:
raise ValueError("Unsupported SNMP version '{}'".format(version))
self.name = name
self.version = version

def getArguments(self):
community = ("-c", str(self.name)) if self.name else ()
return ("-v", self.version) + community


class UsmUser(object):
"""
Provides User-based Security Model configuration for SNMP v3.
"""

def __init__(self, name, auth=None, priv=None, engine=None, context=None):
self.name = name
if not isinstance(auth, (type(None), Authentication)):
raise ValueError("invalid authentication protocol")
self.auth = auth
if not isinstance(auth, (type(None), Privacy)):
raise ValueError("invalid privacy protocol")
self.priv = priv
self.engine = engine
self.context = context
self.version = _version_map.get(SNMP_VERSION_3)

def getArguments(self):
auth = (
("-a", str(self.auth.protocol), "-A", self.auth.passphrase)
if self.auth
else ()
)
if auth:
# The privacy arguments are only given if the authentication
# arguments are also provided.
priv = (
("-x", str(self.priv.protocol), "-X", self.priv.passphrase)
if self.priv
else ()
)
else:
priv = ()
seclevel = ("-l", _sec_level.get((auth, priv), "noAuthNoPriv"))

return (
("-v", self.version)
+ (("-u", self.name) if self.name else ())
+ seclevel
+ auth
+ priv
+ (("-e", self.engine) if self.engine else ())
+ (("-n", self.context) if self.context else ())
)


_sec_level = {(True, True): "authPriv", (True, False): "authNoPriv"}
_version_map = {
SNMP_VERSION_1: "1",
SNMP_VERSION_2c: "2c",
SNMP_VERSION_3: "3",
"v1": "1",
"v2c": "2c",
"v3": "3",
}


class Authentication(object):
"""
Provides the authentication data for UsmUser objects.
"""

def __init__(self, protocol, passphrase):
if protocol is None:
raise ValueError(
"Invalid Authentication protocol '{}'".format(protocol)
)
self.protocol = auth_protocols[protocol]
if not passphrase:
raise ValueError(
"authentication protocol requires an "
"authentication passphrase"
)
self.passphrase = passphrase


class Privacy(object):
"""
Provides the privacy data for UsmUser objects.
"""

def __init__(self, protocol, passphrase):
if protocol is None:
raise ValueError("Invalid Privacy protocol '{}'".format(protocol))
self.protocol = priv_protocols[protocol]
if not passphrase:
raise ValueError("privacy protocol requires a privacy passphrase")
self.passphrase = passphrase
Loading

0 comments on commit d015ef7

Please sign in to comment.