From f602586a98f39cc707ee20e5ef2409c6202221df Mon Sep 17 00:00:00 2001 From: "Mark A. Ziesemer" Date: Sun, 26 Mar 2023 16:28:27 -0500 Subject: [PATCH 1/2] Initial fix for trap receiver. - Fixes exhuma/puresnmp#107 (https://github.com/exhuma/puresnmp/issues/107). --- puresnmp/api/raw.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/puresnmp/api/raw.py b/puresnmp/api/raw.py index 6cd3746..f0eaa8d 100644 --- a/puresnmp/api/raw.py +++ b/puresnmp/api/raw.py @@ -33,7 +33,7 @@ from typing import Type as TType from typing import TypeVar, cast -from x690.types import Integer, Null, ObjectIdentifier, Sequence +from x690.types import Integer, Null, ObjectIdentifier, OctetString, Sequence from x690.types import X690Type as Type from puresnmp.plugins import mpm @@ -897,7 +897,7 @@ async def handler(data: bytes) -> bytes: as_sequence = Sequence.decode(packet.data) - obj = cast(Tuple[Integer, Integer, Trap], as_sequence[0]) + obj = cast(Tuple[Integer, OctetString, Trap], as_sequence) mproc = mpm.create(obj[0].value, handler, lcd) trap = mproc.decode(packet.data, credentials) From d9223e624f1f68119d1be6d4992ebe587f6db9d8 Mon Sep 17 00:00:00 2001 From: "Mark A. Ziesemer" Date: Tue, 4 Apr 2023 13:54:04 -0500 Subject: [PATCH 2/2] Add SNMPv1 trap support. - Contributes to exhuma/puresnmp#36 (https://github.com/exhuma/puresnmp/issues/36). --- puresnmp/pdu.py | 6 ++ puresnmp/v1_trap.py | 122 +++++++++++++++++++++++++++++++++++++ puresnmp_plugins/mpm/v1.py | 5 ++ tests/test_mpm_v1.py | 68 +++++++++++++++++++++ 4 files changed, 201 insertions(+) create mode 100644 puresnmp/v1_trap.py create mode 100644 tests/test_mpm_v1.py diff --git a/puresnmp/pdu.py b/puresnmp/pdu.py index b22d606..07a4d6d 100644 --- a/puresnmp/pdu.py +++ b/puresnmp/pdu.py @@ -279,6 +279,12 @@ class SetRequest(PDU): TAG = 3 +# TAG 4 is a SNMPv1 Trap Message - and specific to SNMPv1, pretty much does not +# relate at all to the typical PDU format here, including no use of 3 of the 4 +# base fields (request-id, error-status, error-index). +# As such, this format is instead handled by v1_trap.TrapV1. + + class BulkGetRequest(PDU): """ Represents a SNMP GetBulk request diff --git a/puresnmp/v1_trap.py b/puresnmp/v1_trap.py new file mode 100644 index 0000000..f195cda --- /dev/null +++ b/puresnmp/v1_trap.py @@ -0,0 +1,122 @@ +""" +Model for a SNMPv1 Trap Message. + +Specific to SNMPv1, these messages pretty much do not relate at all to the +typical PDU format otherwise used throughout SNMP - including no use of 3 of the +4 PDU base fields (request-id, error-status, error-index). + +This class fulfills the otherwise missing format for TAG 4 +(as is commented into pdu.py). +""" + +import dataclasses +import datetime +import enum +import logging +import typing + +import ipaddress +import puresnmp.exc +import puresnmp.types +import puresnmp.varbind +import x690 +import x690.types +import x690.util + +LOG = logging.getLogger(__name__) + + +class GenericTrap(enum.IntEnum): + COLD_START = 0 + WARM_START = 1 + LINK_DOWN = 2 + LINK_UP = 3 + AUTHENTICATION_FAILURE = 4 + EGP_NEIGHBOR_LOSS = 5 + ENTERPRISE_SPECIFIC = 6 + + +@dataclasses.dataclass(frozen=True) +class TrapV1Content: + enterprise: x690.types.ObjectIdentifier + agent_addr: ipaddress.IPv4Address + generic_trap: GenericTrap + specific_trap: int + time_stamp: typing.Optional[datetime.timedelta] + varbinds: typing.List[puresnmp.varbind.VarBind] + + +class TrapV1(x690.types.X690Type[TrapV1Content]): + """ + Represents an SNMPv1 Trap + - https://www.rfc-editor.org/rfc/rfc1157#page-27 + """ + + TYPECLASS = x690.util.TypeClass.CONTEXT + TAG = 4 + + @property + def value(self) -> TrapV1Content: + if not isinstance(self.pyvalue, x690.types._SENTINEL_UNINITIALISED): + return self.pyvalue + self.pyvalue = self.decode_raw(self.raw_bytes, self.bounds) + return self.pyvalue + + @staticmethod + def decode_raw(data: bytes, slc: slice = slice(None)) -> TrapV1Content: + if not data: + raise puresnmp.exc.EmptyMessage("No data to decode!") + enterprise, nxt = x690.decode( + data, slc.start or 0, enforce_type=x690.types.ObjectIdentifier + ) + agent_addr, nxt = x690.decode( + data, nxt, enforce_type=puresnmp.types.IpAddress + ) + generic_trap, nxt = x690.decode( + data, nxt, enforce_type=x690.types.Integer + ) + specific_trap, nxt = x690.decode( + data, nxt, enforce_type=x690.types.Integer + ) + time_stamp, nxt = x690.decode( + data, nxt, enforce_type=puresnmp.types.TimeTicks + ) + values, nxt = x690.decode(data, nxt, enforce_type=x690.types.Sequence) + + if not isinstance(values, x690.types.Sequence): + raise TypeError( + "Values can only be decoded from sequences but got " + "%r instead" % type(values) + ) + + varbinds = [] + for oid, value in values: # type: ignore + oid = typing.cast(x690.types.ObjectIdentifier, oid) # type: ignore + value = typing.cast(x690.types.Type[typing.Any], value) # type: ignore + varbinds.append(puresnmp.varbind.VarBind(oid, value)) + + return TrapV1Content( + enterprise, + agent_addr.value, + GenericTrap(generic_trap.value), + specific_trap.value, + time_stamp.pythonize(), + varbinds, + ) + + def __repr__(self) -> str: + try: + return "%s(%r, %r, %r, %r, %r, %r)" % ( + self.__class__.__name__, + self.value.enterprise, + self.value.agent_addr, + self.value.generic_trap, + self.value.specific_trap, + self.value.time_stamp, + self.value.varbinds, + ) + except: # pylint: disable=bare-except + LOG.exception( + "Exception caught in __repr__ of %s", self.__class__.__name__ + ) + return f"<{self.__class__.__name__} (error-in repr)>" diff --git a/puresnmp_plugins/mpm/v1.py b/puresnmp_plugins/mpm/v1.py index fd26469..b9f2409 100644 --- a/puresnmp_plugins/mpm/v1.py +++ b/puresnmp_plugins/mpm/v1.py @@ -12,12 +12,17 @@ from puresnmp.plugins.mpm import AbstractEncodingResult, MessageProcessingModel from puresnmp.plugins.security import SecurityModel from puresnmp.plugins.security import create as create_sm +from puresnmp.v1_trap import TrapV1 IDENTIFIER = 0 TV1SecModel = SecurityModel[PDU, Sequence] +# Ensure initialization of TrapV1 into the X690Type type registry, +# and to prevent a potential "unused import" warning or removal of the import. +TrapV1 + class V1EncodingResult(AbstractEncodingResult): data: bytes diff --git a/tests/test_mpm_v1.py b/tests/test_mpm_v1.py new file mode 100644 index 0000000..9a46bd2 --- /dev/null +++ b/tests/test_mpm_v1.py @@ -0,0 +1,68 @@ +import datetime +import typing + +import pytest + +import ipaddress +import x690.types + +import puresnmp.credentials +import puresnmp.plugins.mpm +import puresnmp.v1_trap +import puresnmp.varbind + + +# Ignore the "Experimental SNMPv1 support" warning for now, until it can be removed. +@pytest.mark.filterwarnings("ignore::UserWarning") +def test_v1_trap_decode() -> None: + async def handler(data: bytes) -> bytes: + return b"" + + raw_response = ( + b"\x30\x3b\x02\x01" # SNMP + b"\x00" # version: version-1 (0) + b"\x04\x06" + b"\x70\x75\x62\x6c\x69\x63" # community: public + b"\xa4\x2e" # data: trap (4) + b"\x06\x09" # trap + b"\x2b\x06\x01\x04\x01\x81\xfd\x59\x01" # enterprise: 1.3.6.1.4.1.32473.1 (iso.3.6.1.4.1.32473.1) + b"\x40\x04" + b"\xc0\xa8\x00\x01" # agent-addr: 192.168.0.1 + b"\x02\x01" + b"\x06" # generic-trap: enterpriseSpecific (6) + b"\x02\x01" + b"\x7b" # specific-trap: 123 + b"\x43\x02" + b"\x01\xc8" # time-stamp: 456 + b"\x30\x12" + b"\x30\x10" # variable-bindings: 1 item + b"\x06\x0a" + b"\x2b\x06\x01\x04\x01\x81\xfd\x59\x01\x00" # Object Name: 1.3.6.1.4.1.32473.1.0 (iso.3.6.1.4.1.32473.1.0) + b"\x02\x02" + b"\x03\x15" # Value (Integer32): 2 + ) + + lcd: typing.Dict[str, typing.Any] = {} + as_sequence = x690.types.Sequence.decode(raw_response) + obj = typing.cast( + typing.Tuple[ + x690.types.Integer, x690.types.OctetString, puresnmp.v1_trap.TrapV1 + ], + as_sequence, + ) + + instance = puresnmp.plugins.mpm.create(obj[0].value, handler, lcd) + result = instance.decode(raw_response, puresnmp.credentials.V1("public")) + + assert isinstance(result, puresnmp.v1_trap.TrapV1) + t = typing.cast(puresnmp.v1_trap.TrapV1, result) + tv = t.value + assert tv.enterprise == x690.types.ObjectIdentifier("1.3.6.1.4.1.32473.1") + assert tv.agent_addr == ipaddress.IPv4Address("192.168.0.1") + assert tv.generic_trap is puresnmp.v1_trap.GenericTrap.ENTERPRISE_SPECIFIC + assert tv.specific_trap == 123 + assert tv.time_stamp == datetime.timedelta(seconds=4.56) + assert len(tv.varbinds) == 1 + vb = tv.varbinds[0] + assert vb.oid == x690.types.ObjectIdentifier("1.3.6.1.4.1.32473.1.0") + assert vb.value.value == 789