--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/snmp/vlan.py Mon Mar 17 23:54:51 2014 +0200
@@ -0,0 +1,76 @@
+"""
+ Requirements:
+ memoized-property
+
+ Setup:
+ ./opt/bin/build-pysnmp-mib -o usr/mibs/Q-BRIDGE-MIB.py etc/mibs/rfc2674_q.mib
+
+ Run:
+ PYSNMP_MIB_DIRS=usr/mibs/ ./opt/bin/python ...
+
+ SNMP:
+ Q-BRIDGE-MIB::dot1qVlanCurrentTable
+
+"""
+
+from pvl.snmp import snmp
+from pvl.snmp.snmp import pysnmp
+
+from memoized_property import memoized_property
+
+import logging; log = logging.getLogger('pvl.snmp.vlan')
+
+DOT1Q_NUM_VLANS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qNumVlans')
+DOT1Q_VLAN_CURRENT_EGRESS_PORTS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanCurrentEgressPorts')
+DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanCurrentUntaggedPorts')
+DOT1Q_VLAN_STATIC_NAME = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanStaticName')
+
+def portlist (value) :
+ r"""
+ Unpack a OctetString bitmask into bit offsets.
+
+ >>> list(portlist('\x80'))
+ [1]
+ >>> list(portlist('\x01'))
+ [8]
+ """
+
+ for octet_idx, byte in enumerate(value) :
+ octet = ord(byte)
+
+ for bit_idx in xrange(0, 8) :
+ if octet & (1 << (7 - bit_idx)) :
+ yield octet_idx * 8 + bit_idx + 1
+
+class VLANAgent (snmp.SNMPAgent) :
+ """
+ Query VLAN configuration.
+ """
+
+ @memoized_property
+ def count (self) :
+ for name, value in self.get(DOT1Q_NUM_VLANS) :
+ # XXX: ????
+ if value.tagSet == pysnmp.rfc1905.NoSuchInstance.tagSet :
+ continue
+
+ return int(value)
+
+ def names (self) :
+ for idx, data in self.table(DOT1Q_VLAN_STATIC_NAME) :
+ yield str(data[DOT1Q_VLAN_STATIC_NAME])
+
+ def vlans (self) :
+ for idx, data in self.table(DOT1Q_VLAN_CURRENT_EGRESS_PORTS, DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS) :
+ time, vlan = idx
+
+ egress = set(portlist(data[DOT1Q_VLAN_CURRENT_EGRESS_PORTS]))
+ untagged = set(portlist(data[DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS]))
+
+ tagged = egress - untagged
+
+ yield int(vlan), (tagged, untagged)
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()