pmacct: load pmacct data to rrd
author	Tero Marttila <terom@fixme.fi>
Sun, 23 Jan 2011 13:50:13 +0200
changeset 29 c756e522c9ac
parent 28 89a4d9879171
child 30 38577618daca
pmacct: load pmacct data to rrd
.hgignore
bin/rrdweb-pmacct
doc/pmacct-sflow.conf
doc/pmacct.conf
rrdweb/pmacct.py
rrdweb/rrd.py
--- a/.hgignore	Wed Nov 03 00:51:06 2010 +0200
+++ b/.hgignore	Sun Jan 23 13:50:13 2011 +0200
@@ -10,5 +10,5 @@
 \.pyc$
 \.sw[op]$
 
-# misc env setup
-^rrd$
+# data
+^var$
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/rrdweb-pmacct	Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+"""
+    Import data from pmacct to RRD
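+
+    Meant to be run periodically (e.g. from cron), roughly once per RRD step
+    (60s by default), against the memory-plugin sockets defined in
+    doc/pmacct.conf:
+
+        rrdweb-pmacct -I var/pmacct/host-in.sock -O var/pmacct/host-out.sock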
+"""
+
+import optparse
+import logging
+
+from rrdweb import pmacct
+
+log = logging.getLogger('rrdweb-pmacct')
+
+def parse_args (args) :
+    global options
+
+    parser = optparse.OptionParser(
+        usage   = "%prog [options]"
+    )
+    
+    # generic
+    parser.add_option('-q', "--quiet", help="No output in normal operation",
+        action='store_const', dest="loglvl", const=logging.WARNING,
+    )
+    parser.add_option('-v', "--verbose", help="More output",
+        action='store_const', dest="loglvl", const=logging.INFO,
+    )
+    parser.add_option('-D', "--debug", help="Even more output",
+        action='store_const', dest="loglvl", const=logging.DEBUG,
+    )
+
+    # paths
+    parser.add_option('-R', "--rrd-dir", default="var/rrd", help="Path to directory containing .rrd files")
+    parser.add_option('-S', "--rrd-step", type='int', default=pmacct.RRD_STEP, help="RRD step interval (seconds) for new .rrd's (default: %default)", metavar='STEP')
+
+    parser.add_option('-I', "--in-sock", help="Path to pmacct host-in.sock", metavar='SOCK')
+    parser.add_option('-O', "--out-sock", help="Path to pmacct host-out.sock", metavar='SOCK')
+    
+
+    # defaults
+    parser.set_defaults(
+        loglvl      = logging.INFO,
+    )
+    
+    ## parse
+    options, args = parser.parse_args(args)
+    
+    # validate
+    if not (options.in_sock and options.out_sock) :
+        raise Exception("Both --in-sock and --out-sock are required options")
+
+    ## apply
+    logging.basicConfig(
+        format      = "[%(levelname)8s] %(funcName)20s : %(message)s",
+        level       = options.loglvl,
+    )
+    
+    return args
+
+def get_hosts_data () :
+    """
+        Query both pmacct sockets and return the merged per-host traffic counters (a list of Host objects).
+    """
+
+    log.debug("fetching hosts data from %s + %s", options.in_sock, options.out_sock)
+
+    # read summaries
+    in_table = list(pmacct.pmacct_summary(options.in_sock))
+    out_table = list(pmacct.pmacct_summary(options.out_sock))
+
+    # merge into host data
+    hosts = pmacct.host_counters(in_table, out_table)
+    
+    log.debug("got %d in entries + %d out entries -> %d hosts", len(in_table), len(out_table), len(hosts))
+
+    return hosts.values()
+
+def main (args) :
+    # parse
+    args = parse_args(args)
+
+    # list of Host objects
+    hosts = get_hosts_data()
+    
+    log.debug("full set of host data: %s", hosts)
+    log.info("Updating %d hosts...", len(hosts))
+
+    # update
+    for host in hosts :
+        log.info("Updating host %s...", host.ip)
+
+        pmacct.update_host(host, rrd_root=options.rrd_dir, rrd_step=options.rrd_step)
+
+if __name__ == '__main__' :
+    from sys import argv
+
+    main(argv[1:])
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/pmacct-sflow.conf	Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,4 @@
+
+plugins: sfprobe
+sampling_rate: 10
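+
+! sample 1 packet in 10 and export it as sFlow; doc/pmacct.conf sets
+! sfacctd_ext_sampling_rate to the same 10 (plus sfacctd_renormalize) so the
+! collector can scale the sampled counters back up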
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/pmacct.conf	Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,23 @@
+! Sample pmacct configuration file for the rrd loader
+
+! normalize data rates from incoming sFlow samples
+sfacctd_renormalize:        true
+sfacctd_ext_sampling_rate:  10
+
+plugins:                memory[in], memory[out], memory[sum]
+
+! src/dst cross-join
+aggregate[sum]:         src_host,dst_host
+aggregate_filter[sum]:  net 194.197.235.0/24
+imt_path[sum]:          var/pmacct/host-sum.sock
+
+! dst ip
+aggregate[in]:          dst_host
+aggregate_filter[in]:   dst net 194.197.235.0/24
+imt_path[in]:           var/pmacct/host-in.sock
+
+! src ip
+aggregate[out]:         src_host
+aggregate_filter[out]:  src net 194.197.235.0/24
+imt_path[out]:          var/pmacct/host-out.sock
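+
+! bin/rrdweb-pmacct reads the [in] and [out] tables through these sockets
+! (its -I/-O options); the [sum] table is not read by the loader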
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rrdweb/pmacct.py	Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,239 @@
+"""
+    A pmacct -> rrd runner
+"""
+
+import collections
+import subprocess
+import os.path
+import logging
+
+from rrdweb import rrd
+
+log = logging.getLogger('rrdweb.pmacct')
+
+## pmacct
+# name to invoke `pmacct` as
+PMACCT_CMD = 'pmacct'
+
+# path to in/out client sockets
+PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
+PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'
+
+## RRD
+# path to rrd data dir
+RRD_ROOT = 'var/rrd'
+
+# path to per-host RRD file
+HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'
+
+## RRD create params
+# step interval (in seconds)
+RRD_STEP = 60
+
+# Data Source parameters
+DS_TYPE = 'COUNTER'
+DS_HEARTBEAT = RRD_STEP * 2
+DS_BYTES_MIN = 0
+DS_BYTES_MAX = (1000 ** 3) / 8        # 1gbps
+DS_PKTS_MIN = 0
+DS_PKTS_MAX = DS_BYTES_MAX / 64       # 1gbps @ 64 bytes-per-packet
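+# (1 Gbps = 10**9 bits/s = 125,000,000 bytes/s; divided by a 64-byte packet that
+# is 1,953,125 packets/s. rrdtool stores computed rates outside the DS min/max
+# range as unknown, so these bounds just filter out garbage from counter resets)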
+
+def pmacct_cmd (*args) :
+    """
+        Invoke the pmacct client, yielding the output as a series of lines.
+    """
+    
+    # argv with name of executable in [0]
+    argv = (PMACCT_CMD, ) + args
+    
+    # invoke
+    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+    # XXX: could manipulate the pipes directly to stream the output efficiently, without deadlocking
+
+    # interact (nothing on stdin)
+    stdout, stderr = proc.communicate()
+
+    # error-check
+    if proc.returncode :
+        raise Exception("pmacct terminated with status=%d" % (proc.returncode, ))
+
+    # stderr?
+    if stderr :
+        raise Exception("pmacct terminated with errors: %s" % (stderr, ))
+
+    # output
+    for line in stdout.splitlines() :
+        yield line
+
+def pmacct_summary (socket) :
+    """
+        Query and return summary-form data from pmacct, by dumping the full table and parsing it.
+
+        Yields a series of { field: value } dicts for each row in the table.
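+
+        For illustration (the exact columns depend on the plugin's `aggregate`
+        setting), output from the [in] plugin in doc/pmacct.conf looks roughly like:
+
+            DST_IP           PACKETS   BYTES
+            194.197.235.1    1234      567890
+            194.197.235.2    42        31337
+
+            For a total of: 2 entries
+
+        which yields dicts like { 'DST_IP': '194.197.235.1', 'PACKETS': '1234', 'BYTES': '567890' }.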
+    """
+    
+    # `pmacct -s`
+    output = pmacct_cmd('-p', socket, '-s')
+    
+    # the list of fields, parsed from the first line of output
+    fields = None
+    
+    for line in output :
+        if not fields :
+            # parse first row
+            fields = line.split()
+
+            continue
+        
+        if not line :
+            # end of data
+            break
+
+        # parse field data
+        values = line.split()
+
+        # map by field name
+        data = dict(zip(fields, values))
+
+        yield data
+
+class Host (object) :
+    """
+        Traffic counters for some specific host.
+    """
+
+    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')
+
+    def __init__ (self, ip=None) :
+        self.ip = ip
+
+        self.in_ok = self.out_ok = False
+        self.in_bytes = self.out_bytes = self.in_packets = self.out_packets = 0
+
+    def __repr__ (self) :
+        return 'Host(%s)' % ', '.join('%s=%r' % (name, getattr(self, name)) for name in self.__slots__)
+
+
+def host_counters (in_table, out_table) :
+    """
+        Returns the full set of in/out traffic counters for all hosts currently in the summary table.
+
+        This processes the two tables given, by their SRC_IP/DST_IP fields.
+
+        For each SRC_IP in the out_table, outgoing traffic is counted.
+        For each DST_IP in the in_table, incoming traffic is counted.
+
+        Returns a { ip: Host } mapping containing the traffic counters for all hosts in the two tables.
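+
+        For illustration (values made up): an in_table row
+        { 'DST_IP': '194.197.235.1', 'PACKETS': '10', 'BYTES': '1000' } and an
+        out_table row { 'SRC_IP': '194.197.235.1', 'PACKETS': '5', 'BYTES': '500' }
+        merge into a single Host with in_bytes=1000, in_packets=10, out_bytes=500,
+        out_packets=5 and both in_ok/out_ok set.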
+    """
+    
+    # combined set of hosts
+    hosts = collections.defaultdict(Host)
+    
+    # process incoming
+    for row in in_table :
+        ip = row['DST_IP']
+
+        host = hosts[ip]
+        
+        host.ip = ip
+        host.in_ok = True
+        host.in_bytes += int(row['BYTES'])
+        host.in_packets += int(row['PACKETS'])
+    
+    # process outgoing
+    for row in out_table :
+        ip = row['SRC_IP']
+
+        host = hosts[ip]
+        
+        host.ip = ip
+        host.out_ok = True
+        host.out_bytes += int(row['BYTES'])
+        host.out_packets += int(row['PACKETS'])
+
+    return dict(hosts)
+
+def create_host_rrd (path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT,
+        bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX,
+        rrd_root=None) :
+    """
+        Create a new .rrd file for the per-host traffic data at the given path.
+
+        rrd_root is accepted (but unused) here so that update_host() can pass its **rrd_opts through unchanged.
+    """
+
+    # data sources
+    ds_list = (
+        # bytes
+        ('in',          ds_type, bytes_min, bytes_max),
+        ('out',         ds_type, bytes_min, bytes_max),
+
+        # packets
+        ('in_pkts',     ds_type, pkts_min, pkts_max),
+        ('out_pkts',    ds_type, pkts_min, pkts_max),
+    )
+
+    HOUR = 60 * 60
+
+    # archives
+    rra_list = (
+        ('AVERAGE', 0.5, 1, 1 * HOUR / rrd_step),  # 1 hour of single-step averages
+    )
+
+    # definitions
+    defs = [
+        (
+            'DS:%s:%s:%s:%s:%s' % (name, type, heartbeat, min, max)
+        ) for name, type, min, max in ds_list
+    ] + [
+        (
+            'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
+        ) for func, xff, steps, rows in rra_list
+    ]
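+
+    # with the defaults above this produces e.g. 'DS:in:COUNTER:120:0:125000000'
+    # and 'RRA:AVERAGE:0.5:1:60' (60 rows of single-step averages = 1 hour at 60s steps)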
+    
+    # create using the given step
+    rrd.create(path, *defs, step=rrd_step)
+
+def update_host_rrd (path, host) :
+    """
+        Update the .rrd file with the given current counter values.
+    """
+    
+    # values to insert
+    values = (
+        # bytes
+        ('in',          host.in_bytes if host.in_ok else 'U'),
+        ('out',         host.out_bytes if host.out_ok else 'U'),
+
+        # packets
+        ('in_pkts',     host.in_packets if host.in_ok else 'U'),
+        ('out_pkts',    host.out_packets if host.out_ok else 'U'),
+    )
+
+    log.debug("Update %s: %r", path, values)
+
+    # go
+    rrd.update(path, 
+        # new values for current time
+        'N:' + ':'.join(str(value) for ds, value in values),
+        
+        # names of DSs we give values for
+        template    = ':'.join(name for name, value in values),
+    )
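+
+    # roughly equivalent to running:
+    #   rrdtool update <path> --template in:out:in_pkts:out_pkts -- N:<in>:<out>:<in_pkts>:<out_pkts>
+    # with 'U' substituted for any direction that had no table entries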
+
+def update_host (host, **rrd_opts) :
+    """
+        Update the host's .rrd file in rrd_root with the host's data.
+
+        Creates .rrd files for new hosts automatically, using the given options.
+    """
+
+    # path to .rrd
+    rrd_path = HOST_RRD_PATH.format(host_ip = host.ip, **rrd_opts)
+
+    # create?
+    if not os.path.exists(rrd_path) :
+        log.info("Create new .rrd for %s: %s", host.ip, rrd_path)
+
+        create_host_rrd(rrd_path, **rrd_opts)
+
+    # update
+    update_host_rrd(rrd_path, host)
+
--- a/rrdweb/rrd.py	Wed Nov 03 00:51:06 2010 +0200
+++ b/rrdweb/rrd.py	Sun Jan 23 13:50:13 2011 +0200
@@ -1,9 +1,13 @@
-import rrdtool
-
 """
     Friendly wrapper around the rrdtool python interface
 """
 
+import rrdtool
+
+import logging
+
+log = logging.getLogger('rrdweb.rrd')
+
 def normalize_option_key (key) :
     """
         Normalize the given option key.
@@ -44,7 +48,7 @@
     
     elif isinstance(value, list) :
         # list of option values
-        return normalize_option_multi(key, value)
+        return tuple(normalize_option_multi(key, value))
 
     else :
         # option value
@@ -68,10 +72,10 @@
         Run the given rrdtool.* function, formatting the given positional arguments and options.
     """
     
-    # series of (cmd-arg, cmd-arg, ...) tuples
+    # series of (cmd-arg, cmd-arg, ...) tuples, giving all '--opt' and 'value' arguments for each keyword argument
     opt_items = (normalize_option(key, value) for key, value in opts.iteritems())
 
-    # decomposed series of cmd-args
+    # decomposed series of cmd-args for options
     opt_args = [item for items in opt_items for item in items]
 
     # positional arguments
@@ -79,10 +83,36 @@
     post_args = [str(arg) for arg in post_args]
 
     # full arguments
-    args = pre_args + opt_args + post_args
+    args = pre_args + opt_args + ['--'] + post_args
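+    # e.g. for update: ['var/rrd/x.rrd', '--template', 'in:out', '--', 'N:1:2'],
+    # where the '--' separates rrdtool's options from the positional data arguments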
+
+    log.debug('rrdtool %s %s', func.__name__, args)
 
     return func(*args)
 
 def graph (out_path, *args, **opts) :
+    """
+        Create a graph from data stored in one or several RRDs.
+
+        Graph image output is written to the given path.
+
+        Returns whatever the underlying rrdtool binding returns for graph (typically the image dimensions, plus any PRINT output).
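+
+        Example (a sketch only; the output path is made up, and the DEF/LINE strings
+        are standard rrdtool graph arguments rather than anything this module defines):
+
+            graph('var/graphs/host.png',
+                'DEF:in=var/rrd/194.197.235.1.rrd:in:AVERAGE',
+                'LINE1:in#00CC00:bytes in',
+                start='-1h')
+
+        Keyword opts (start='-1h') are mapped onto the corresponding rrdtool long options.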
+    """
+
     return run_cmd(rrdtool.graph, (out_path, ), opts, args)
 
+def create (rrd_path, *args, **opts) :
+    """
+        Set up a new Round Robin Database (RRD).
+    """
+
+    return run_cmd(rrdtool.create, (rrd_path, ), opts, args)
+
+def update (rrd_path, *args, **opts) :
+    """
+        Store new data values into an RRD.
+    """
+
+    return run_cmd(rrdtool.update, (rrd_path, ), opts, args)
+