pvl.login.server: separate redirect/refresh'd step for cert download to display html first; fix set-cookie quoting for werkzeug 0.9
# encoding: utf-8
import base64
import datetime
import hashlib
import os
import os.path
import string
import pvl.invoke
import logging; log = logging.getLogger('pvl.login.ssl')
class Error (Exception) :
pass
class UsersCA (object) :
OPENSSL = '/usr/bin/openssl'
SIGN_DAYS = 1
VALID_USER = set(string.letters + string.digits + '-.')
O = u"Päivölän Kansanopisto"
OU = u"People"
DC = ('paivola', 'fi')
def __init__ (self, ca, users) :
self.ca = ca
self.users = users
self.ca_config = os.path.join(ca, 'openssl.cnf')
def sign_spkac (self, out, spkac, days=SIGN_DAYS) :
"""
Sign given request file (path).
Creates the given output file (path). Empty file on errors..
"""
pvl.invoke.invoke(self.OPENSSL, ('ca',
'-config', self.ca_config,
'-spkac', spkac,
'-out', out,
'-policy', 'policy_user',
'-days', str(days),
'-utf8',
),
setenv={
'CA': self.ca,
},
)
def generate_dn (self, uid, cn=None) :
"""
Generate OpenSSL (rdn, value) pairs for given user.
"""
if self.O :
yield 'O', self.O
elif self.DC :
for index, dc in enumerate(self.DC, 1) :
yield '{index}.DC'.format(index=index), dc
yield 'OU', self.OU
yield 'UID', uid
if cn :
yield 'CN', cn
def write_spkac (self, path, spkac, dn) :
"""
Write out a spkac file to the given path, containing the given base64-encoded spkac and DN.
"""
# roundtrip the spkac for consistent formatting
spkac = base64.b64encode(base64.b64decode(spkac))
file = open(path, 'w')
file.write('SPKAC=')
file.write(spkac)
file.write('\n')
for rdn, value in dn :
file.write(u'{rdn}={value}\n'.format(rdn=rdn, value=value).encode('utf-8'))
file.close()
def sign_user (self, user, spkac, userinfo=None) :
"""
Sign given spkac string (base64-encoded) for given user.
Returns a name for the signed cert.
"""
if not set(user).issubset(self.VALID_USER) :
raise Error("Invalid username: {user}".format(user=user))
dir = os.path.join(self.users, user)
if not os.path.exists(dir) :
os.mkdir(dir)
name = hashlib.sha1(user + spkac).hexdigest()
spkac_file = os.path.join(dir, name) + '.spkac'
cert_file = os.path.join(dir, name)
tmp_file = os.path.join(dir, name) + '.tmp'
# the req to sign
if os.path.exists(spkac_file) :
log.warning("spkac already exists: %s", spkac_file)
else :
log.info("%s: write spkac: %s", user, spkac_file)
self.write_spkac(os.path.join(dir, name) + '.spkac', spkac, self.generate_dn(user, userinfo))
# sign it
if os.path.exists(cert_file) :
log.warning("cert already exists: %s", cert_file)
return name
if os.path.exists(tmp_file) :
log.warning("cleaning out previous tmp file: %s", tmp_file)
os.unlink(tmp_file)
log.info("%s: sign cert: %s", user, cert_file)
self.sign_spkac(tmp_file, spkac_file)
log.debug("%s: rename %s -> %s", user, tmp_file, cert_file)
os.rename(tmp_file, cert_file)
return name
def open_cert (self, user, name) :
"""
Return an opened cert file by username / cert name.
"""
if not set(user).issubset(self.VALID_USER) :
raise Error("Invalid username: {user}".format(user=user))
path = os.path.join(self.users, user, name)
if not os.path.exists(path) :
raise Error("No cert found on server")
return open(path)