#!/usr/bin/env python
"""
Requirements:
pysnmp
pysnmp-mibs
memoized-property
Setup:
./opt/bin/build-pysnmp-mib -o usr/mibs/LLDP-MIB.py etc/mibs/LLDP-MIB
Run:
PYSNMP_MIB_DIRS=usr/mibs/ ./opt/bin/python ...
"""
import pvl.args
import pvl.hosts
from pvl.invoke import merge
import logging; log = logging.getLogger('pvl.hosts-snmp')
import optparse
from pysnmp.entity.rfc3413.oneliner import cmdgen as pysnmp
import collections
class SNMPError(Exception):
    """
    Base class for the SNMP failures raised by this module.
    """
class SNMPEngineError(SNMPError):
    """
    The SNMP command itself failed: the command generator raised, or returned an error
    indication, as opposed to an error status carried in the response.
    """
class SNMPAgent(object):
    """
    Query a remote host over SNMP, using GET (`get`) and GETNEXT walks (`walk`, `table`).
    """

    SNMP_PORT = 161

    # Command generator shared by every agent instance.
    snmp_cmdgen = pysnmp.CommandGenerator()

    # Keyword options passed to every command.
    _LOOKUP_OPTS = dict(
        lookupNames=True,
        lookupValues=True,
    )

    @classmethod
    def apply(cls, options, host, community=None):
        """
        Build an agent from a host spec of the form [community@]host[:port].

            options     - parsed options; options.snmp_community is used if community is None
            host        - host spec; a community@ prefix overrides the community,
                          a :port suffix overrides SNMP_PORT
            community   - community to use, or None to take it from options

        Raises ValueError if the :port suffix is not a decimal number.
        """
        port = cls.SNMP_PORT

        if community is None:
            community = options.snmp_community

        if '@' in host:
            community, host = host.split('@', 1)

        if ':' in host:
            host, port = host.rsplit(':', 1)
            # rsplit() yields a string; hand the transport a numeric port like the default
            port = int(port)

        return cls(
            pysnmp.CommunityData(community),
            pysnmp.UdpTransportTarget((host, port))
        )

    def __init__(self, security, transport):
        """
            security    - pysnmp security data (e.g. CommunityData) sent with each command
            transport   - pysnmp transport target (e.g. UdpTransportTarget)
        """
        self.security = security
        self.transport = transport

    def _query(self, command, request):
        """
        Run a command generator method against this agent and return its result.

            command     - bound snmp_cmdgen method (getCmd / nextCmd)
            request     - sequence of MibVariable objects to query

        Raises SNMPEngineError if the command raises PySnmpError or reports an error
        indication, and SNMPError if the response carries an error status.
        """
        try:
            error, error_status, _error_index, result = command(
                self.security, self.transport, *request, **self._LOOKUP_OPTS
            )
        except pysnmp.error.PySnmpError as ex:
            raise SNMPEngineError(ex)

        if error:
            raise SNMPEngineError(error)

        if error_status:
            # must use the local unpacked above; the undefined `errorStatus` raised NameError here
            raise SNMPError(error_status.prettyPrint())

        return result

    def get(self, *request):
        """
        GET the given variables and return the response var-binds.

            request = (
                pysnmp.MibVariable('IF-MIB', 'ifInOctets', 1),
            )

        The response can be iterated as (name, value) pairs, e.g.
        name.getMibSymbol() and value.prettyPrint().

        Raises SNMPEngineError / SNMPError on failure.
        """
        return self._query(self.snmp_cmdgen.getCmd, request)

    def walk(self, *request):
        """
        Walk (GETNEXT) the given variables and return the list of response rows.

            request = (
                pysnmp.MibVariable('IF-MIB', 'ifInOctets'),
            )

        Each row can be iterated as (name, value) pairs, one per requested variable.

        Raises SNMPEngineError / SNMPError on failure.
        """
        return self._query(self.snmp_cmdgen.nextCmd, request)

    def table(self, *columns):
        """
        Walk the given column variables and group the values by row index.

        Returns the items of { idx: { column: value } }, i.e. (idx, {column: value}) pairs,
        where idx is the index part of the resolved MIB symbol and column is the
        MibVariable object that was passed in.
        """
        data = collections.defaultdict(dict)

        for row in self.walk(*columns):
            log.debug("%s", row)

            for column, (field, value) in zip(columns, row):
                mib, sym, idx = field.getMibSymbol()

                log.debug("%s::%s[%s]: %s: %s", mib, sym, ', '.join(str(x) for x in idx), column, value)

                data[idx][column] = value

        return data.items()
from memoized_property import memoized_property
def macaddr(value):
    """
    Format an SNMP OctetString as a colon-separated, lower-case hex MAC address.

    value must provide asNumbers(), yielding the octets as integers.
    """
    octets = value.asNumbers()
    return ':'.join(format(octet, '02x') for octet in octets)
class LLDPAgent(SNMPAgent):
    """
    Query LLDP info from a remote agent.

    Exposes the agent's own system (`local`), its ports (`ports`, `port()`) and the
    neighbours seen on those ports (`remotes()`), all read from LLDP-MIB tables.
    """

    # Subtype values handled below.
    # XXX: reference names from LLDP-MIB.py
    CHASSIS_SUBTYPE_MAC_ADDRESS = 4
    PORT_SUBTYPE_MAC_ADDRESS = 3
    PORT_SUBTYPE_INTERFACE_NAME = 5     # interfaceName -> IF-MIB::ifName ?
    PORT_SUBTYPE_LOCAL = 7

    LLDP_LOC_CHASSIS_ID = pysnmp.MibVariable('LLDP-MIB', 'lldpLocChassisId')
    LLDP_LOC_CHASSIS_ID_SUBTYPE = pysnmp.MibVariable('LLDP-MIB', 'lldpLocChassisIdSubtype')
    LLDP_LOC_SYS_NAME = pysnmp.MibVariable('LLDP-MIB', 'lldpLocSysName')

    LLDP_LOC_PORT_ID = pysnmp.MibVariable('LLDP-MIB', 'lldpLocPortId')
    LLDP_LOC_PORT_ID_SUBTYPE = pysnmp.MibVariable('LLDP-MIB', 'lldpLocPortIdSubtype')

    LLDP_REM_CHASSIS_ID = pysnmp.MibVariable('LLDP-MIB', 'lldpRemChassisId')
    LLDP_REM_CHASSIS_ID_SUBTYPE = pysnmp.MibVariable('LLDP-MIB', 'lldpRemChassisIdSubtype')
    LLDP_REM_SYS_NAME = pysnmp.MibVariable('LLDP-MIB', 'lldpRemSysName')
    LLDP_REM_PORT_ID = pysnmp.MibVariable('LLDP-MIB', 'lldpRemPortId')
    LLDP_REM_PORT_ID_SUBTYPE = pysnmp.MibVariable('LLDP-MIB', 'lldpRemPortIdSubtype')

    @classmethod
    def _chassis_id(cls, chassis_id, subtype):
        """
        Decode a chassis ID according to its subtype.

        MAC address IDs are formatted with macaddr(); any other subtype is returned as-is.
        """
        log.debug("%s: %r", subtype, chassis_id)

        if subtype == cls.CHASSIS_SUBTYPE_MAC_ADDRESS:
            return macaddr(chassis_id)

        return chassis_id

    @classmethod
    def _port_id(cls, port_id, subtype):
        """
        Decode a port ID according to its subtype.

        Interface-name and local IDs are returned as str, MAC addresses are formatted with
        macaddr(). Unknown subtypes are logged and the ID is returned as-is.
        """
        log.debug("%s: %r", subtype, port_id)

        if subtype == cls.PORT_SUBTYPE_INTERFACE_NAME:
            return str(port_id)

        if subtype == cls.PORT_SUBTYPE_MAC_ADDRESS:
            return macaddr(port_id)

        if subtype == cls.PORT_SUBTYPE_LOCAL:
            return str(port_id)  # XXX: integer?

        # warning(), as used elsewhere in this module; warn() is a deprecated alias
        log.warning("unknown subtype=%d: %r", subtype, port_id)

        return port_id

    @memoized_property
    def local(self):
        """
        Describe the local system.

        Returns { 'chassis': ..., 'sys_name': ... } built from the first row of the local
        system table, or None if the agent returned no rows.
        """
        rows = self.table(
            self.LLDP_LOC_CHASSIS_ID,
            self.LLDP_LOC_CHASSIS_ID_SUBTYPE,
            self.LLDP_LOC_SYS_NAME
        )

        for _idx, data in rows:
            # only the first row is used
            return {
                'chassis': self._chassis_id(data[self.LLDP_LOC_CHASSIS_ID], data[self.LLDP_LOC_CHASSIS_ID_SUBTYPE]),
                'sys_name': str(data[self.LLDP_LOC_SYS_NAME]),
            }

        return None

    @memoized_property
    def ports(self):
        """
        Describe the local ports.

        Returns { port number (int): { 'port': decoded port ID } }.
        """
        ports = {}

        rows = self.table(
            self.LLDP_LOC_PORT_ID,
            self.LLDP_LOC_PORT_ID_SUBTYPE
        )

        for idx, data in rows:
            # the local port table is indexed by the port number alone
            port, = idx

            ports[int(port)] = {
                'port': self._port_id(data[self.LLDP_LOC_PORT_ID], data[self.LLDP_LOC_PORT_ID_SUBTYPE]),
            }

        return ports

    def port(self, port):
        """
        Describe the given local port number.

        Raises KeyError if the port is not in the local port table.
        """
        return self.ports[int(port)]

    def remotes(self):
        """
        Describe remote systems, indexed by local port.

        Yields (local port number, { 'chassis': ..., 'sys_name': ..., 'port': ... }).
        """
        rows = self.table(
            self.LLDP_REM_CHASSIS_ID,
            self.LLDP_REM_CHASSIS_ID_SUBTYPE,
            self.LLDP_REM_SYS_NAME,
            self.LLDP_REM_PORT_ID,
            self.LLDP_REM_PORT_ID_SUBTYPE
        )

        for idx, data in rows:
            # three-part row index; only the local port number is used
            _time, port, _rem_idx = idx

            yield int(port), {
                'chassis': self._chassis_id(data[self.LLDP_REM_CHASSIS_ID], data[self.LLDP_REM_CHASSIS_ID_SUBTYPE]),
                'sys_name': str(data[self.LLDP_REM_SYS_NAME]),
                'port': self._port_id(data[self.LLDP_REM_PORT_ID], data[self.LLDP_REM_PORT_ID_SUBTYPE]),
            }
def hosts_lldp(options, hosts):
    """
    Discover LLDP-supporting hosts.

        options     - parsed options, passed on to LLDPAgent.apply()
        hosts       - hosts to probe; only those with an 'snmp' extension are queried

    Hosts whose query fails with SNMPError, or which report no local LLDP system info,
    are logged and skipped. A sys_name that differs from the host name is only logged.

    Yields Host, LLDPAgent
    """
    for host in hosts:
        snmp = host.extensions.get('snmp')

        log.debug("%s: %s", host, snmp)

        if not snmp:
            continue

        lldp = LLDPAgent.apply(options, host.fqdn(), community=snmp.get('community'))

        try:
            local = lldp.local
        except SNMPError as ex:
            log.warning("%s: %s", host, ex)
            continue

        if not local:
            # the agent answered but has no LLDP local system rows; indexing None below
            # would abort the scan of all remaining hosts
            log.warning("%s: no LLDP local system info", host)
            continue

        if local['sys_name'] != host.host:
            log.warning("%s: SNMP sys_name mismatch: %s", host, local['sys_name'])

        yield host, lldp
def apply_hosts_lldp(options, hosts):
    """
    Query LLDP neighbour info for every LLDP-supporting host.

    Yields (host, local, remote) for each remote system seen, where local is the host's
    local system info merged with the info of the local port the remote was seen on.
    """
    for host, agent in hosts_lldp(options, hosts):
        log.info("%s: %s", host, agent.local)

        for port_number, remote in agent.remotes():
            local_port = agent.port(port_number)

            log.info("%s: %s: %s", host, local_port, remote)

            yield host, merge(agent.local, local_port), remote
def main(argv):
    """
        SNMP polling.
    """
    # NOTE: the docstring above doubles as the optparse usage text, so keep it short.
    #
    # argv is the full command line including the program name. Options are parsed,
    # the selected hosts are loaded and one line is printed per LLDP neighbour found.

    parser = optparse.OptionParser(main.__doc__)
    parser.add_option_group(pvl.args.parser(parser))
    parser.add_option_group(pvl.hosts.optparser(parser))

    parser.add_option('--snmp-community',
            help="SNMPv2 read community")

    options, args = parser.parse_args(argv[1:])
    pvl.args.apply(options)

    # input
    hosts = pvl.hosts.apply(options, args)

    # apply
    for host, local, remote in apply_hosts_lldp(options, hosts):
        local_text = "{local[chassis]:15}[{local[port]}]".format(local=local)
        remote_text = "{remote[chassis]:15}[{remote[port]}]".format(remote=remote)

        # single-argument call form: same output under the Python 2 print statement,
        # and valid syntax on Python 3
        print("{host:30} {local:40} <- {remote:40}".format(
            host=host,
            local=local_text,
            remote=remote_text
        ))
# Script entry point: hand main() to the pvl.args runner when executed directly.
if __name__ == '__main__':
    pvl.args.main(main)