# -*- coding: utf-8 -*-
# The encoding declaration above is required: this module contains a non-ASCII
# string literal (the '«' used in the "Back" links).

from pvl.verkko import db, web

from pvl.html import tags as html

import re
import socket # dns

import logging

log = logging.getLogger('pvl.verkko.hosts')


# XXX: this should actually be DHCPHost
class Host(object):
    """
        A host row, mapped onto the db.dhcp_hosts table (see the db.mapper()
        call below this class).

        Besides the attributes set in __init__, the methods and handlers in this
        module also read `id`, `first_seen` and `last_seen`; `id` is mapped
        explicitly to the table's rowid column, the other two are presumably
        table columns picked up by the mapper (not visible in this file).
    """

    # strftime format used by when()
    DATE_FMT = '%Y%m%d'

    # One octet: exactly two hexadecimal digits (captured).
    MAC_HEX = r'([A-Fa-f0-9]{2})'
    # Optional single separator between octets, so aa:bb:.., aa-bb-..,
    # aabb.ccdd.eeff and aabbccddeeff are all accepted.
    MAC_SEP = r'[-:.]?'
    MAC_RE = re.compile(MAC_SEP.join([MAC_HEX] * 6))

    @classmethod
    def normalize_mac(cls, mac):
        """
            Normalize a MAC address string to lower-case, colon-separated form.

            The pattern is searched for (not anchored), so the first run of six
            hex octets found anywhere in `mac` is used.

            Raises ValueError(mac) if no such run is found.
        """

        match = cls.MAC_RE.search(mac)

        if not match:
            raise ValueError(mac)

        return ':'.join(octet.lower() for octet in match.groups())

    def __init__(self, ip, mac, name=None):
        self.ip = ip
        self.mac = mac
        self.name = name

    def render_mac(self):
        """
            Return the MAC for display.

            None if there is no MAC, u'???' if the stored value is longer than a
            colon-separated MAC (6 two-digit octets + 5 separators), otherwise
            the value as unicode.
        """

        if not self.mac:
            return None

        if len(self.mac) > (6 * 2 + 5):
            return u'???'

        return unicode(self.mac)

    def render_name(self):
        """
            Return the name decoded as ASCII (undecodable bytes replaced), or
            None if there is no name.
        """

        if not self.name:
            return None

        return self.name.decode('ascii', 'replace')

    def when(self):
        """
            Return the first_seen - last_seen range, each formatted per DATE_FMT.
        """

        return '{frm} - {to}'.format(
            frm = self.first_seen.strftime(self.DATE_FMT),
            to  = self.last_seen.strftime(self.DATE_FMT),
        )

    def dns(self):
        """
            Reverse-DNS lookup.

            Returns the first name that resolves for self.ip, or None if there
            is no ip or no name could be resolved.
        """

        if not self.ip:
            return None

        # AI_NUMERICHOST: only parse self.ip as a numeric address, no forward
        # lookup; the set collapses duplicate sockaddrs returned for the
        # different socket types.
        sockaddrs = set(
            sockaddr
            for family, socktype, proto, canonname, sockaddr
            in socket.getaddrinfo(self.ip, 0, 0, 0, 0, socket.AI_NUMERICHOST)
        )

        for sockaddr in sockaddrs:
            try:
                # NI_NAMEREQD: fail instead of returning the numeric address
                host, port = socket.getnameinfo(sockaddr, socket.NI_NAMEREQD)
            except socket.gaierror:
                continue

            return host

        return None

    def __unicode__(self):
        return u"{host.ip} ({host.mac})".format(host=self)


db.mapper(Host, db.dhcp_hosts, properties=dict(
    id      = db.dhcp_hosts.c.rowid,
    #_mac    = db.dhcp_hosts.c.mac,
    #_name   = db.dhcp_hosts.c.name,
))


class BaseHandler(web.Handler):
    """
        Shared querying and rendering for the host list/item handlers.
    """

    # request argument name -> mapped Host attribute, used for both sorting
    # (query) and filtering (ListHandler.process)
    HOST_ATTRS = {
        'id':   Host.id,
        'ip':   Host.ip,
        'mac':  Host.mac,
        'name': Host.name,
        'seen': Host.last_seen,
    }

    # default ordering when no (known) sort is requested
    HOST_SORT = Host.last_seen.desc()

    def query(self):
        """
            Return a Host query ordered per the request's `sort` argument.

            The raw argument is kept in self.sort (render_hosts() echoes it in a
            hidden form field). A missing, empty or unknown sort name falls back
            to HOST_SORT rather than raising KeyError on user input.
        """

        hosts = self.db.query(Host)

        # sort ?
        self.sort = self.request.args.get('sort')

        if self.sort in self.HOST_ATTRS:
            sort = self.HOST_ATTRS[self.sort]
        else:
            sort = self.HOST_SORT

        log.debug("sort: %s", sort)

        hosts = hosts.order_by(sort)

        # k
        return hosts

    def render_hosts(self, hosts, title=None, filters=False):
        """
            Render the given hosts as a table.

                hosts       - iterable of Host
                title       - optional table caption
                filters     - False to render a plain table; otherwise a dict of
                              {column: value} (possibly empty), in which case a
                              filter-input row is added and the table is wrapped
                              in a GET form.

            Reads self.sort when filters is not False, so query() must have been
            called first in that case.
        """

        COLS = (
            #title          sort        filter      class
            ('IP',          'ip',       'ip',       'ip'    ),
            ('MAC',         'mac',      'mac',      'mac'   ),
            ('Hostname',    'name',     False,      False   ),
            ('Seen',        'seen',     False,      False   ),
        )

        def url(**opts):
            # link to this handler, keeping the active filters and applying opts on top
            args = dict()

            if filters:
                args.update(filters)

            args.update(opts)

            return self.url(**args)

        table = html.table(
            html.caption(title) if title else None,
            html.thead(
                html.tr(
                    html.th('#'),
                    (
                        html.th(
                            html.a(href=url(sort=sort))(col_title) if sort else (col_title)
                        ) for col_title, sort, field, class_ in COLS
                    )
                ),
                # NB: `is not False`, so that an empty filters dict still gets the filter row
                html.tr(class_='filter')(
                    html.td(
                        html.input(type='submit', value=u'\u00BF'),
                    ),
                    (
                        html.td(class_=class_)(
                            html.input(type='text', name=field, value=filters.get(field)) if field else None
                        ) for col_title, sort, field, class_ in COLS
                    )
                ) if filters is not False else None
            ),
            html.tbody(
                html.tr(class_=('alternate' if i % 2 else None), id=host.id)(
                    html.th(
                        html.a(href=self.url(ItemHandler, id=host.id))(
                            '#'
                        )
                    ),
                    html.td(class_='ip')(
                        html.a(href=self.url(ListHandler, ip=host.ip))(
                            host.ip
                        )
                    ),
                    html.td(class_='mac')(
                        html.a(href=self.url(ListHandler, mac=host.mac))(
                            host.render_mac()
                        )
                    ),
                    html.td(host.render_name()),
                    html.td(host.when()),
                ) for i, host in enumerate(hosts)
            )
        )

        if filters is False:
            return table

        return html.form(method='get', action=self.url())(
            html.input(type='hidden', name='sort', value=self.sort),
            table,
        )

    def render_host(self, host, hosts):
        """
            Render the detail view for a single host, followed by a table of the
            related hosts and a link back to the list.

            Performs a reverse-DNS lookup via host.dns().
        """

        attrs = (
            ('IP',          host.ip),
            ('MAC',         host.mac),
            ('Hostname',    host.name),
            ('DNS',         host.dns()),
            ('First seen',  host.first_seen),
            ('Last seen',   host.last_seen),
        )

        return (
            html.h2('Host'),
            html.dl(
                (html.dt(title), html.dd(value)) for title, value in attrs
            ),

            html.h2('Related'),
            self.render_hosts(hosts),

            html.a(href=self.url(ListHandler))(html('«'), 'Back'),
        )


class ItemHandler(BaseHandler):
    """
        Single host, looked up by id, with the hosts sharing its ip or mac.
    """

    def process(self, id):
        """
            Load the host with the given id into self.host, and the hosts
            sharing its ip or mac into self.hosts.

            Raises web.NotFound if there is no such host.
        """

        self.hosts = self.query()
        self.host = self.hosts.get(id)

        if not self.host:
            raise web.NotFound("No such host: {id}".format(id=id))

        self.hosts = self.hosts.filter((Host.ip == self.host.ip) | (Host.mac == self.host.mac))

    def title(self):
        return u"DHCP Host: {self.host}".format(self=self)

    def render(self):
        return self.render_host(self.host, self.hosts)


class ListHandler(BaseHandler):
    """
        Host listing, optionally filtered by request arguments named after the
        HOST_ATTRS keys.
    """

    def process(self):
        """
            Build self.hosts from the request arguments, and record the applied
            filters in self.filters as {attr: value}.

            A value ending in '*' is matched with LIKE, with every '*' replaced
            by '%'; otherwise the match is exact, and a `mac` value is first
            normalized (ValueError from Host.normalize_mac propagates).
        """

        self.hosts = self.query()

        # filter?
        self.filters = {}

        for attr in self.HOST_ATTRS:
            value = self.request.args.get(attr)

            if not value:
                continue

            # preprocess
            like = False

            if value.endswith('*'):
                like = value.replace('*', '%')

            elif attr == 'mac':
                value = Host.normalize_mac(value)

            # filter
            col = self.HOST_ATTRS[attr]

            if like:
                criterion = (col.like(like))
            else:
                criterion = (col == value)

            self.hosts = self.hosts.filter(criterion)

            # the value as given (still with any '*'), so the form re-displays it as typed
            self.filters[attr] = value

    def title(self):
        if self.filters:
            return "DHCP Hosts: {filters}".format(filters=', '.join(self.filters.itervalues()))
        else:
            return "DHCP Hosts"

    def render(self):
        return (
            self.render_hosts(self.hosts, filters=self.filters),

            html.a(href=self.url())(html('«'), 'Back') if self.filters else None,
        )