29
|
1 |
"""
|
|
2 |
A pmacct -> rrd runner
|
|
3 |
"""
|
|
4 |
|
|
5 |
import collections
|
|
6 |
import subprocess
|
|
7 |
import os.path
|
|
8 |
import logging
|
|
9 |
|
|
10 |
from rrdweb import rrd
|
|
11 |
|
|
12 |
# module-level logger
log = logging.getLogger('rrdweb.pmacct')

## pmacct
# name to invoke `pmacct` as
PMACCT_CMD = 'pmacct'

# path to in/out client sockets
PMACCT_IN_SOCK = 'var/pmacct/host-in.sock'
PMACCT_OUT_SOCK = 'var/pmacct/host-out.sock'

## RRD
# path to rrd data dir
RRD_ROOT = 'var/rrd'

# path to per-host RRD file; a str.format template taking rrd_root= and host_ip=
HOST_RRD_PATH = '{rrd_root}/{host_ip}.rrd'

## RRD create params
# step interval (in seconds)
RRD_STEP = 60

# Data Source parameters
DS_TYPE = 'COUNTER'

# two step intervals
DS_HEARTBEAT = RRD_STEP * 2

# NOTE: floor division, so that the limits remain integers under true division as well
DS_BYTES_MIN = 0
DS_BYTES_MAX = (1000 ** 3) // 8     # 1gbps
DS_PKTS_MIN = 0
DS_PKTS_MAX = DS_BYTES_MAX // 64    # 1gbps @ 64 bytes-per-packet
|
|
40 |
|
|
41 |
def pmacct_cmd(*args):
    """
    Invoke the pmacct client with the given arguments, yielding the output as a series of lines.

    This is a generator: the client is not run until the result is first iterated, and its
    entire output is collected before the first line is yielded.

    args    - command-line arguments (strings) to pass to the client

    Raises Exception if the client exits with a non-zero status, or if it writes anything
    to stderr.
    """

    # argv with name of executable in [0]
    argv = (PMACCT_CMD, ) + args

    # invoke; the pipes are opened in text mode, so that the lines yielded are str rather
    # than bytes on Python 3 as well
    proc = subprocess.Popen(
        argv,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )

    # XXX: manipulate the pipes directly, to efficiently stream the output... without deadlocking..

    # interact (nothing on stdin)
    stdout, stderr = proc.communicate()

    # error-check
    if proc.returncode:
        raise Exception("pmacct terminated with status=%d" % (proc.returncode, ))

    # any stderr output is treated as a failure, even with a zero exit status
    if stderr:
        raise Exception("pmacct terminated with errors: %s" % (stderr, ))

    # output
    for line in stdout.splitlines():
        yield line
|
|
68 |
|
|
69 |
def pmacct_summary(socket):
    """
    Query summary-form data from pmacct, by running `pmacct -p <socket> -s` and parsing
    the table it prints.

    The first line of output is split on whitespace to give the field names. Every
    following line is split the same way and paired up with those names, until the first
    empty line ends the table.

    Yields a { field: value } dict for each row in the table; values are left as strings.
    """

    # field names, taken from the header line
    header = None

    for text in pmacct_cmd('-p', socket, '-s'):
        if not header:
            # still looking for the header row
            header = text.split()

        elif text:
            # data row, mapped by field name
            yield dict(zip(header, text.split()))

        else:
            # empty line: end of data
            break
|
|
100 |
|
|
101 |
class Host(object):
    """
    Traffic counters for some specific host.

    ip                      - the host's address, or None if not yet set
    in_ok, out_ok           - flags, initially False, for the incoming/outgoing direction
    in_bytes, out_bytes     - byte counters, starting at zero
    in_packets, out_packets - packet counters, starting at zero
    """

    __slots__ = ('ip', 'in_ok', 'out_ok', 'in_bytes', 'out_bytes', 'in_packets', 'out_packets')

    def __init__(self, ip=None):
        self.ip = ip

        # neither direction is flagged to begin with
        self.in_ok = False
        self.out_ok = False

        # all counters start from zero
        self.in_bytes = 0
        self.out_bytes = 0
        self.in_packets = 0
        self.out_packets = 0

    def __repr__(self):
        # name=value for every slot, in declaration order
        attrs = []

        for name in self.__slots__:
            attrs.append('%s=%r' % (name, getattr(self, name)))

        return 'Host(%s)' % (', '.join(attrs), )
|
|
116 |
|
|
117 |
|
|
118 |
def host_counters(in_table, out_table):
    """
    Combine the two summary tables into per-host in/out traffic counters.

    Each row of in_table is counted as incoming traffic for the host named by its DST_IP
    field; each row of out_table is counted as outgoing traffic for the host named by its
    SRC_IP field. The BYTES and PACKETS fields of every row are converted with int() and
    added to the matching counters, so several rows for the same host are summed.

    Returns a { ip: Host } dict covering every host seen in either table.
    """

    # hosts seen so far; a fresh Host is created on first lookup
    by_ip = collections.defaultdict(Host)

    # (table, field naming the host, Host attributes to update) per direction;
    # incoming is processed before outgoing
    directions = (
        (in_table, 'DST_IP', 'in_ok', 'in_bytes', 'in_packets'),
        (out_table, 'SRC_IP', 'out_ok', 'out_bytes', 'out_packets'),
    )

    for table, ip_field, ok_attr, bytes_attr, packets_attr in directions:
        for entry in table:
            addr = entry[ip_field]

            counters = by_ip[addr]

            # the defaultdict builds the Host without an ip, so set it here
            counters.ip = addr

            setattr(counters, ok_attr, True)
            setattr(counters, bytes_attr, getattr(counters, bytes_attr) + int(entry['BYTES']))
            setattr(counters, packets_attr, getattr(counters, packets_attr) + int(entry['PACKETS']))

    # plain dict, so that lookups by the caller do not create new entries
    return dict(by_ip)
|
|
156 |
|
|
157 |
def create_host_rrd(path, rrd_step=RRD_STEP, ds_type=DS_TYPE, heartbeat=DS_HEARTBEAT, bytes_min=DS_BYTES_MIN, bytes_max=DS_BYTES_MAX, pkts_min=DS_PKTS_MIN, pkts_max=DS_PKTS_MAX, rrd_root='XXX'):
    """
    Create a new .rrd file for the per-host traffic data at the given path.

    The file gets four data sources (`in`, `out`, `in_pkts`, `out_pkts`) and a single
    AVERAGE archive holding one hour of per-step values.

    path        - path of the .rrd file to create
    rrd_step    - step interval, in seconds
    ds_type     - type used for all four data sources
    heartbeat   - heartbeat used for all four data sources
    bytes_min, bytes_max    - limits for the `in`/`out` data sources
    pkts_min, pkts_max      - limits for the `in_pkts`/`out_pkts` data sources
    rrd_root    - not used here; accepted because update_host() passes its options,
                  including rrd_root, straight through
    """

    # data sources: (name, type, min, max)
    ds_list = (
        # bytes
        ('in', ds_type, bytes_min, bytes_max),
        ('out', ds_type, bytes_min, bytes_max),

        # packets
        ('in_pkts', ds_type, pkts_min, pkts_max),
        ('out_pkts', ds_type, pkts_min, pkts_max),
    )

    HOUR = 60 * 60

    # archives: (function, xff, steps per row, rows)
    # the row count follows the step actually used for this file, and is kept integral
    rra_list = (
        ('AVERAGE', 0.5, 1, int(1 * HOUR // rrd_step)),   # 1 hour
    )

    # definitions
    defs = [
        'DS:%s:%s:%s:%s:%s' % (ds_name, ds_kind, heartbeat, ds_min, ds_max)
        for ds_name, ds_kind, ds_min, ds_max in ds_list
    ] + [
        'RRA:%s:%s:%s:%s' % (func, xff, steps, rows)
        for func, xff, steps, rows in rra_list
    ]

    # create using the given step
    rrd.create(path, *defs, step=rrd_step)
|
|
193 |
|
|
194 |
def update_host_rrd(path, host):
    """
    Update the .rrd file at the given path with the host's current counter values.

    The counters for a direction whose in_ok/out_ok flag is not set are sent as 'U'
    instead of a number.
    """

    # placeholder sent for a direction that is not flagged ok
    unknown = 'U'

    # (DS name, value) pairs to insert
    values = (
        # bytes
        ('in', host.in_bytes if host.in_ok else unknown),
        ('out', host.out_bytes if host.out_ok else unknown),

        # packets
        ('in_pkts', host.in_packets if host.in_ok else unknown),
        ('out_pkts', host.out_packets if host.out_ok else unknown),
    )

    log.debug("Update %s: %r", path, values)

    # names of the DSs we give values for, in the same order as the values themselves
    ds_names = [ds for ds, reading in values]

    # new values for current time
    readings = ['N'] + [str(reading) for ds, reading in values]

    # go
    rrd.update(path, ':'.join(readings), template=':'.join(ds_names))
|
|
220 |
|
|
221 |
def update_host(host, **rrd_opts):
    """
    Update the host's .rrd file in rrd_root with the host's data.

    Creates .rrd files for new hosts automatically, using the given options.

    host        - object with an `ip` attribute and the counters read by update_host_rrd()
    rrd_opts    - keyword options; used to format HOST_RRD_PATH, and passed on to
                  create_host_rrd() when the file does not exist yet. If rrd_root is not
                  given, the module-level RRD_ROOT is used for the path.
    """

    # path to .rrd, falling back to the default data dir rather than failing with a KeyError
    path_opts = dict(rrd_opts)
    path_opts.setdefault('rrd_root', RRD_ROOT)

    # NOTE: not named `rrd`, to avoid shadowing the rrd module
    path = HOST_RRD_PATH.format(host_ip=host.ip, **path_opts)

    # create?
    if not os.path.exists(path):
        log.info("Create new .rrd for %s: %s", host.ip, path)

        create_host_rrd(path, **rrd_opts)

    # update
    update_host_rrd(path, host)
|
|
239 |
|