from pvl.verkko import db, web
from pvl.html import tags as html
import re
import socket # dns
import logging; log = logging.getLogger('pvl.verkko.hosts')
# XXX: this should actually be DHCPHost
class Host (object) :
    """
        A DHCP host record: an IP address, a MAC address and an optional name.

        NOTE(review): when() reads self.first_seen / self.last_seen, which are never
        assigned in this class; presumably they are provided by the ORM mapping - confirm.
    """

    # strftime() format used by when()
    DATE_FMT = '%Y%m%d'

    # One octet: exactly two hex digits. Restricted to hex digits so that
    # non-hex input such as 'zz:zz:zz:zz:zz:zz' is rejected.
    MAC_HEX = r'([A-Fa-f0-9]{2})'

    # Optional single separator between octets; also covers the undelimited
    # and the dotted 'aabb.ccdd.eeff' notations.
    MAC_SEP = r'[-:.]?'

    # Six octets, each captured as its own group.
    MAC_RE = re.compile(MAC_SEP.join([MAC_HEX] * 6))

    @classmethod
    def normalize_mac (cls, mac) :
        """
            Normalize a MAC address to lowercase, colon-separated form.

            The pattern is search()ed for, so the first run of six octets found
            anywhere in the given string is used.

            Raises ValueError (carrying the given string) if no MAC address is found.
        """

        match = cls.MAC_RE.search(mac)

        if not match :
            raise ValueError(mac)

        return ':'.join(octet.lower() for octet in match.groups())

    def __init__ (self, ip, mac, name=None) :
        """
            Store the given ip, mac and optional name as-is.
        """

        self.ip = ip
        self.mac = mac
        self.name = name

    def render_mac (self) :
        """
            Return the MAC address for display.

            Returns None if there is no MAC, u'???' if the stored value is longer
            than a colon-separated MAC (6 * 2 hex digits + 5 separators), and the
            MAC as unicode otherwise.
        """

        if not self.mac :
            return None

        if len(self.mac) > (6 * 2 + 5) :
            return u'???'

        return unicode(self.mac)

    def render_name (self) :
        """
            Return the name decoded as ASCII, with undecodable bytes replaced,
            or None if there is no name.
        """

        if not self.name :
            return None

        # assumes self.name is a byte string - TODO confirm
        return self.name.decode('ascii', 'replace')

    def when (self) :
        """
            Return the first_seen - last_seen range, each formatted per DATE_FMT.
        """

        return '{frm} - {to}'.format(
            frm = self.first_seen.strftime(self.DATE_FMT),
            to  = self.last_seen.strftime(self.DATE_FMT),
        )

    def dns (self) :
        """
            Reverse-DNS lookup.

            Returns the first hostname that resolves for self.ip, or None if
            there is no IP or no name could be resolved.
        """

        if not self.ip :
            return None

        # AI_NUMERICHOST: parse self.ip as a numeric address, without a forward lookup.
        # The set collapses the duplicate sockaddrs returned per socktype/proto.
        sockaddrs = set(
            sockaddr
            for family, socktype, proto, canonname, sockaddr
            in socket.getaddrinfo(self.ip, 0, 0, 0, 0, socket.AI_NUMERICHOST)
        )

        for sockaddr in sockaddrs :
            try :
                # NI_NAMEREQD: fail rather than echo back the numeric address
                host, port = socket.getnameinfo(sockaddr, socket.NI_NAMEREQD)
            except socket.gaierror :
                continue

            return host

        return None

    def __unicode__ (self) :
        return u"{host.ip} ({host.mac})".format(host=self)
# Map Host onto the dhcp_hosts table, exposing the table's rowid column as Host.id.
db.mapper(Host, db.dhcp_hosts, properties={
    'id': db.dhcp_hosts.c.rowid,
    # currently not mapped:
    #   '_mac':  db.dhcp_hosts.c.mac,
    #   '_name': db.dhcp_hosts.c.name,
})
class BaseHandler (web.Handler) :
    """
        Shared query building and HTML rendering for the DHCP host handlers.
    """

    # request argument name -> mapped Host attribute
    HOST_ATTRS = {
        'id':   Host.id,
        'ip':   Host.ip,
        'mac':  Host.mac,
        'name': Host.name,
        'seen': Host.last_seen,
    }

    # ordering used when the request does not name a known sort attribute
    HOST_SORT = Host.last_seen.desc()

    def query (self) :
        """
            Return a Host query ordered per the request's `sort` argument.

            Sets self.sort to the requested HOST_ATTRS key. An unknown value is
            ignored (self.sort is reset to None) and HOST_SORT is used instead,
            rather than letting the lookup fail with a KeyError.
        """

        hosts = self.db.query(Host)

        # sort ?
        self.sort = self.request.args.get('sort')

        if self.sort and self.sort not in self.HOST_ATTRS :
            # request arguments are user input; do not index HOST_ATTRS with them blindly
            log.warning("ignoring unknown sort attribute: %r", self.sort)
            self.sort = None

        if self.sort :
            sort = self.HOST_ATTRS[self.sort]
        else :
            sort = self.HOST_SORT

        log.debug("sort: %s", sort)

        return hosts.order_by(sort)

    def render_hosts (self, hosts, title=None, filters=False) :
        """
            Render the given hosts as a table.

                hosts       - Host query; iterated for the rows, and count()'d for the footer
                title       - optional table caption
                filters     - False to return the bare table, or a dict of {name: value}
                              filters to return the table wrapped in a GET form, with a
                              row of filter inputs; the filters are also carried over
                              into the column sort links

            Reads self.sort (as set by query()) when rendering the form.
        """

        COLS = (
            #title          sort        filter      class
            ('IP',          'ip',       'ip',       'ip'    ),
            ('MAC',         'mac',      'mac',      'mac'   ),
            ('Hostname',    'name',     False,      False   ),
            ('Seen',        'seen',     False,      False   ),
        )

        def url (**opts) :
            """
                URL for this handler, with the active filters overridden by the given opts.
            """

            args = dict()

            if filters :
                args.update(filters)

            args.update(opts)

            return self.url(**args)

        table = html.table(
            html.caption(title) if title else None,
            html.thead(
                # column titles, linking to the same listing sorted by that column
                html.tr(
                    html.th('#'),
                    (
                        html.th(
                            html.a(href=url(sort=sort_name))(col_title) if sort_name else (col_title)
                        ) for col_title, sort_name, filter_name, class_ in COLS
                    )
                ),
                # filter inputs, only for filterable columns, and only within a form
                html.tr(class_='filter')(
                    html.td(
                        html.input(type='submit', value=u'\u00BF'),
                    ),
                    (
                        html.td(class_=class_)(
                            html.input(type='text', name=filter_name, value=filters.get(filter_name)) if filter_name else None
                        ) for col_title, sort_name, filter_name, class_ in COLS
                    )
                ) if filters is not False else None
            ),
            html.tbody(
                html.tr(class_=('alternate' if i % 2 else None), id=host.id)(
                    html.th(
                        html.a(href=self.url(ItemHandler, id=host.id))(
                            '#'
                        )
                    ),
                    html.td(class_='ip')(
                        html.a(href=self.url(ListHandler, ip=host.ip))(
                            host.ip
                        )
                    ),
                    html.td(class_='mac')(
                        html.a(href=self.url(ListHandler, mac=host.mac))(
                            host.render_mac()
                        )
                    ),
                    html.td(host.render_name()),
                    html.td(host.when()),
                ) for i, host in enumerate(hosts)
            ),
            html.tfoot(
                html.tr(
                    html.td(colspan=(1 + len(COLS)))(
                        # XXX: does separate SELECT count()
                        "{count} hosts".format(count=hosts.count())
                    )
                )
            )
        )

        if filters is False :
            return table

        # the hidden input keeps the current sort when the filter form is submitted
        return html.form(method='get', action=self.url())(
            html.input(type='hidden', name='sort', value=self.sort),
            table,
        )

    def render_host (self, host, hosts) :
        """
            Render the details of a single host, followed by a table of the
            given related hosts and a link back to the ListHandler.
        """

        attrs = (
            ('IP',          host.ip),
            ('MAC',         host.mac),
            ('Hostname',    host.name),
            ('DNS',         host.dns()),
            ('First seen',  host.first_seen),
            ('Last seen',   host.last_seen),
        )

        return (
            html.h2('Host'),
            html.dl(
                (html.dt(title), html.dd(value)) for title, value in attrs
            ),

            html.h2('Related'),
            self.render_hosts(hosts),

            html.a(href=self.url(ListHandler))(html('«'), 'Back'),
        )
class ItemHandler (BaseHandler) :
    """
        View of a single host, along with the hosts sharing its IP or MAC.
    """

    def process (self, id) :
        """
            Look up the host with the given id, and select its related hosts.

            Raises web.NotFound if there is no host with the given id.
        """

        query = self.hosts = self.query()
        host = self.host = query.get(id)

        if not host :
            raise web.NotFound("No such host: {id}".format(id=id))

        # anything seen with the same IP or the same MAC, which includes the host itself
        same_ip = (Host.ip == host.ip)
        same_mac = (Host.mac == host.mac)

        self.hosts = query.filter(same_ip | same_mac)

    def title (self) :
        """
            Page title naming the host.
        """

        return u"DHCP Host: {self.host}".format(self=self)

    def render (self) :
        """
            Render the host's details and its related hosts.
        """

        return self.render_host(self.host, self.hosts)
class ListHandler (BaseHandler) :
    """
        Listing of hosts, optionally filtered by the request arguments named in HOST_ATTRS.
    """

    def process (self) :
        """
            Build the host query, applying a filter for each non-empty HOST_ATTRS request argument.

            A value ending in '*' is matched using LIKE, with every '*' replaced by '%'.
            Any other value is matched for equality; for `mac`, after normalizing it
            using Host.normalize_mac().

            The applied filters are collected into self.filters as {attr: value}.
        """

        self.hosts = self.query()
        self.filters = {}

        for attr, column in self.HOST_ATTRS.items() :
            value = self.request.args.get(attr)

            if not value :
                continue

            if value.endswith('*') :
                # wildcard match; the value is kept as given for self.filters
                criterion = column.like(value.replace('*', '%'))

            else :
                if attr == 'mac' :
                    value = Host.normalize_mac(value)

                criterion = (column == value)

            self.hosts = self.hosts.filter(criterion)
            self.filters[attr] = value

    def title (self) :
        """
            Page title, listing the active filter values, if any.
        """

        if not self.filters :
            return "DHCP Hosts"

        values = ', '.join(self.filters.itervalues())

        return "DHCP Hosts: {filters}".format(filters=values)

    def render (self) :
        """
            Render the filterable hosts table, plus a link back to the unfiltered listing when filtering.
        """

        if self.filters :
            back = html.a(href=self.url())(html('«'), 'Back')
        else :
            back = None

        return (
            self.render_hosts(self.hosts, filters=self.filters),
            back,
        )