rrdweb/pmacct.py
changeset 29 c756e522c9ac
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rrdweb/pmacct.py	Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,239 @@
+"""
+    A pmacct -> rrd runner
+"""
+
+import collections
+import subprocess
+import os.path
+import logging
+
+from rrdweb import rrd
+
+log = logging.getLogger('rrdweb.pmacct')
+
+## pmacct
+# name to invoke `pmacct` as
+PMACCT_CMD = 'pmacct'
+
+# path to in/out client sockets
+PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
+PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'
+
+## RRD
+# path to rrd data dir
+RRD_ROOT = 'var/rrd'
+
+# path to per-host RRD file
+HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'
+
+## RRD create params
+# step interval (in seconds)
+RRD_STEP = 60
+
+# Data Source parameters
+DS_TYPE = 'COUNTER'
+DS_HEARTBEAT = RRD_STEP * 2
+DS_BYTES_MIN = 0
+DS_BYTES_MAX = (1000 ** 3) / 8        # 1gbps
+DS_PKTS_MIN = 0
+DS_PKTS_MAX = DS_BYTES_MAX / 64       # 1gbps @ 64 bytes-per-packet
+
+def pmacct_cmd (*args) :
+    """
+        Invoke the pmacct client, yielding the output as a series of lines.
+    """
+    
+    # argv with name of executable in [0]
+    argv = (PMACCT_CMD, ) + args
+    
+    # invoke
+    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..
+
+    # interact (nothing on stdin)
+    stdout, stderr = proc.communicate()
+
+    # error-check
+    if proc.returncode :
+        raise Exception("pmacct terminated with status=%d" % (proc.returncode, ))
+
+    # stderr?
+    if stderr :
+        raise Exception("pmacct terminated with errors: %s" % (stderr, ))
+
+    # output
+    for line in stdout.splitlines() :
+        yield line
+
+def pmacct_summary (socket) :
+    """
+        Query and return summary-form data from pacct, by querying the full table and parsing it.
+
+        Yields a series of { field: value } dicts for each row in the table.
+    """
+    
+    # `pmacct -s`
+    output = pmacct_cmd('-p', socket, '-s')
+    
+    # the list of fields, parsed from the first line of output
+    fields = None
+    
+    for line in output :
+        if not fields :
+            # parse first row
+            fields = line.split()
+
+            continue
+        
+        if not line :
+            # end of data
+            break
+
+        # parse field data
+        values = line.split()
+
+        # map by field name
+        data = dict(zip(fields, values))
+
+        yield data
+
+class Host (object) :
+    """
+        Traffic counters for some specific host.
+    """
+
+    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')
+
+    def __init__ (self, ip=None) :
+        self.ip = ip
+
+        self.in_ok = self.out_ok = False
+        self.in_bytes = self.out_bytes = self.in_packets = self.out_packets = 0
+
+    def __repr__ (self) :
+        return 'Host(%s)' % ', '.join('%s=%r' % (name, getattr(self, name)) for name in self.__slots__)
+
+
+def host_counters (in_table, out_table) :
+    """
+        Returns the full set of in/out traffic counters for all hosts currently in the summary table.
+
+        This processes the two tables given, by their SRC_IP/DST_IP fields.
+
+        For each SRC_IP in the out_table, outgoing traffic is counted.
+        For each DST_IP in the in_table, incoming traffic is counted.
+
+        Returns a { host: Host } mapping containing the traffic counters for all hosts in the two tables.
+    """
+    
+    # combined set of hosts
+    hosts = collections.defaultdict(Host)
+    
+    # process incoming
+    for row in in_table :
+        ip = row['DST_IP']
+
+        host = hosts[ip]
+        
+        host.ip = ip
+        host.in_ok = True
+        host.in_bytes += int(row['BYTES'])
+        host.in_packets += int(row['PACKETS'])
+    
+    # process outgoing
+    for row in out_table :
+        ip = row['SRC_IP']
+
+        host = hosts[ip]
+        
+        host.ip = ip
+        host.out_ok = True
+        host.out_bytes += int(row['BYTES'])
+        host.out_packets += int(row['PACKETS'])
+
+    return dict(hosts)
+
+def create_host_rrd (path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX') :
+    """
+        Create a new .rrd file for the per-host traffic data at the given path.
+    """
+
+    # data sources
+    ds_list = (
+        # bytes
+        ('in',          ds_type, bytes_min, bytes_max),
+        ('out',         ds_type, bytes_min, bytes_max),
+
+        # packets
+        ('in_pkts',     ds_type, pkts_min, pkts_max),
+        ('out_pkts',    ds_type, pkts_min, pkts_max),
+    )
+
+    HOUR = 60 * 60
+
+    # archives
+    rra_list = (
+        ('AVERAGE', 0.5, 1, 1 * HOUR / RRD_STEP),  # 1 hour
+    )
+
+    # definitions
+    defs = [
+        (
+            'DS:%s:%s:%s:%s:%s' % (name, type, heartbeat, min, max)
+        ) for name, type, min, max in ds_list
+    ] + [
+        (
+            'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
+        ) for func, xff, steps, rows in rra_list
+    ]
+    
+    # create using the given step
+    rrd.create(path, *defs, step=rrd_step)
+
+def update_host_rrd (path, host) :
+    """
+        Update the .rrd file with the given current counter values.
+    """
+    
+    # values to insert
+    values = (
+        # bytes
+        ('in',          host.in_bytes if host.in_ok else 'U'),
+        ('out',         host.out_bytes if host.out_ok else 'U'),
+
+        # packets
+        ('in_pkts',     host.in_packets if host.in_ok else 'U'),
+        ('out_pkts',    host.out_packets if host.out_ok else 'U'),
+    )
+
+    log.debug("Update %s: %r", path, values)
+
+    # go
+    rrd.update(path, 
+        # new values for current time
+        'N:' + ':'.join(str(value) for ds, value in values),
+        
+        # names of DSs we give values for
+        template    = ':'.join(name for name, value in values),
+    )
+
+def update_host (host, **rrd_opts) :
+    """
+        Update the host's .rrd file in rrd_root with the host's data.
+
+        Creates .rrd files for new hosts automatically, using the given options.
+    """
+
+    # path to .rrd
+    rrd = HOST_RRD_PATH.format(host_ip = host.ip, **rrd_opts)
+    
+    # create?
+    if not os.path.exists(rrd) :
+        log.info("Create new .rrd for %s: %s", host.ip, rrd)
+
+        create_host_rrd(rrd, **rrd_opts)
+    
+    # update
+    update_host_rrd(rrd, host)
+