pvl.snmp.vlan: read switch VLAN configuration over SNMP
authorTero Marttila <terom@paivola.fi>
Mon, 17 Mar 2014 23:54:51 +0200
changeset 390 e72f78b8bce4
parent 389 93e3b199cbe6
child 391 e2c367b1556f
pvl.snmp.vlan: read switch VLAN configuration over SNMP
pvl/snmp/vlan.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pvl/snmp/vlan.py	Mon Mar 17 23:54:51 2014 +0200
@@ -0,0 +1,76 @@
+"""
+    Requirements:
+        memoized-property
+
+    Setup:
+        ./opt/bin/build-pysnmp-mib -o usr/mibs/Q-BRIDGE-MIB.py etc/mibs/rfc2674_q.mib
+
+    Run:
+        PYSNMP_MIB_DIRS=usr/mibs/ ./opt/bin/python ...
+
+    SNMP:
+        Q-BRIDGE-MIB::dot1qVlanCurrentTable
+
+"""
+
+from pvl.snmp import snmp
+from pvl.snmp.snmp import pysnmp
+
+from memoized_property import memoized_property
+
+import logging; log = logging.getLogger('pvl.snmp.vlan')
+
+DOT1Q_NUM_VLANS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qNumVlans')
+DOT1Q_VLAN_CURRENT_EGRESS_PORTS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanCurrentEgressPorts')
+DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanCurrentUntaggedPorts')
+DOT1Q_VLAN_STATIC_NAME = pysnmp.MibVariable('Q-BRIDGE-MIB', 'dot1qVlanStaticName')
+
+def portlist (value) :
+    r"""
+        Unpack a OctetString bitmask into bit offsets.
+
+        >>> list(portlist('\x80'))
+        [1]
+        >>> list(portlist('\x01'))
+        [8]
+    """
+
+    for octet_idx, byte in enumerate(value) :
+        octet = ord(byte)
+
+        for bit_idx in xrange(0, 8) :
+            if octet & (1 << (7 - bit_idx)) :
+                yield octet_idx * 8 + bit_idx + 1
+
+class VLANAgent (snmp.SNMPAgent) :
+    """
+        Query VLAN configuration.
+    """
+
+    @memoized_property
+    def count (self) :
+        for name, value in self.get(DOT1Q_NUM_VLANS) :
+            # XXX: ????
+            if value.tagSet == pysnmp.rfc1905.NoSuchInstance.tagSet :
+                continue
+
+            return int(value)
+
+    def names (self) :
+        for idx, data in self.table(DOT1Q_VLAN_STATIC_NAME) :
+            yield str(data[DOT1Q_VLAN_STATIC_NAME])
+
+    def vlans (self) :
+        for idx, data in self.table(DOT1Q_VLAN_CURRENT_EGRESS_PORTS, DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS) :
+            time, vlan = idx
+
+            egress = set(portlist(data[DOT1Q_VLAN_CURRENT_EGRESS_PORTS]))
+            untagged = set(portlist(data[DOT1Q_VLAN_CURRENT_UNTAGGED_PORTS]))
+
+            tagged = egress - untagged
+            
+            yield int(vlan), (tagged, untagged)
+
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()