--- a/.hgignore Wed Nov 03 00:51:06 2010 +0200
+++ b/.hgignore Sun Jan 23 13:50:13 2011 +0200
@@ -10,5 +10,5 @@
\.pyc$
\.sw[op]$
-# misc env setup
-^rrd$
+# data
+^var$
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/rrdweb-pmacct Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+"""
+ Import data from pmacct to RRD
+"""
+
+import optparse
+import logging
+
+from rrdweb import pmacct
+
+log = logging.getLogger('rrdweb-pmacct')
+
+def parse_args (args) :
+ global options
+
+ parser = optparse.OptionParser(
+ usage = "%prog [options]"
+ )
+
+ # generic
+ parser.add_option('-q', "--quiet", help="No output in normal operation",
+ action='store_const', dest="loglvl", const=logging.WARNING,
+ )
+ parser.add_option('-v', "--verbose", help="More output",
+ action='store_const', dest="loglvl", const=logging.INFO,
+ )
+ parser.add_option('-D', "--debug", help="Even more output",
+ action='store_const', dest="loglvl", const=logging.DEBUG,
+ )
+
+ # paths
+ parser.add_option('-R', "--rrd-dir", default="var/rrd", help="Path to directory containing .rrd files")
+ parser.add_option('-S', "--rrd-step", help="RRD step interval (seconds) for new .rrd's", metavar='STEP')
+
+ parser.add_option('-I', "--in-sock", help="Path to pmacct host-in.sock", metavar='SOCK')
+ parser.add_option('-O', "--out-sock", help="Path to pmacct host-out.sock", metavar='SOCK')
+
+
+ # defaults
+ parser.set_defaults(
+ loglvl = logging.INFO,
+ )
+
+ ## parse
+ options, args = parser.parse_args(args)
+
+ # validate
+ if not (options.in_sock and options.out_sock) :
+ raise Exception("Both --in-sock and --out-sock are required options")
+
+ ## apply
+ logging.basicConfig(
+ format = "[%(levelname)8s] %(funcName)20s : %(message)s",
+ level = options.loglvl,
+ )
+
+ return args
+
+def get_hosts_data () :
+ """
+ Returns the in/out host tables.
+ """
+
+ log.debug("fetching hosts data from %s + %s", options.in_sock, options.out_sock)
+
+ # read summaries
+ in_table = list(pmacct.pmacct_summary(options.in_sock))
+ out_table = list(pmacct.pmacct_summary(options.out_sock))
+
+ # merge into host data
+ hosts = pmacct.host_counters(in_table, out_table)
+
+ log.debug("got %d in entries + %d out entries -> %d hosts", len(in_table), len(out_table), len(hosts))
+
+ return hosts.values()
+
+def main (args) :
+ # parse
+ args = parse_args(args)
+
+ # list of Host objects
+ hosts = get_hosts_data()
+
+ log.debug("full set of host data: %s", hosts)
+ log.info("Updating %d hosts...", len(hosts))
+
+ # update
+ for host in hosts :
+ log.info("Updating host %s...", host.ip)
+
+ pmacct.update_host(host, rrd_root=options.rrd_dir, rrd_step=options.rrd_step)
+
+if __name__ == '__main__' :
+ from sys import argv
+
+ main(argv[1:])
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/pmacct-sflow.conf Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,4 @@
+
+plugins: sfprobe
+sampling_rate: 10
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/pmacct.conf Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,23 @@
+! Sample pmacct configuration file for the rrd loader
+
+! normalize data rates from incoming sFlow samples
+sfacctd_renormalize: true
+sfacctd_ext_sampling_rate: 10
+
+plugins: memory[in], memory[out], memory[sum]
+
+! src/dst cross-join
+aggregate[sum]: src_host,dst_host
+aggregate_filter[sum]: net 194.197.235.0/24
+imt_path[sum]: var/pmacct/host-sum.sock
+
+! dst ip
+aggregate[in]: dst_host
+aggregate_filter[in]: dst net 194.197.235.0/24
+imt_path[in]: var/pmacct/host-in.sock
+
+! src ip
+aggregate[out]: src_host
+aggregate_filter[out]: src net 194.197.235.0/24
+imt_path[out]: var/pmacct/host-out.sock
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rrdweb/pmacct.py Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,239 @@
+"""
+ A pmacct -> rrd runner
+"""
+
+import collections
+import subprocess
+import os.path
+import logging
+
+from rrdweb import rrd
+
+log = logging.getLogger('rrdweb.pmacct')
+
+## pmacct
+# name to invoke `pmacct` as
+PMACCT_CMD = 'pmacct'
+
+# path to in/out client sockets
+PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
+PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'
+
+## RRD
+# path to rrd data dir
+RRD_ROOT = 'var/rrd'
+
+# path to per-host RRD file
+HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'
+
+## RRD create params
+# step interval (in seconds)
+RRD_STEP = 60
+
+# Data Source parameters
+DS_TYPE = 'COUNTER'
+DS_HEARTBEAT = RRD_STEP * 2
+DS_BYTES_MIN = 0
+DS_BYTES_MAX = (1000 ** 3) / 8 # 1gbps
+DS_PKTS_MIN = 0
+DS_PKTS_MAX = DS_BYTES_MAX / 64 # 1gbps @ 64 bytes-per-packet
+
+def pmacct_cmd (*args) :
+ """
+ Invoke the pmacct client, yielding the output as a series of lines.
+ """
+
+ # argv with name of executable in [0]
+ argv = (PMACCT_CMD, ) + args
+
+ # invoke
+ proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..
+
+ # interact (nothing on stdin)
+ stdout, stderr = proc.communicate()
+
+ # error-check
+ if proc.returncode :
+ raise Exception("pmacct terminated with status=%d" % (proc.returncode, ))
+
+ # stderr?
+ if stderr :
+ raise Exception("pmacct terminated with errors: %s" % (stderr, ))
+
+ # output
+ for line in stdout.splitlines() :
+ yield line
+
+def pmacct_summary (socket) :
+ """
+ Query and return summary-form data from pacct, by querying the full table and parsing it.
+
+ Yields a series of { field: value } dicts for each row in the table.
+ """
+
+ # `pmacct -s`
+ output = pmacct_cmd('-p', socket, '-s')
+
+ # the list of fields, parsed from the first line of output
+ fields = None
+
+ for line in output :
+ if not fields :
+ # parse first row
+ fields = line.split()
+
+ continue
+
+ if not line :
+ # end of data
+ break
+
+ # parse field data
+ values = line.split()
+
+ # map by field name
+ data = dict(zip(fields, values))
+
+ yield data
+
+class Host (object) :
+ """
+ Traffic counters for some specific host.
+ """
+
+ __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')
+
+ def __init__ (self, ip=None) :
+ self.ip = ip
+
+ self.in_ok = self.out_ok = False
+ self.in_bytes = self.out_bytes = self.in_packets = self.out_packets = 0
+
+ def __repr__ (self) :
+ return 'Host(%s)' % ', '.join('%s=%r' % (name, getattr(self, name)) for name in self.__slots__)
+
+
+def host_counters (in_table, out_table) :
+ """
+ Returns the full set of in/out traffic counters for all hosts currently in the summary table.
+
+ This processes the two tables given, by their SRC_IP/DST_IP fields.
+
+ For each SRC_IP in the out_table, outgoing traffic is counted.
+ For each DST_IP in the in_table, incoming traffic is counted.
+
+ Returns a { host: Host } mapping containing the traffic counters for all hosts in the two tables.
+ """
+
+ # combined set of hosts
+ hosts = collections.defaultdict(Host)
+
+ # process incoming
+ for row in in_table :
+ ip = row['DST_IP']
+
+ host = hosts[ip]
+
+ host.ip = ip
+ host.in_ok = True
+ host.in_bytes += int(row['BYTES'])
+ host.in_packets += int(row['PACKETS'])
+
+ # process outgoing
+ for row in out_table :
+ ip = row['SRC_IP']
+
+ host = hosts[ip]
+
+ host.ip = ip
+ host.out_ok = True
+ host.out_bytes += int(row['BYTES'])
+ host.out_packets += int(row['PACKETS'])
+
+ return dict(hosts)
+
+def create_host_rrd (path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX') :
+ """
+ Create a new .rrd file for the per-host traffic data at the given path.
+ """
+
+ # data sources
+ ds_list = (
+ # bytes
+ ('in', ds_type, bytes_min, bytes_max),
+ ('out', ds_type, bytes_min, bytes_max),
+
+ # packets
+ ('in_pkts', ds_type, pkts_min, pkts_max),
+ ('out_pkts', ds_type, pkts_min, pkts_max),
+ )
+
+ HOUR = 60 * 60
+
+ # archives
+ rra_list = (
+ ('AVERAGE', 0.5, 1, 1 * HOUR / RRD_STEP), # 1 hour
+ )
+
+ # definitions
+ defs = [
+ (
+ 'DS:%s:%s:%s:%s:%s' % (name, type, heartbeat, min, max)
+ ) for name, type, min, max in ds_list
+ ] + [
+ (
+ 'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
+ ) for func, xff, steps, rows in rra_list
+ ]
+
+ # create using the given step
+ rrd.create(path, *defs, step=rrd_step)
+
+def update_host_rrd (path, host) :
+ """
+ Update the .rrd file with the given current counter values.
+ """
+
+ # values to insert
+ values = (
+ # bytes
+ ('in', host.in_bytes if host.in_ok else 'U'),
+ ('out', host.out_bytes if host.out_ok else 'U'),
+
+ # packets
+ ('in_pkts', host.in_packets if host.in_ok else 'U'),
+ ('out_pkts', host.out_packets if host.out_ok else 'U'),
+ )
+
+ log.debug("Update %s: %r", path, values)
+
+ # go
+ rrd.update(path,
+ # new values for current time
+ 'N:' + ':'.join(str(value) for ds, value in values),
+
+ # names of DSs we give values for
+ template = ':'.join(name for name, value in values),
+ )
+
+def update_host (host, **rrd_opts) :
+ """
+ Update the host's .rrd file in rrd_root with the host's data.
+
+ Creates .rrd files for new hosts automatically, using the given options.
+ """
+
+ # path to .rrd
+ rrd = HOST_RRD_PATH.format(host_ip = host.ip, **rrd_opts)
+
+ # create?
+ if not os.path.exists(rrd) :
+ log.info("Create new .rrd for %s: %s", host.ip, rrd)
+
+ create_host_rrd(rrd, **rrd_opts)
+
+ # update
+ update_host_rrd(rrd, host)
+
--- a/rrdweb/rrd.py Wed Nov 03 00:51:06 2010 +0200
+++ b/rrdweb/rrd.py Sun Jan 23 13:50:13 2011 +0200
@@ -1,9 +1,13 @@
-import rrdtool
-
"""
Friendly wrapper around the rrdtool python interface
"""
+import rrdtool
+
+import logging
+
+log = logging.getLogger('rrdweb.rrd')
+
def normalize_option_key (key) :
"""
Normalize the given option key.
@@ -44,7 +48,7 @@
elif isinstance(value, list) :
# list of option values
- return normalize_option_multi(key, value)
+ return tuple(normalize_option_multi(key, value))
else :
# option value
@@ -68,10 +72,10 @@
Run the given rrdtool.* function, formatting the given positional arguments and options.
"""
- # series of (cmd-arg, cmd-arg, ...) tuples
+ # series of (cmd-arg, cmd-arg, ...) tuples, giving all '--opt' and 'value' arguments for each keyword argument
opt_items = (normalize_option(key, value) for key, value in opts.iteritems())
- # decomposed series of cmd-args
+ # decomposed series of cmd-args for options
opt_args = [item for items in opt_items for item in items]
# positional arguments
@@ -79,10 +83,36 @@
post_args = [str(arg) for arg in post_args]
# full arguments
- args = pre_args + opt_args + post_args
+ args = pre_args + opt_args + ['--'] + post_args
+
+ log.debug('rrdtool %s %s', func.__name__, args)
return func(*args)
def graph (out_path, *args, **opts) :
+ """
+ Create a graph from data stored in one or several RRDs.
+
+ Graph image output is written to the given path.
+
+ Returns... something to do with the image's dimensions, or even the data itself?
+ """
+
return run_cmd(rrdtool.graph, (out_path, ), opts, args)
+def create (rrd_path, *args, **opts) :
+ """
+ Set up a new Round Robin Database (RRD).
+ """
+
+ return run_cmd(rrdtool.create, (rrd_path, ), opts, args)
+
+def update (rrd_path, *args, **opts) :
+ """
+ Store new data values into an RRD.
+ """
+
+ return run_cmd(rrdtool.update, (rrd_path, ), opts, args)
+
+
+