--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rrdweb/pmacct.py Sun Jan 23 13:50:13 2011 +0200
@@ -0,0 +1,239 @@
+"""
+ A pmacct -> rrd runner
+"""
+
+import collections
+import subprocess
+import os.path
+import logging
+
+from rrdweb import rrd
+
+log = logging.getLogger('rrdweb.pmacct')
+
+## pmacct
+# name (or path) of the `pmacct` client executable to invoke
+PMACCT_CMD = 'pmacct'
+
+# (relative) paths to the in/out client sockets
+PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
+PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'
+
+## RRD
+# (relative) path to rrd data dir
+RRD_ROOT = 'var/rrd'
+
+# str.format() template for the path to a per-host RRD file; takes rrd_root and host_ip
+HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'
+
+## RRD create params
+# step interval (in seconds)
+RRD_STEP = 60
+
+# Data Source parameters; `//` keeps the limits integral on Python 2 and Python 3 alike
+DS_TYPE = 'COUNTER'
+DS_HEARTBEAT = RRD_STEP * 2 # two step intervals
+DS_BYTES_MIN = 0
+DS_BYTES_MAX = (1000 ** 3) // 8 # 1gbps, as bytes per second
+DS_PKTS_MIN = 0
+DS_PKTS_MAX = DS_BYTES_MAX // 64 # 1gbps @ 64 bytes-per-packet
+
+def pmacct_cmd (*args) :
+    """
+        Invoke the pmacct client with the given arguments, yielding its output as a series of lines.
+
+        This is a generator, so the client is only run once iteration starts.
+        Raises Exception if pmacct exits with a non-zero status, or writes anything to stderr.
+    """
+
+    # argv with name of executable in [0]
+    argv = (PMACCT_CMD, ) + args
+
+    # invoke; universal_newlines makes the pipes yield text (not bytes) on Python 3 as well
+    proc = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+
+    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..
+    # interact (nothing on stdin); this buffers the complete output in memory
+    stdout, stderr = proc.communicate()
+
+    # error-check; report whatever was written to stderr along with the status, so the reason is not lost
+    if proc.returncode :
+        raise Exception("pmacct terminated with status=%d: %s" % (proc.returncode, stderr.strip() or "(no stderr output)"))
+
+    if stderr : # output on stderr counts as a failure even when the exit status is zero
+        raise Exception("pmacct terminated with errors: %s" % (stderr, ))
+
+    for line in stdout.splitlines() :
+        yield line
+
+def pmacct_summary (socket) :
+    """
+        Fetch the full summary table from pmacct via the given client socket, and parse it.
+
+        The first line of output names the columns; each following line, up to the first empty
+        line, is one row of whitespace-separated values.
+
+        Yields one { field: value } dict per row, with the values left unconverted.
+    """
+
+    # `pmacct -p <socket> -s`
+    rows = pmacct_cmd('-p', socket, '-s')
+
+    # column names, taken from the header line once it has been seen
+    header = None
+
+    # walk the output: header first, then the data rows
+    for text in rows :
+        if header :
+            if not text :
+                # an empty line terminates the table
+                return
+
+            # pair each column name with the value in the same position
+            record = dict(zip(header, text.split()))
+
+            yield record
+
+        else :
+            # still waiting for a (non-empty) header row
+            header = text.split()
+
+class Host (object) :
+    """
+        Traffic counters (bytes and packets, in and out) for one host, identified by its ip.
+    """
+
+    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')
+
+    def __init__ (self, ip=None) :
+        # every counter starts out at zero, with both of the *_ok flags cleared
+        self.ip, self.in_ok, self.out_ok = ip, False, False
+        self.in_bytes, self.out_bytes, self.in_packets, self.out_packets = 0, 0, 0, 0
+
+    def __repr__ (self) :
+        pairs = ['%s=%r' % (slot, getattr(self, slot)) for slot in self.__slots__]
+        return 'Host(%s)' % ', '.join(pairs)
+
+
+def host_counters (in_table, out_table) :
+    """
+        Combine the two summary tables into per-host in/out traffic counters.
+
+        Rows of in_table are accounted as incoming traffic for the host named by their DST_IP field.
+        Rows of out_table are accounted as outgoing traffic for the host named by their SRC_IP field.
+
+        The BYTES and PACKETS fields of every row are converted with int() and summed up per host.
+
+        Returns a { ip: Host } dict covering every host seen in either of the two tables.
+    """
+
+    by_ip = {}
+
+    def lookup (addr) :
+        # find the Host for this address, registering a fresh one on first sight
+        if addr not in by_ip :
+            by_ip[addr] = Host(addr)
+
+        return by_ip[addr]
+
+    # incoming: keyed by destination address
+    for entry in in_table :
+        target = lookup(entry['DST_IP'])
+
+        target.in_ok = True
+        target.in_bytes = target.in_bytes + int(entry['BYTES'])
+        target.in_packets = target.in_packets + int(entry['PACKETS'])
+
+    # outgoing: keyed by source address
+    for entry in out_table :
+        source = lookup(entry['SRC_IP'])
+
+        source.out_ok = True
+        source.out_bytes = source.out_bytes + int(entry['BYTES'])
+        source.out_packets = source.out_packets + int(entry['PACKETS'])
+
+    return by_ip
+
+def create_host_rrd (path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX') :
+    """
+        Create a new .rrd file for the per-host traffic data at the given path.
+
+        The file gets four data sources (in, out, in_pkts, out_pkts), all of type ds_type and sharing
+        the given heartbeat, plus a single AVERAGE archive of one-step rows spanning one hour.
+        rrd_root is accepted but ignored, so that callers such as update_host() can forward their options as-is.
+    """
+
+    # data sources: (name, type, min, max)
+    ds_list = (
+        ('in', ds_type, bytes_min, bytes_max), # bytes
+        ('out', ds_type, bytes_min, bytes_max),
+        ('in_pkts', ds_type, pkts_min, pkts_max), # packets
+        ('out_pkts', ds_type, pkts_min, pkts_max),
+    )
+
+    HOUR = 60 * 60
+
+    # archives: (function, xff, steps, rows); the row count follows the rrd_step argument (not the
+    # RRD_STEP default) so that a custom step still covers the hour, and `//` keeps it integral
+    rra_list = (
+        ('AVERAGE', 0.5, 1, 1 * HOUR // rrd_step), # 1 hour
+    )
+
+    # definitions; loop names are chosen so as not to shadow the type/min/max builtins
+    defs = [
+        'DS:%s:%s:%s:%s:%s' % (ds_name, ds_kind, heartbeat, ds_min, ds_max)
+        for ds_name, ds_kind, ds_min, ds_max in ds_list
+    ] + [
+        'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
+        for func, xff, steps, rows in rra_list
+    ]
+
+    # create using the given step
+    rrd.create(path, *defs, step=rrd_step)
+
+def update_host_rrd (path, host) :
+    """
+        Push the host's current counter values into the .rrd file at the given path.
+
+        The counters of a direction whose *_ok flag is not set are sent as 'U' instead of a number.
+    """
+
+    # pick either the real counters or the 'U' placeholder, per direction
+    rx_bytes, rx_pkts = (host.in_bytes, host.in_packets) if host.in_ok else ('U', 'U')
+    tx_bytes, tx_pkts = (host.out_bytes, host.out_packets) if host.out_ok else ('U', 'U')
+
+    # (data source name, value) pairs, in the order they are sent
+    values = (
+        ('in', rx_bytes),
+        ('out', tx_bytes),
+        ('in_pkts', rx_pkts),
+        ('out_pkts', tx_pkts),
+    )
+
+    log.debug("Update %s: %r", path, values)
+
+    # new values for the current time, and the names of the DSs they belong to, in matching order
+    ds_values = ':'.join(str(pair[1]) for pair in values)
+    ds_names = ':'.join(pair[0] for pair in values)
+
+    rrd.update(path, 'N:' + ds_values, template=ds_names)
+
+def update_host (host, **rrd_opts) :
+    """
+        Update the host's .rrd file under rrd_root with the host's data.
+
+        Creates the .rrd file for a new host automatically, passing rrd_opts on to create_host_rrd().
+        The rrd_root option may be left out, in which case it defaults to RRD_ROOT.
+    """
+
+    # path to .rrd; the name avoids shadowing the `rrd` module inside this function
+    rrd_path = HOST_RRD_PATH.format(rrd_root=rrd_opts.get('rrd_root', RRD_ROOT), host_ip=host.ip)
+
+    # create?
+    if not os.path.exists(rrd_path) :
+        log.info("Create new .rrd for %s: %s", host.ip, rrd_path)
+        create_host_rrd(rrd_path, **rrd_opts)
+
+    # update
+    update_host_rrd(rrd_path, host)
+