# encoding: utf-8
import datetime
import urlparse
import werkzeug
import werkzeug.urls
import pvl.web
import pvl.web.response
from pvl.login import pubtkt
from pvl.web import urls, html
import logging; log = logging.getLogger('pvl.login.server')
class Handler (pvl.web.Handler) :
# Bootstrap
DOCTYPE = 'html'
HTML_XMLNS = None
HTML_LANG = 'en'
CSS = (
'//netdna.bootstrapcdn.com/bootstrap/3.0.3/css/bootstrap.min.css',
)
JS = (
'//code.jquery.com/jquery.js',
'//netdna.bootstrapcdn.com/bootstrap/3.0.3/js/bootstrap.min.js',
)
STYLE = """
body {
padding-top: 2em;
text-align: center;
}
.container {
padding: 2em 1em;
text-align: left;
}
"""
def redirect (self, *url, **params) :
return pvl.web.response.redirect(self.url(*url, **params))
pubtkt = None
def init (self) :
self.alerts = []
def alert (self, type, alert) :
log.info(u"%s: %s", type, alert)
self.alerts.append((type, unicode(alert)))
def process_cookie (self) :
"""
Reverse the urlencoding used for the cookie...
"""
log.debug("cookies: %s", self.request.cookies)
cookie = self.request.cookies.get(self.app.cookie_name)
if not cookie :
return
log.debug("cookie %s=%s", self.app.cookie_name, cookie)
cookie = werkzeug.urls.url_unquote(cookie)
log.debug("cookie decoded: %s", cookie)
if not cookie :
return
try :
self.pubtkt = self.app.load(cookie)
except pubtkt.ParseError as ex :
self.alert('danger', ex)
except pubtkt.ExpiredError as ex :
self.pubtkt = ex.pubtkt
self.alert('warning', ex)
except pubtkt.VerifyError as ex :
self.pubtkt = ex.pubtkt
self.alert('danger', ex)
def process_back (self) :
self.server = None
self.back = urlparse.urlunparse((self.app.login_scheme, self.app.login_server, '/', '', '', ''))
back = self.request.args.get('back')
if back :
url = urlparse.urlparse(back, self.app.login_scheme)
if not self.app.login_scheme :
scheme = url.scheme
elif url.scheme == self.app.login_scheme :
scheme = url.scheme
else :
self.alert('info', "Using SSL for application URL")
scheme = self.app.login_scheme
self.server = self.app.check_server(url.hostname)
self.back = urlparse.urlunparse((scheme, self.server, url.path, url.params, url.query, url.fragment))
class Index (Handler) :
TITLE = u"Päivölä Network Login"
STYLE = Handler.STYLE + """
.pubtkt {
width: 30em;
margin: 1em auto;
}
.pubtkt form {
display: inline;
}
"""
def process (self) :
self.process_cookie()
if not self.pubtkt :
return self.redirect(Login)
def render_valid (self, valid) :
seconds = valid.seconds + valid.days * (24 * 60 * 60)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
return "%2d:%02d:%02d" % (hours, minutes, seconds)
def render_status (self, pubtkt) :
valid = pubtkt.valid()
grace = pubtkt.grace()
if valid and grace :
return 'success'
elif valid and grace is False :
return 'warning'
elif valid :
return 'success'
else :
return 'danger'
def render_pubtkt_fields (self, pubtkt) :
"""
Yield (glyphicon, text) to render as fields for ticket.
"""
yield 'user', None, "User account", pubtkt.uid
valid = pubtkt.valid()
grace = pubtkt.grace()
if valid and grace :
valid = "{grace} ({valid})".format(valid=self.render_valid(valid), grace=self.render_valid(grace))
valid_status = 'success'
elif valid and grace is False :
valid = "Renewable ({valid})".format(valid=self.render_valid(valid))
valid_status = 'warning'
elif valid :
valid = "{valid}".format(valid=self.render_valid(valid))
valid_status = 'success'
else :
valid = "Expired"
valid_status = 'danger'
yield 'time', valid_status, "Remaining validity", valid
if pubtkt.cip :
yield 'cloud', None, "Network address", pubtkt.cip
if pubtkt.udata :
yield 'comment', None, "Associated data", pubtkt.udata
for token in pubtkt.tokens :
yield 'flag', None, "Access token", token
if pubtkt.bauth :
yield 'keys', None, "Authentication token", pubtkt.bauth
def render_pubtkt (self, pubtkt) :
status = self.render_status(pubtkt)
return html.div(class_='pubtkt panel panel-{status}'.format(status=status))(
html.div(class_='panel-heading')("Login: {pubtkt.uid}".format(pubtkt=self.pubtkt)),
html.ul(class_='list-group')(
html.li(class_='list-group-item {status}'.format(status=('alert-'+status if status else '')), title=title)(
html.span(class_='glyphicon glyphicon-{glyphicon}'.format(glyphicon=icon)) if icon else None,
info,
) for icon, status, title, info in self.render_pubtkt_fields(pubtkt)
),
html.div(class_='panel-footer')(
#html.div(class_='btn-toolbar', role='toolbar')(
(
html.form(action=self.url(Login), method='post', class_='form-inline')(
html.button(type='submit', class_='btn btn-success')("Renew"),
)
) if pubtkt.valid() else (
html.form(action=self.url(Login), method='get', class_='form-inline')(
html.button(type='submit', class_='btn btn-info')("Login"),
),
),
html.form(action=self.url(Logout), method='post', class_='form-inline pull-right')(
html.button(type='submit', class_='btn btn-warning')("Logout"),
),
#),
),
)
def render_info (self) :
for type, alert in self.alerts :
yield html.div(class_='alert alert-{type}'.format(type=type))(alert)
if self.pubtkt :
yield self.render_pubtkt(self.pubtkt)
def render (self) :
return html.div(class_='container')(
self.render_info(),
)
class Login (Handler) :
TITLE = "Login"
STYLE = Handler.STYLE + """
form#login {
max-width: 50%;
padding: 1em;
margin: 0 auto;
}
"""
def process (self) :
self.process_cookie()
try :
self.process_back()
except pubtkt.Error as ex :
self.alert('danger', ex)
if self.request.method == 'POST' :
username = self.request.form.get('username')
password = self.request.form.get('username')
if username and password :
# preprocess
username = username.strip().lower()
try :
self.pubtkt = self.app.auth(username, password)
except pubtkt.Error as ex :
self.auth_errors = ex
elif self.pubtkt and self.pubtkt.valid() :
# renew
self.app.renew(self.pubtkt)
else :
return
# browsers seem to be very particular about quoting ;'s in cookie values...
# this follows PHP's setcookie() encoding...
cookie = werkzeug.urls.url_quote(self.app.sign(self.pubtkt))
# redirect with cookie
response = pvl.web.response.redirect(self.back)
response.set_cookie(self.app.cookie_name, cookie,
domain = self.app.cookie_domain,
secure = self.app.cookie_secure,
httponly = self.app.cookie_httponly,
)
return response
def render (self) :
if self.pubtkt :
username = self.pubtkt.uid
else :
username = None
domain = self.app.login_domain
if 'logout' in self.request.args :
self.alert('info', "You have been logged out.")
return html.div(class_='container')(
html.form(action=self.url(back=self.back), method='POST', id='login')(
(
html.div(class_='alert alert-{alert}'.format(alert=type))(alert)
for type, alert in self.alerts
),
html.fieldset(
html.legend(
(
"Login @ ",
html.a(href=self.back)(self.server),
) if self.server else (
"Login"
)
),
html.div(class_='form-group')(
html.div(class_='input-group')(
html.label(for_='username', class_='sr-only')("Username"),
html.input(name='username', type='text', class_='form-control', placeholder="username", required=True, autofocus=True, value=username),
html.span(class_='input-group-addon')("@{domain}".format(domain=domain)),
),
html.label(for_='password', class_='sr-only')("Password"),
html.input(name='password', type='password', class_='form-control', placeholder="Password", required=True),
),
html.button(type='submit', class_='btn btn-primary')("Login"),
)
)
)
class Logout (Handler) :
TITLE = "Logout"
def process (self) :
self.process_cookie()
if not self.pubtkt :
return self.redirect(Login)
if self.request.method == 'POST' :
response = pvl.web.response.redirect(self.url(Login, logout=1))
response.set_cookie(self.app.cookie_name, '',
expires = 0,
domain = self.app.cookie_domain,
secure = self.app.cookie_secure,
httponly = self.app.cookie_httponly,
)
return response
def render (self) :
return html.div(class_='container')(
html.form(action=self.url(), method='post')(
html.fieldset(
html.legend("Logout {pubtkt.uid}".format(pubtkt=self.pubtkt)),
html.button(type='submit', class_='btn btn-warning')("Logout"),
)
)
)
class LoginApplication (pvl.web.Application) :
URLS = urls.Map((
urls.rule('/', Index),
urls.rule('/login', Login),
urls.rule('/logout', Logout),
))
PUBLIC_KEY = 'etc/login/public.pem'
PRIVATE_KEY = 'etc/login/private.pem'
login_domain = 'test.paivola.fi'
login_server = 'login.test.paivola.fi'
login_valid = datetime.timedelta(seconds=60)
login_grace = datetime.timedelta(seconds=30)
login_scheme = 'https'
cookie_name = 'auth_pubtkt'
cookie_domain = 'test.paivola.fi'
cookie_secure = True
cookie_httponly = True
def __init__ (self, public_key=PUBLIC_KEY, private_key=PRIVATE_KEY, **opts) :
super(LoginApplication, self).__init__(**opts)
self.server_keys = pubtkt.ServerKeys.config(
public_key = public_key,
private_key = private_key,
)
def check_server (self, server) :
"""
Check that the given target server is valid.
"""
server = server.lower()
if server == self.login_domain or server.endswith('.' + self.login_domain) :
return server
else :
raise pubtkt.ServerError("Target server is not covered by our authentication domain: {domain}".format(domain=self.login_domain))
def load (self, cookie) :
"""
Load a pubtkt from a cookie, and verify it.
"""
return pubtkt.PubTkt.load(cookie, self.server_keys.public)
def auth (self, username, password) :
"""
Perform authentication, returning a PubTkt, signed
"""
return pubtkt.PubTkt.new(username,
valid = self.login_valid,
grace = self.login_grace,
)
def sign (self, pubtkt) :
"""
Create a cookie by signing the given pubtkt.
"""
return pubtkt.sign(self.server_keys.private)
def renew (self, pubtkt) :
"""
Renew and re-sign the given pubtkt.
"""
pubtkt.renew(self.login_valid, self.login_grace)