# -*- coding: utf-8 -*-
"""
    DHCP hosts: the Host model mapped onto db.dhcp_hosts, html rendering of
    host tables/details, and the web Handler serving them.
"""

from pvl.verkko import db, web

from pvl.html import tags as html

import socket # dns

import logging; log = logging.getLogger('pvl.verkko.hosts')

# XXX: this should actually be DHCPHost
class Host (object) :
    """
        A single host entry, mapped onto the db.dhcp_hosts table below.

        The first_seen/last_seen attributes used by when() are not set by
        __init__; they are expected to be supplied by the db mapping.
    """

    DATE_FMT = '%Y%m%d'

    def __init__ (self, ip, mac, name=None) :
        self.ip = ip
        self.mac = mac
        self.name = name

    def render_mac (self) :
        """
            Return the MAC address as unicode, u'???' if it is too long to be
            one, or None if unset.
        """

        if not self.mac :
            return None

        # 6 octets of two hex digits each, plus 5 separators
        elif len(self.mac) > (6 * 2 + 5) :
            return u'???'

        else :
            return unicode(self.mac)

    def render_name (self) :
        """
            Return the hostname decoded as ascii (replacing any undecodable
            bytes), or None if unset.
        """

        if self.name :
            return self.name.decode('ascii', 'replace')
        else :
            return None

    def when (self) :
        """
            Return the 'first_seen - last_seen' date range, formatted using
            DATE_FMT.
        """

        return '{frm} - {to}'.format(
                frm = self.first_seen.strftime(self.DATE_FMT),
                to  = self.last_seen.strftime(self.DATE_FMT),
        )

    def dns (self) :
        """
            Reverse-DNS lookup.

            Returns the first name found for self.ip, or None if the ip is
            unset, is not a numeric address, or has no reverse name.
        """

        if not self.ip :
            return None

        try :
            # AI_NUMERICHOST: only parse the address literal, never resolve a name
            addrinfos = socket.getaddrinfo(self.ip, 0, 0, 0, 0, socket.AI_NUMERICHOST)

        except socket.gaierror :
            # self.ip may come straight from the request URL (see Handler.list),
            # so it is not necessarily a valid address
            return None

        # the same sockaddr is repeated for each socktype/proto
        sockaddrs = set(sockaddr for family, socktype, proto, canonname, sockaddr in addrinfos)

        for sockaddr in sockaddrs :
            try :
                # NI_NAMEREQD: fail instead of returning the numeric address back
                host, port = socket.getnameinfo(sockaddr, socket.NI_NAMEREQD)

            except socket.gaierror :
                continue

            return host

        return None

    def __unicode__ (self) :
        return u"{host.ip} ({host.mac})".format(host=self)

db.mapper(Host, db.dhcp_hosts, properties=dict(
    id      = db.dhcp_hosts.c.rowid,
    #_mac    = db.dhcp_hosts.c.mac,
    #_name   = db.dhcp_hosts.c.name,
))

# request attribute name -> mapped Host column, used for both ?sort= and /attr/value queries
HOST_ATTRS = {
    'id':   Host.id,
    'ip':   Host.ip,
    'mac':  Host.mac,
    'name': Host.name,
    'seen': Host.last_seen,
}

# default ordering when no ?sort= is given
HOST_SORT = Host.last_seen.desc()

def render_hosts (hosts, title=None) :
    """
        Render the given iterable of Hosts as a html table, with an optional
        caption and ?sort= links in the column headers.
    """

    COLS = (
        #title          sort
        ('#',           None),
        ('IP',          'ip'),
        ('MAC',         'mac'),
        ('Hostname',    'name'),
        ('Seen',        'seen'),
    )

    return html.table(
        html.caption(title) if title else None,
        html.thead(
            html.tr(
                html.th(
                    html.a(href='?sort={name}'.format(name=col_sort))(col_title) if col_sort else (col_title)
                ) for col_title, col_sort in COLS
            )
        ),
        html.tbody(
            html.tr(class_=('alternate' if i % 2 else None), id=host.id)(
                html.td(class_='id')(
                    html.a(href='/hosts/{host.id}'.format(host=host))(
                        '#' #host['rowid'])
                    )
                ),
                html.td(class_='ip')(
                    html.a(href='/hosts/ip/{host.ip}'.format(host=host))(host.ip)
                ),
                html.td(class_='mac')(
                    html.a(href='/hosts/mac/{host.mac}'.format(host=host))(
                        host.render_mac()
                    )
                ),
                html.td(host.render_name()),
                html.td(host.when()),
            ) for i, host in enumerate(hosts)
        )
    )

def render_host (host, hosts) :
    """
        Render the details of the given host, followed by a table of the
        related hosts and a link back to the index.
    """

    attrs = (
        ('IP',          host.ip),
        ('MAC',         host.mac),
        ('Hostname',    host.name),
        ('DNS',         host.dns()),
    )

    return (
        html.h2('Host'),
        html.dl(
            (html.dt(title), html.dd(value)) for title, value in attrs
        ),

        html.h2('Related'),
        render_hosts(hosts),

        html.a(href='/hosts')(html('«'), 'Back'),
    )

class Handler (web.Handler) :
    """
        Web handler for the hosts index, single hosts by id, and hosts
        matching an attribute value.
    """

    TITLE = "DHCP Hosts"

    def index (self) :
        """
            Table of all hosts.
        """

        return render_hosts(self.hosts)

    def host (self, id) :
        """
            Details for the host with the given id, along with any hosts
            sharing its ip or mac.

            Raises web.NotFound if there is no such host.
        """

        host = self.hosts.get(id)

        if not host :
            raise web.NotFound("No such host: {id}".format(id=id))

        hosts = self.hosts.filter((Host.ip == host.ip) | (Host.mac == host.mac))

        # XXX
        #self.title = "DHCP Host: {host}".format(host=unicode(host))

        return render_host(host, hosts)

    def list (self, attr, value) :
        """
            Hosts having the given attribute value, shown as the hosts related
            to a fake host carrying only that attribute.

            Raises web.BadRequest if attr is not one of HOST_ATTRS.
        """

        if attr not in HOST_ATTRS :
            raise web.BadRequest("Invalid attribute: {attr}".format(attr=attr))

        # fake host
        fake = { 'ip': None, 'mac': None, 'name': None }

        # HOST_ATTRS also has id/seen, which Host() does not take as arguments
        if attr in fake :
            fake[attr] = value

        host = Host(**fake)

        # query
        column = HOST_ATTRS[attr]

        log.debug("%s == %s", column, value)

        hosts = self.hosts.filter(column == value)

        # XXX
        #self.title = "DHCP Hosts: {value}".format(value=value)

        return render_host(host, hosts)

    def process (self) :
        """
            Prepare self.hosts as the query for all hosts, ordered by the
            ?sort= request argument or HOST_SORT by default.

            Raises web.BadRequest if ?sort= is not one of HOST_ATTRS.
        """

        hosts = self.db.query(Host)

        # sort ?
        sort = self.request.args.get('sort')

        if not sort :
            order = HOST_SORT

        elif sort in HOST_ATTRS :
            order = HOST_ATTRS[sort]

        else :
            # comes straight from the request, so do not let it surface as a KeyError
            raise web.BadRequest("Invalid sort attribute: {sort}".format(sort=sort))

        log.debug("sort: %s", order)

        hosts = hosts.order_by(order)

        # store
        self.hosts = hosts

    def render (self) :
        """
            Dispatch on the number of path components: none for the index,
            one for a host id, two for an attr/value query.

            Raises web.BadRequest for a non-integer host id, and web.NotFound
            for longer paths.
        """

        # index
        if not self.path :
            return self.index()

        # id
        elif len(self.path) == 1 :
            id, = self.path

            try :
                id = int(id)
            except ValueError as ex :
                raise web.BadRequest("Invalid host ID: {id}: {ex}".format(id=id, ex=ex))

            return self.host(id)

        # query
        elif len(self.path) == 2 :
            attr, value = self.path

            return self.list(attr, value)

        else :
            raise web.NotFound