rrdweb/pmacct.py
author Tero Marttila <terom@fixme.fi>
Sun, 23 Jan 2011 13:50:13 +0200
changeset 29 c756e522c9ac
permissions -rw-r--r--
pmacct: load pmacct data to rrd
"""
    A pmacct -> rrd runner
"""

import collections
import subprocess
import os.path
import logging

from rrdweb import rrd

log = logging.getLogger('rrdweb.pmacct')

## pmacct
# name to invoke `pmacct` as
PMACCT_CMD = 'pmacct'

# path to in/out client sockets
PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'

## RRD
# path to rrd data dir
RRD_ROOT = 'var/rrd'

# path to per-host RRD file; a str.format() template filled in with rrd_root and host_ip
HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'

## RRD create params
# step interval (in seconds)
RRD_STEP = 60

# Data Source parameters
# NOTE: floor division keeps the limits integral on Python 3 too, matching Python 2's integer `/`
DS_TYPE = 'COUNTER'
DS_HEARTBEAT = RRD_STEP * 2             # two steps
DS_BYTES_MIN = 0
DS_BYTES_MAX = (1000 ** 3) // 8         # 1gbps, in bytes
DS_PKTS_MIN = 0
DS_PKTS_MAX = DS_BYTES_MAX // 64        # 1gbps @ 64 bytes-per-packet

def pmacct_cmd (*args) :
    """
        Invoke the pmacct client with the given arguments, yielding its output as a series of lines.

        This is a generator: the command is only run once iteration starts, and its entire output is collected
        before the first line is yielded.

        Output is read in text mode, so the yielded lines are native str on both Python 2 and Python 3.

        Raises Exception if pmacct exits with a non-zero status (the message includes its stderr output),
        or if it writes anything at all to stderr.
    """

    # argv with name of executable in [0]
    argv = (PMACCT_CMD, ) + args

    # invoke; text mode so that callers get str lines rather than bytes on Python 3
    proc = subprocess.Popen(argv,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        universal_newlines=True,
    )

    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..

    # interact (nothing on stdin); communicate() drains stdout and stderr together, so neither pipe fills up and blocks
    stdout, stderr = proc.communicate()

    # error-check, keeping stderr since it is usually the only hint as to what went wrong
    if proc.returncode :
        raise Exception("pmacct terminated with status=%d: %s" % (proc.returncode, stderr.strip()))

    # any diagnostic output is treated as an error, even with a zero exit status
    if stderr :
        raise Exception("pmacct terminated with errors: %s" % (stderr, ))

    # output
    for line in stdout.splitlines() :
        yield line

def pmacct_summary (socket) :
    """
        Run `pmacct -p SOCKET -s` to fetch the full summary table from pmacct, and parse it.

        The first output line containing any fields is taken as the column header. Every following line, up to
        the first empty one, is split on whitespace and yielded as a { column: value } dict. Values are left as
        strings.
    """

    header = None

    for line in pmacct_cmd('-p', socket, '-s') :
        if not header :
            # header row names the columns
            header = line.split()
        elif not line :
            # a blank line terminates the table
            break
        else :
            # pair up each column name with this row's value
            yield dict(zip(header, line.split()))

class Host (object) :
    """
        Accumulated traffic counters for a single host IP.

        The in_ok / out_ok flags record whether any data was seen for that direction. The byte and packet
        counters start at zero and are summed up by the caller.
    """

    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')

    def __init__ (self, ip=None) :
        self.ip = ip

        # no data seen yet in either direction
        self.in_ok = False
        self.out_ok = False

        # zeroed counters
        self.in_bytes = 0
        self.out_bytes = 0
        self.in_packets = 0
        self.out_packets = 0

    def __repr__ (self) :
        # render every slot as name=repr(value), in __slots__ order
        fields = ['%s=%r' % (attr, getattr(self, attr)) for attr in self.__slots__]

        return 'Host(' + ', '.join(fields) + ')'


def host_counters (in_table, out_table) :
    """
        Sum up per-host in/out traffic from the two summary tables.

        Rows of in_table are attributed to their DST_IP (incoming traffic), and rows of out_table to their
        SRC_IP (outgoing traffic). The BYTES and PACKETS columns of each row are added to that host's totals,
        and the matching in_ok / out_ok flag is set.

        Returns a { ip: Host } dict covering every host seen in either table.
    """

    hosts = {}

    def get_host (ip) :
        # look up, or lazily create, the Host for this IP
        if ip not in hosts :
            hosts[ip] = Host(ip)

        return hosts[ip]

    # incoming traffic, keyed by destination
    for row in in_table :
        host = get_host(row['DST_IP'])
        host.in_ok = True
        host.in_bytes += int(row['BYTES'])
        host.in_packets += int(row['PACKETS'])

    # outgoing traffic, keyed by source
    for row in out_table :
        host = get_host(row['SRC_IP'])
        host.out_ok = True
        host.out_bytes += int(row['BYTES'])
        host.out_packets += int(row['PACKETS'])

    return hosts

def create_host_rrd (path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX') :
    """
        Create a new .rrd file for the per-host traffic data at the given path.

        Defines four data sources (in/out bytes and in/out packets), all sharing ds_type and heartbeat,
        plus a single AVERAGE archive holding one hour's worth of rows at the given step.

            path                - path of the .rrd file to create
            rrd_step            - step interval of the .rrd, in seconds
            ds_type             - data source type for all four data sources
            heartbeat           - data source heartbeat, in seconds
            bytes_min/bytes_max - valid range for the 'in' / 'out' byte data sources
            pkts_min/pkts_max   - valid range for the 'in_pkts' / 'out_pkts' packet data sources
            rrd_root            - unused here; accepted so that update_host() can pass its **rrd_opts straight through
    """

    # data sources
    ds_list = (
        # bytes
        ('in',          ds_type, bytes_min, bytes_max),
        ('out',         ds_type, bytes_min, bytes_max),

        # packets
        ('in_pkts',     ds_type, pkts_min, pkts_max),
        ('out_pkts',    ds_type, pkts_min, pkts_max),
    )

    HOUR = 60 * 60

    # archives
    # The row count must follow the step actually used for this file, not the module default. Integer
    # division keeps it an integer, and at least one row is kept even for steps longer than an hour.
    rra_list = (
        ('AVERAGE', 0.5, 1, max(1, HOUR // rrd_step)),  # 1 hour
    )

    # definitions
    defs = [
        'DS:%s:%s:%s:%s:%s' % (name, dst, heartbeat, ds_min, ds_max)
        for name, dst, ds_min, ds_max in ds_list
    ] + [
        'RRA:%s:%s:%s:%s' % (cf, xff, steps, rows)
        for cf, xff, steps, rows in rra_list
    ]

    # create using the given step
    rrd.create(path, *defs, step=rrd_step)

def update_host_rrd (path, host) :
    """
        Record the host's current counter values in the .rrd file at path, timestamped 'N'.

        Values are passed by data-source name through the update template (in, out, in_pkts, out_pkts).
        A direction whose in_ok / out_ok flag is false is written as 'U' for both its byte and packet values.
    """

    # (data source, have data for this direction?, counter) in template order
    sources = (
        ('in',          host.in_ok,     host.in_bytes),
        ('out',         host.out_ok,    host.out_bytes),
        ('in_pkts',     host.in_ok,     host.in_packets),
        ('out_pkts',    host.out_ok,    host.out_packets),
    )

    # substitute 'U' for directions without any data
    values = tuple((name, counter if ok else 'U') for name, ok, counter in sources)

    log.debug("Update %s: %r", path, values)

    ds_names = ':'.join(name for name, _ in values)
    ds_values = ':'.join(str(value) for _, value in values)

    rrd.update(path, 'N:' + ds_values, template=ds_names)

def update_host (host, **rrd_opts) :
    """
        Update the host's .rrd file with the host's current traffic counters.

        The file path is HOST_RRD_PATH formatted with the host's IP and rrd_opts; rrd_root defaults to RRD_ROOT
        when not given. If the file does not exist yet, it is first created with create_host_rrd(), passing
        along rrd_opts (including rrd_root).

            host        - Host whose counters to record
            rrd_opts    - rrd_root, plus any create_host_rrd() keyword options used when creating new files
    """

    # fall back to the configured data dir if the caller did not give one
    rrd_opts.setdefault('rrd_root', RRD_ROOT)

    # path to .rrd; named `path` so as not to shadow the `rrd` module
    path = HOST_RRD_PATH.format(host_ip=host.ip, **rrd_opts)

    # create?
    if not os.path.exists(path) :
        log.info("Create new .rrd for %s: %s", host.ip, path)

        create_host_rrd(path, **rrd_opts)

    # update
    update_host_rrd(path, host)