"""
    A pmacct -> rrd runner

    Queries the in/out pmacct client sockets for per-host traffic counters and
    stores them into per-host .rrd files.
"""

import collections
import subprocess
import os.path
import logging

from rrdweb import rrd

log = logging.getLogger('rrdweb.pmacct')

## pmacct
# name to invoke `pmacct` as
PMACCT_CMD = 'pmacct'

# path to in/out client sockets
PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'

## RRD
# path to rrd data dir
RRD_ROOT = 'var/rrd'

# path to per-host RRD file
HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'

## RRD create params
# step interval (in seconds)
RRD_STEP = 60

# Data Source parameters
DS_TYPE = 'COUNTER'
DS_HEARTBEAT = RRD_STEP * 2
DS_BYTES_MIN = 0
# floor division keeps these integral on both Python 2 and Python 3
DS_BYTES_MAX = (1000 ** 3) // 8     # 1gbps
DS_PKTS_MIN = 0
DS_PKTS_MAX = DS_BYTES_MAX // 64    # 1gbps @ 64 bytes-per-packet


def pmacct_cmd(*args):
    """
        Invoke the pmacct client, yielding the output as a series of lines.

        This is a generator, so the command is only run once iteration starts.

        Raises Exception if pmacct exits with a non-zero status, or if it wrote
        anything to stderr.
    """

    # argv with name of executable in [0]
    argv = (PMACCT_CMD, ) + args

    # invoke; universal_newlines gives us text (not bytes) pipes, so that the
    # parsed field names are plain strings on Python 3 as well
    proc = subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )

    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..

    # interact (nothing on stdin)
    stdout, stderr = proc.communicate()

    # error-check
    if proc.returncode:
        message = "pmacct terminated with status=%d" % (proc.returncode, )

        if stderr:
            # don't lose the diagnostics that explain the failure
            message += ": %s" % (stderr.strip(), )

        raise Exception(message)

    # stderr?
    if stderr:
        raise Exception("pmacct terminated with errors: %s" % (stderr, ))

    # output
    for line in stdout.splitlines():
        yield line


def pmacct_summary(socket):
    """
        Query and return summary-form data from pmacct, by querying the full table and parsing it.

        socket is the path to the pmacct client socket, passed as `pmacct -p`.

        Yields a series of { field: value } dicts for each row in the table; all values are strings.
    """

    # `pmacct -s`
    output = pmacct_cmd('-p', socket, '-s')

    # the list of fields, parsed from the first non-blank line of output
    fields = None

    for line in output:
        if not fields:
            # parse header row; a blank line yields no fields, so we retry on the next line
            fields = line.split()

            continue

        if not line:
            # end of data
            break

        # parse field data
        values = line.split()

        # map by field name
        yield dict(zip(fields, values))


class Host(object):
    """
        Traffic counters for some specific host.

        in_ok/out_ok tell whether the host was seen in the in/out table at all,
        i.e. whether the matching byte/packet counters hold real values.
    """

    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')

    def __init__(self, ip=None):
        self.ip = ip

        self.in_ok = self.out_ok = False
        self.in_bytes = self.out_bytes = self.in_packets = self.out_packets = 0

    def __repr__(self):
        return 'Host(%s)' % ', '.join('%s=%r' % (name, getattr(self, name)) for name in self.__slots__)


def host_counters(in_table, out_table):
    """
        Returns the full set of in/out traffic counters for all hosts currently in the summary table.

        This processes the two tables given, by their SRC_IP/DST_IP fields.

        For each SRC_IP in the out_table, outgoing traffic is counted.
        For each DST_IP in the in_table, incoming traffic is counted.

        Returns a { host: Host } mapping containing the traffic counters for all hosts in the two tables.
    """

    # combined set of hosts; Host() is created without an ip, which we fill in below
    hosts = collections.defaultdict(Host)

    # process incoming
    for row in in_table:
        ip = row['DST_IP']

        host = hosts[ip]

        host.ip = ip
        host.in_ok = True
        host.in_bytes += int(row['BYTES'])
        host.in_packets += int(row['PACKETS'])

    # process outgoing
    for row in out_table:
        ip = row['SRC_IP']

        host = hosts[ip]

        host.ip = ip
        host.out_ok = True
        host.out_bytes += int(row['BYTES'])
        host.out_packets += int(row['PACKETS'])

    return dict(hosts)


def create_host_rrd(path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX'):
    """
        Create a new .rrd file for the per-host traffic data at the given path.

        The file gets four data sources (in, out, in_pkts, out_pkts) and a
        single one-hour AVERAGE archive at the given step.

        rrd_root is not used here; it is only accepted so that update_host can
        pass its full set of options through unchanged.
    """

    # data sources
    ds_list = (
        # bytes
        ('in',          ds_type, bytes_min, bytes_max),
        ('out',         ds_type, bytes_min, bytes_max),

        # packets
        ('in_pkts',     ds_type, pkts_min, pkts_max),
        ('out_pkts',    ds_type, pkts_min, pkts_max),
    )

    HOUR = 60 * 60

    # archives; rows are computed from the step actually used for this file,
    # using floor division so that the row count is formatted as an integer
    rra_list = (
        ('AVERAGE', 0.5, 1, 1 * HOUR // rrd_step),  # 1 hour
    )

    # definitions
    defs = [
        'DS:%s:%s:%s:%s:%s' % (ds_name, ds_kind, heartbeat, ds_min, ds_max)
        for ds_name, ds_kind, ds_min, ds_max in ds_list
    ] + [
        'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
        for func, xff, steps, rows in rra_list
    ]

    # create using the given step
    rrd.create(path, *defs, step=rrd_step)


def update_host_rrd(path, host):
    """
        Update the .rrd file with the given current counter values.

        Counters for a direction in which the host was not seen (in_ok/out_ok
        unset) are written as 'U'.
    """

    # values to insert
    values = (
        # bytes
        ('in',          host.in_bytes if host.in_ok else 'U'),
        ('out',         host.out_bytes if host.out_ok else 'U'),

        # packets
        ('in_pkts',     host.in_packets if host.in_ok else 'U'),
        ('out_pkts',    host.out_packets if host.out_ok else 'U'),
    )

    log.debug("Update %s: %r", path, values)

    # go
    rrd.update(path,
        # new values for current time
        'N:' + ':'.join(str(value) for name, value in values),

        # names of DSs we give values for
        template = ':'.join(name for name, value in values),
    )


def update_host(host, **rrd_opts):
    """
        Update the host's .rrd file in rrd_root with the host's data.

        Creates .rrd files for new hosts automatically, using the given options.

        rrd_opts are passed on to create_host_rrd; rrd_root selects the
        directory holding the .rrd files, and defaults to RRD_ROOT.
    """

    # fall back to the module-level data dir instead of failing with a KeyError
    rrd_opts.setdefault('rrd_root', RRD_ROOT)

    # path to .rrd (named so as not to shadow the rrd module)
    path = HOST_RRD_PATH.format(host_ip=host.ip, **rrd_opts)

    # create?
    if not os.path.exists(path):
        log.info("Create new .rrd for %s: %s", host.ip, path)

        create_host_rrd(path, **rrd_opts)

    # update
    update_host_rrd(path, host)