# HG changeset patch # User Tero Marttila # Date 1395093291 -7200 # Node ID e72f78b8bce496c20838627ed357b506e752fb71 # Parent 93e3b199cbe6f0b6b344d316ee1d38a8e4ffb494 pvl.snmp.vlan: read switch VLAN configuration over SNMP diff -r 93e3b199cbe6 -r e72f78b8bce4 pvl/snmp/vlan.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pvl/snmp/vlan.py Mon Mar 17 23:54:51 2014 +0200 @@ -0,0 +1,76 @@ +""" + Requirements: + memoized-property + + Setup: + ./opt/bin/build-pysnmp-mib -o usr/mibs/Q-BRIDGE-MIB.py etc/mibs/rfc2674_q.mib + + Run: + PYSNMP_MIB_DIRS=usr/mibs/ ./opt/bin/python ... + + SNMP: + Q-BRIDGE-MIB::dot1qVlanCurrentTable + +""" + +from pvl.snmp import snmp +from pvl.snmp.snmp import pysnmp + +from memoized_property import memoized_property + +import logging; log = logging.getLogger('pvl.snmp.vlan') + +DOT1Q_NUM_VLANS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qNumVlans') +DOT1Q_VLAN_CURRENT_EGRESS_PORTS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanCurrentEgressPorts') +DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanCurrentUntaggedPorts') +DOT1Q_VLAN_STATIC_NAME = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanStaticName') + +def portlist (value) : + r""" + Unpack a OctetString bitmask into bit offsets. + + >>> list(portlist('\x80')) + [1] + >>> list(portlist('\x01')) + [8] + """ + + for octet_idx, byte in enumerate(value) : + octet = ord(byte) + + for bit_idx in xrange(0, 8) : + if octet & (1 << (7 - bit_idx)) : + yield octet_idx * 8 + bit_idx + 1 + +class VLANAgent (snmp.SNMPAgent) : + """ + Query VLAN configuration. + """ + + @memoized_property + def count (self) : + for name, value in self.get(DOT1Q_NUM_VLANS) : + # XXX: ???? + if value.tagSet == pysnmp.rfc1905.NoSuchInstance.tagSet : + continue + + return int(value) + + def names (self) : + for idx, data in self.table(DOT1Q_VLAN_STATIC_NAME) : + yield str(data[DOT1Q_VLAN_STATIC_NAME]) + + def vlans (self) : + for idx, data in self.table(DOT1Q_VLAN_CURRENT_EGRESS_PORTS, DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS) : + time, vlan = idx + + egress = set(portlist(data[DOT1Q_VLAN_CURRENT_EGRESS_PORTS])) + untagged = set(portlist(data[DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS])) + + tagged = egress - untagged + + yield int(vlan), (tagged, untagged) + +if __name__ == '__main__': + import doctest + doctest.testmod()