"""
Requirements:
pysnmp
pysnmp-mibs
"""
import logging; log = logging.getLogger('pvl.snmp.snmp')
import optparse
from pysnmp.entity.rfc3413.oneliner import cmdgen as pysnmp
import collections
def options (parser) :
parser = optparse.OptionGroup(parser, "SNMP Agent")
parser.add_option('--snmp-community',
help="SNMPv2 read community")
return parser
class SNMPError (Exception) :
pass
class SNMPEngineError (SNMPError) :
"""
Internal SNMP Engine error (?)
"""
class SNMPAgent (object) :
"""
GET SNMP shit from a remote host.
"""
SNMP_PORT = 161
snmp_cmdgen = pysnmp.CommandGenerator()
@classmethod
def apply (cls, options, host, community=None) :
port = cls.SNMP_PORT
if community is None :
community = options.snmp_community
if '@' in host :
community, host = host.split('@', 1)
if ':' in host :
host, port = host.rsplit(':', 1)
return cls(
pysnmp.CommunityData(community),
pysnmp.UdpTransportTarget((host, port))
)
def __init__ (self, security, transport) :
self.security = security
self.transport = transport
def get (self, *request) :
"""
request = (
pysnmp.MibVariable('IF-MIB', 'ifInOctets', 1),
)
"""
opts = dict(
lookupNames = True,
lookupValues = True,
)
try :
error, error_status, error_index, response = self.snmp_cmdgen.getCmd(self.security, self.transport, *request, **opts)
except pysnmp.error.PySnmpError as ex :
raise SNMPEngineError(ex)
if error :
raise SNMPEngineError(error)
if error_status :
raise SNMPError(errorStatus.prettyPrint())
return response
#for name, value in response :
# yield name.getMibSymbol(), value.prettyPrint()
def walk (self, *request) :
"""
request = (
pysnmp.MibVariable('IF-MIB', 'ifInOctets'),
)
"""
opts = dict(
lookupNames = True,
lookupValues = True,
)
try :
error, error_status, error_index, responses = self.snmp_cmdgen.nextCmd(self.security, self.transport, *request, **opts)
except pysnmp.error.PySnmpError as ex :
raise SNMPEngineError(ex)
if error :
raise SNMPEngineError(error)
if error_status :
raise SNMPError(errorStatus.prettyPrint())
return responses
#for response in responses:
# for name, value in response :
# yield name.getMibSymbol(), value.prettyPrint()
def table (self, *columns) :
"""
Given [oid] returns { idx: { oid: value } }
"""
data = collections.defaultdict(dict)
for row in self.walk(*columns) :
log.debug("%s", row)
for column, (field, value) in zip(columns, row) :
mib, sym, idx = field.getMibSymbol()
log.debug("%s::%s[%s]: %s: %s", mib, sym, ', '.join(str(x) for x in idx), column, value)
data[idx][column] = value
return data.items()