log = logging.getLogger('rrdweb-pmacct')

# Parsed command-line options; set by parse_args() and read by
# get_hosts_data() and main().
options = None


def parse_args(args):
    """
    Parse the command-line arguments and configure logging.

    The parsed option values are stored in the module-level `options` global.

        args        - list of argument strings, without the program name

    Returns the list of positional arguments left over after option parsing.

    Exits through parser.error() (usage message on stderr, SystemExit with
    status 2) unless both --in-sock and --out-sock are given.
    """
    global options

    parser = optparse.OptionParser(
        usage="%prog [options]",
    )

    # generic
    # NOTE: the default log level is INFO, so --verbose currently selects
    # the same level as giving no flag at all.
    parser.add_option(
        '-q', "--quiet", help="No output in normal operation",
        action='store_const', dest="loglvl", const=logging.WARNING,
    )
    parser.add_option(
        '-v', "--verbose", help="More output",
        action='store_const', dest="loglvl", const=logging.INFO,
    )
    parser.add_option(
        '-D', "--debug", help="Even more output",
        action='store_const', dest="loglvl", const=logging.DEBUG,
    )

    # paths
    parser.add_option(
        '-R', "--rrd-dir", default="var/rrd",
        help="Path to directory containing .rrd files",
    )
    # parsed as an integer, so that the step can be used for arithmetic
    # when the RRD parameters are derived from it
    parser.add_option(
        '-S', "--rrd-step", type='int', metavar='STEP',
        help="RRD step interval (seconds) for new .rrd's",
    )

    parser.add_option(
        '-I', "--in-sock", metavar='SOCK',
        help="Path to pmacct host-in.sock",
    )
    parser.add_option(
        '-O', "--out-sock", metavar='SOCK',
        help="Path to pmacct host-out.sock",
    )

    # defaults
    parser.set_defaults(
        loglvl=logging.INFO,
    )

    # parse
    options, args = parser.parse_args(args)

    # validate: report a usage error instead of a raw traceback
    if not (options.in_sock and options.out_sock):
        parser.error("Both --in-sock and --out-sock are required options")

    # apply
    logging.basicConfig(
        format="[%(levelname)8s] %(funcName)20s : %(message)s",
        level=options.loglvl,
    )

    return args


def get_hosts_data():
    """
    Fetch the in/out summary tables from pmacct and merge them per host.

    The socket paths are read from the `options` global set by parse_args().

    Returns a list of the values of the mapping built by
    pmacct.host_counters(), one entry per host.
    """
    log.debug("fetching hosts data from %s + %s", options.in_sock, options.out_sock)

    # read summaries
    in_table = list(pmacct.pmacct_summary(options.in_sock))
    out_table = list(pmacct.pmacct_summary(options.out_sock))

    # merge into host data
    hosts = pmacct.host_counters(in_table, out_table)

    log.debug("got %d in entries + %d out entries -> %d hosts", len(in_table), len(out_table), len(hosts))

    # a real list (not a dict view), so that it logs and indexes the same
    # way on every Python version
    return list(hosts.values())


def main(args):
    """
    Load the current pmacct counters and store them into the per-host RRDs.

        args        - list of command-line arguments, without the program name
    """
    # parse; leftover positional arguments are not used
    parse_args(args)

    # list of Host objects
    hosts = get_hosts_data()

    log.debug("full set of host data: %s", hosts)
    log.info("Updating %d hosts...", len(hosts))

    # Only forward --rrd-step when it was actually given: an explicit None
    # would otherwise override the library's own default step.
    rrd_opts = {'rrd_root': options.rrd_dir}

    if options.rrd_step is not None:
        rrd_opts['rrd_step'] = options.rrd_step

    # update
    for host in hosts:
        log.info("Updating host %s...", host.ip)

        pmacct.update_host(host, **rrd_opts)


if __name__ == '__main__':
    from sys import argv

    main(argv[1:])
log = logging.getLogger('rrdweb.pmacct')

## pmacct
# name to invoke `pmacct` as
PMACCT_CMD = 'pmacct'

# path to in/out client sockets
PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'

## RRD
# path to rrd data dir
RRD_ROOT = 'var/rrd'

# path to per-host RRD file
HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'

## RRD create params
# step interval (in seconds)
RRD_STEP = 60

# Data Source parameters
# (integer division, so that the limits stay ints on every Python version
# and format cleanly into the DS definitions)
DS_TYPE = 'COUNTER'
DS_HEARTBEAT = RRD_STEP * 2
DS_BYTES_MIN = 0
DS_BYTES_MAX = (1000 ** 3) // 8     # 1gbps
DS_PKTS_MIN = 0
DS_PKTS_MAX = DS_BYTES_MAX // 64    # 1gbps @ 64 bytes-per-packet


def pmacct_cmd(*args):
    """
    Invoke the pmacct client, yielding the output as a series of lines.

    This is a generator: the client is only run once iteration starts, and
    any error is raised at that point.

        *args       - command-line arguments to pass to the client

    Raises Exception if the client exits with a non-zero status, or if it
    wrote anything to stderr.
    """
    # argv with name of executable in [0]
    argv = (PMACCT_CMD, ) + args

    # invoke; universal_newlines makes the pipes return text rather than
    # bytes on Python 3, so the parsed field names are ordinary strings
    proc = subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )

    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..

    # interact (nothing on stdin); this buffers the complete output
    stdout, stderr = proc.communicate()

    # error-check
    if proc.returncode:
        raise Exception("pmacct terminated with status=%d" % (proc.returncode, ))

    # stderr?
    if stderr:
        raise Exception("pmacct terminated with errors: %s" % (stderr, ))

    # output
    for line in stdout.splitlines():
        yield line


def pmacct_summary(socket):
    """
    Query and return summary-form data from pmacct, by querying the full table and parsing it.

        socket      - path to the pmacct client socket to query

    Yields a series of { field: value } dicts for each row in the table.
    All values are yielded as the whitespace-separated strings found in the output.
    """
    # `pmacct -s`
    output = pmacct_cmd('-p', socket, '-s')

    # the list of fields, parsed from the first non-empty line of output
    fields = None

    for line in output:
        if not fields:
            # parse header row
            fields = line.split()

            continue

        if not line:
            # an empty line marks the end of the data rows
            break

        # parse field data
        values = line.split()

        # map by field name
        yield dict(zip(fields, values))


class Host(object):
    """
    Traffic counters for some specific host.

    The in_ok/out_ok flags tell whether any incoming/outgoing data was seen
    for the host; the byte/packet counters start at zero.
    """

    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')

    def __init__(self, ip=None):
        self.ip = ip

        self.in_ok = self.out_ok = False
        self.in_bytes = self.out_bytes = self.in_packets = self.out_packets = 0

    def __repr__(self):
        return 'Host(%s)' % ', '.join('%s=%r' % (name, getattr(self, name)) for name in self.__slots__)


def host_counters(in_table, out_table):
    """
    Returns the full set of in/out traffic counters for all hosts currently in the summary table.

    This processes the two tables given, by their SRC_IP/DST_IP fields.

    For each SRC_IP in the out_table, outgoing traffic is counted.
    For each DST_IP in the in_table, incoming traffic is counted.

        in_table    - rows with DST_IP, BYTES and PACKETS fields
        out_table   - rows with SRC_IP, BYTES and PACKETS fields

    Returns a { host: Host } mapping containing the traffic counters for all hosts in the two tables.
    """
    # combined set of hosts; new entries are created on first access
    hosts = collections.defaultdict(Host)

    # process incoming
    for row in in_table:
        ip = row['DST_IP']

        host = hosts[ip]

        host.ip = ip
        host.in_ok = True
        host.in_bytes += int(row['BYTES'])
        host.in_packets += int(row['PACKETS'])

    # process outgoing
    for row in out_table:
        ip = row['SRC_IP']

        host = hosts[ip]

        host.ip = ip
        host.out_ok = True
        host.out_bytes += int(row['BYTES'])
        host.out_packets += int(row['PACKETS'])

    # plain dict, so that later lookups do not create entries
    return dict(hosts)


def create_host_rrd(path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=None, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX'):
    """
    Create a new .rrd file for the per-host traffic data at the given path.

        path        - path of the .rrd file to create
        rrd_step    - step interval in seconds; None selects RRD_STEP
        ds_type     - Data Source type used for all four data sources
        heartbeat   - Data Source heartbeat in seconds; None selects twice
                      the step, which matches DS_HEARTBEAT for the default step
        bytes_min, bytes_max    - limits for the in/out byte data sources
        pkts_min, pkts_max      - limits for the in_pkts/out_pkts data sources
        rrd_root    - ignored; only accepted so that callers can pass their
                      full set of rrd options through
    """
    # tolerate a missing or string-valued step (e.g. straight from the command line)
    if rrd_step is None:
        rrd_step = RRD_STEP

    rrd_step = int(rrd_step)

    # keep the heartbeat in proportion to the step actually used
    if heartbeat is None:
        heartbeat = rrd_step * 2

    # data sources
    ds_list = (
        # bytes
        ('in', ds_type, bytes_min, bytes_max),
        ('out', ds_type, bytes_min, bytes_max),

        # packets
        ('in_pkts', ds_type, pkts_min, pkts_max),
        ('out_pkts', ds_type, pkts_min, pkts_max),
    )

    HOUR = 60 * 60

    # archives; the row count follows the step of *this* rrd, so that the
    # archive really covers one hour, and never drops to zero rows
    rra_list = (
        ('AVERAGE', 0.5, 1, max(1, HOUR // rrd_step)),  # 1 hour
    )

    # definitions
    defs = [
        'DS:%s:%s:%s:%s:%s' % (ds_name, ds_kind, heartbeat, ds_min, ds_max)
        for ds_name, ds_kind, ds_min, ds_max in ds_list
    ] + [
        'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
        for func, xff, steps, rows in rra_list
    ]

    # create using the given step
    rrd.create(path, *defs, step=rrd_step)


def update_host_rrd(path, host):
    """
    Update the .rrd file with the given current counter values.

    Counters for a direction that was not seen (in_ok/out_ok unset) are
    stored as 'U'.

        path        - path of the .rrd file to update
        host        - Host object holding the counters
    """
    # values to insert
    values = (
        # bytes
        ('in', host.in_bytes if host.in_ok else 'U'),
        ('out', host.out_bytes if host.out_ok else 'U'),

        # packets
        ('in_pkts', host.in_packets if host.in_ok else 'U'),
        ('out_pkts', host.out_packets if host.out_ok else 'U'),
    )

    log.debug("Update %s: %r", path, values)

    # go
    rrd.update(
        path,

        # new values for current time
        'N:' + ':'.join(str(value) for name, value in values),

        # names of DSs we give values for
        template=':'.join(name for name, value in values),
    )


def update_host(host, **rrd_opts):
    """
    Update the host's .rrd file in rrd_root with the host's data.

    Creates .rrd files for new hosts automatically, using the given options.

        host        - Host object holding the counters
        **rrd_opts  - rrd_root (directory of the .rrd files, RRD_ROOT if not
                      given) plus any create_host_rrd() options; options
                      given as None are treated as not given
    """
    # options explicitly passed as None fall back to the defaults
    create_opts = dict((key, value) for key, value in rrd_opts.items() if value is not None)

    # the root only determines the path, it is not a create option
    rrd_root = create_opts.pop('rrd_root', RRD_ROOT)

    # path to .rrd
    path = HOST_RRD_PATH.format(rrd_root=rrd_root, host_ip=host.ip)

    # create?
    if not os.path.exists(path):
        log.info("Create new .rrd for %s: %s", host.ip, path)

        create_host_rrd(path, **create_opts)

    # update
    update_host_rrd(path, host)
def graph(out_path, *args, **opts):
    """
    Create a graph from data stored in one or several RRDs.

    Graph image output is written to the given path, which is passed as the
    leading argument; keyword options and the remaining positional
    arguments are formatted by run_cmd.

    Returns whatever rrdtool.graph returns (presumably information about
    the generated image - TODO confirm against the rrdtool bindings).
    """
    leading = (out_path, )

    return run_cmd(rrdtool.graph, leading, opts, args)


def create(rrd_path, *args, **opts):
    """
    Set up a new Round Robin Database (RRD) at the given path.

    The positional arguments and keyword options are formatted by run_cmd
    and handed to rrdtool.create.
    """
    leading = (rrd_path, )

    return run_cmd(rrdtool.create, leading, opts, args)


def update(rrd_path, *args, **opts):
    """
    Store new data values into the RRD at the given path.

    The positional arguments and keyword options are formatted by run_cmd
    and handed to rrdtool.update.
    """
    leading = (rrd_path, )

    return run_cmd(rrdtool.update, leading, opts, args)