"""
A pmacct -> rrd runner
"""
import collections
import subprocess
import os.path
import logging
from rrdweb import rrd
# module-level logger
log = logging.getLogger('rrdweb.pmacct')

## pmacct
# name to invoke `pmacct` as
PMACCT_CMD = 'pmacct'

# paths to the in/out client sockets
PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'

## RRD
# path to rrd data dir
RRD_ROOT = 'var/rrd'

# path to per-host RRD file, formatted with rrd_root= and host_ip=
HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'

## RRD create params
# step interval (in seconds)
RRD_STEP = 60

# Data Source parameters
DS_TYPE = 'COUNTER'
DS_HEARTBEAT = RRD_STEP * 2

# NOTE: floor division (//) keeps the limits below as plain integers on both Python 2 and Python 3;
# the values divide exactly, so nothing is lost
DS_BYTES_MIN = 0
DS_BYTES_MAX = (1000 ** 3) // 8     # 1gbps, expressed in bytes per second

DS_PKTS_MIN = 0
DS_PKTS_MAX = DS_BYTES_MAX // 64    # 1gbps @ 64 bytes-per-packet
def pmacct_cmd(*args):
    """
        Invoke the pmacct client, yielding the output as a series of lines.

        args are the command-line arguments placed after the executable name (PMACCT_CMD).

        This is a generator, so the client is only run once the first line is requested. The complete
        output is collected before any line is yielded.

        Lines are yielded as text strings, without their line terminators.

        Raises Exception if the client exits with a non-zero status, or if it wrote anything to stderr.
    """

    # argv with name of executable in [0]
    argv = (PMACCT_CMD, ) + args

    # invoke
    # universal_newlines=True opens the pipes in text mode, so that the output is str rather than bytes
    # on Python 3; the rows parsed from it are then keyed by ordinary strings
    proc = subprocess.Popen(argv,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        universal_newlines=True
    )

    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..

    # interact (nothing on stdin)
    stdout, stderr = proc.communicate()

    # error-check
    if proc.returncode:
        raise Exception("pmacct terminated with status=%d" % (proc.returncode, ))

    # stderr?
    if stderr:
        raise Exception("pmacct terminated with errors: %s" % (stderr, ))

    # output
    for line in stdout.splitlines():
        yield line
def pmacct_summary(socket):
    """
        Query summary-form data from pmacct, by dumping the full table (`pmacct -p <socket> -s`) and
        parsing the output.

        The first line that contains any whitespace-separated tokens is taken as the list of field
        names; token-less lines before it are skipped. Every following line is a data row, up to (and
        not including) the first empty line.

        Yields a { field: value } dict for each data row. The values are the unconverted string tokens.
    """

    lines = iter(pmacct_cmd('-p', socket, '-s'))

    # locate the header row
    for header_line in lines:
        fields = header_line.split()

        if fields:
            break
    else:
        # ran out of output before any header showed up
        return

    # remaining lines are data rows
    for row_line in lines:
        if not row_line:
            # end of data
            return

        # map by field name
        yield dict(zip(fields, row_line.split()))
class Host(object):
    """
        Traffic counters for one specific host.

        A new Host starts with all byte/packet counters at zero and both in_ok/out_ok flags False.
        host_counters() sets a flag once a row for the host has been seen in the respective table.
    """

    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')

    def __init__(self, ip=None):
        self.ip = ip

        # nothing seen yet in either direction
        self.in_ok = False
        self.out_ok = False

        # counters
        self.in_bytes = 0
        self.out_bytes = 0
        self.in_packets = 0
        self.out_packets = 0

    def __repr__(self):
        # name=value for every slot, in declaration order
        attrs = ['%s=%r' % (slot, getattr(self, slot)) for slot in self.__slots__]

        return 'Host(%s)' % ', '.join(attrs)
def host_counters(in_table, out_table):
    """
        Build the in/out traffic counters for every host found in the two summary tables.

        Each table is an iterable of rows, indexable by the 'DST_IP' / 'SRC_IP', 'BYTES' and 'PACKETS'
        field names:

            in_table    - rows are counted as incoming traffic for the row's DST_IP
            out_table   - rows are counted as outgoing traffic for the row's SRC_IP

        The in_table is processed completely before the out_table. Multiple rows for the same address
        are summed up.

        Returns a { ip: Host } dict with the traffic counters for all hosts in the two tables.
    """

    # hosts seen so far, created on first reference
    by_ip = collections.defaultdict(Host)

    # (table, field holding the host's address, Host attribute prefix)
    plan = (
        (in_table,  'DST_IP',   'in'),
        (out_table, 'SRC_IP',   'out'),
    )

    for table, ip_field, prefix in plan:
        ok_attr = prefix + '_ok'
        bytes_attr = prefix + '_bytes'
        packets_attr = prefix + '_packets'

        for entry in table:
            addr = entry[ip_field]

            counters = by_ip[addr]
            counters.ip = addr

            # this direction has data for the host
            setattr(counters, ok_attr, True)

            setattr(counters, bytes_attr, getattr(counters, bytes_attr) + int(entry['BYTES']))
            setattr(counters, packets_attr, getattr(counters, packets_attr) + int(entry['PACKETS']))

    return dict(by_ip)
def create_host_rrd(path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX'):
    """
        Create a new .rrd file for the per-host traffic data at the given path.

            path        - path of the .rrd file to create
            rrd_step    - step interval (in seconds)
            ds_type     - type shared by all four data sources
            heartbeat   - heartbeat shared by all four data sources
            bytes_min   - lower limit for the 'in' / 'out' data sources
            bytes_max   - upper limit for the 'in' / 'out' data sources
            pkts_min    - lower limit for the 'in_pkts' / 'out_pkts' data sources
            pkts_max    - upper limit for the 'in_pkts' / 'out_pkts' data sources
            rrd_root    - not used here; accepted so that update_host() can pass on the same options
                          it used to build the path

        The file gets a single AVERAGE archive, holding one hour of per-step rows.
    """

    # data sources
    ds_list = (
        # bytes
        ('in',          ds_type, bytes_min, bytes_max),
        ('out',         ds_type, bytes_min, bytes_max),

        # packets
        ('in_pkts',     ds_type, pkts_min, pkts_max),
        ('out_pkts',    ds_type, pkts_min, pkts_max),
    )

    HOUR = 60 * 60

    # archives
    # the row count follows the step actually used for this file (not the module default), so that
    # the archive still spans an hour with a custom rrd_step; // keeps it an integer count
    rra_list = (
        ('AVERAGE', 0.5, 1, 1 * HOUR // rrd_step),  # 1 hour
    )

    # definitions
    # (loop names are chosen to not shadow the type/min/max builtins)
    defs = [
        'DS:%s:%s:%s:%s:%s' % (ds_name, ds_kind, heartbeat, ds_min, ds_max)
        for ds_name, ds_kind, ds_min, ds_max in ds_list
    ] + [
        'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
        for func, xff, steps, rows in rra_list
    ]

    # create using the given step
    rrd.create(path, *defs, step=rrd_step)
def update_host_rrd(path, host):
    """
        Write the host's current counter values into the .rrd file at the given path.

        The counters of a direction whose in_ok/out_ok flag is not set are given as 'U' rather than
        as a number (presumably rrdtool's notation for an unknown value).
    """

    # per-direction counters, or placeholders
    if host.in_ok:
        in_bytes, in_pkts = host.in_bytes, host.in_packets
    else:
        in_bytes = in_pkts = 'U'

    if host.out_ok:
        out_bytes, out_pkts = host.out_bytes, host.out_packets
    else:
        out_bytes = out_pkts = 'U'

    # (ds name, value) pairs to insert
    values = (
        # bytes
        ('in',          in_bytes),
        ('out',         out_bytes),

        # packets
        ('in_pkts',     in_pkts),
        ('out_pkts',    out_pkts),
    )

    log.debug("Update %s: %r", path, values)

    # new values for current time
    data = 'N:' + ':'.join(str(ds_value) for _, ds_value in values)

    # names of DSs we give values for, in the same order
    ds_names = ':'.join(ds_name for ds_name, _ in values)

    # go
    rrd.update(path, data, template=ds_names)
def update_host(host, **rrd_opts):
    """
        Store the host's counters into the host's .rrd file, creating the file first if needed.

        The path to the file is built from HOST_RRD_PATH, using host.ip and the given rrd_opts; the
        options must hence include everything the path format uses apart from host_ip.

        The same rrd_opts are passed on to create_host_rrd() when the .rrd file does not yet exist.
    """

    # where this host's data lives
    rrd_path = HOST_RRD_PATH.format(host_ip=host.ip, **rrd_opts)

    # new host?
    missing = not os.path.exists(rrd_path)

    if missing:
        log.info("Create new .rrd for %s: %s", host.ip, rrd_path)

        create_host_rrd(rrd_path, **rrd_opts)

    # store current values
    update_host_rrd(rrd_path, host)