pvl/login/ssl.py
author Tero Marttila <terom@paivola.fi>
Tue, 14 Jan 2014 23:15:36 +0200
changeset 375 df3bf49634a1
parent 373 6beb06b59ee6
permissions -rw-r--r--
pvl.login.server: separate redirect/refresh'd step for cert download to display html first; fix set-cookie quoting for werkzeug 0.9
# encoding: utf-8

import base64
import datetime
import hashlib
import os
import os.path
import string

import pvl.invoke

import logging; log = logging.getLogger('pvl.login.ssl')

class Error (Exception) :
    pass

class UsersCA (object) :
    OPENSSL = '/usr/bin/openssl'

    SIGN_DAYS = 1

    VALID_USER = set(string.letters + string.digits + '-.')
    
    O = u"Päivölän Kansanopisto"
    OU = u"People"
    DC = ('paivola', 'fi')

    def __init__ (self, ca, users) :
        self.ca = ca
        self.users = users

        self.ca_config = os.path.join(ca, 'openssl.cnf')

    def sign_spkac (self, out, spkac, days=SIGN_DAYS) :
        """
            Sign given request file (path).

            Creates the given output file (path). Empty file on errors..
        """

        pvl.invoke.invoke(self.OPENSSL, ('ca',
                '-config', self.ca_config,
                '-spkac', spkac,
                '-out', out,
                '-policy', 'policy_user',
                '-days', str(days),
                '-utf8',
            ),
            setenv={
                'CA':   self.ca,
            },
        )

    def generate_dn (self, uid, cn=None) :
        """
            Generate OpenSSL (rdn, value) pairs for given user.
        """

        if self.O :
            yield 'O', self.O

        elif self.DC :
            for index, dc in enumerate(self.DC, 1) :
                yield '{index}.DC'.format(index=index), dc
        
        yield 'OU', self.OU
         
        yield 'UID', uid 
        
        if cn :
            yield 'CN', cn

    def write_spkac (self, path, spkac, dn) :
        """
            Write out a spkac file to the given path, containing the given base64-encoded spkac and DN.
        """

        # roundtrip the spkac for consistent formatting
        spkac = base64.b64encode(base64.b64decode(spkac))

        file = open(path, 'w')

        file.write('SPKAC=')
        file.write(spkac)
        file.write('\n')

        for rdn, value in dn :
            file.write(u'{rdn}={value}\n'.format(rdn=rdn, value=value).encode('utf-8'))
        
        file.close()

    def sign_user (self, user, spkac, userinfo=None) :
        """
            Sign given spkac string (base64-encoded) for given user.

            Returns a name for the signed cert.
        """

        if not set(user).issubset(self.VALID_USER) :
            raise Error("Invalid username: {user}".format(user=user))

        dir = os.path.join(self.users, user)

        if not os.path.exists(dir) :
            os.mkdir(dir)

        name = hashlib.sha1(user + spkac).hexdigest()
        spkac_file = os.path.join(dir, name) + '.spkac'
        cert_file = os.path.join(dir, name)
        tmp_file = os.path.join(dir, name) + '.tmp'
        
        # the req to sign
        if os.path.exists(spkac_file) :
            log.warning("spkac already exists: %s", spkac_file)
        else :
            log.info("%s: write spkac: %s", user, spkac_file)
            self.write_spkac(os.path.join(dir, name) + '.spkac', spkac, self.generate_dn(user, userinfo))
        
        # sign it
        if os.path.exists(cert_file) :
            log.warning("cert already exists: %s", cert_file)
            return name
        
        if os.path.exists(tmp_file) :
            log.warning("cleaning out previous tmp file: %s", tmp_file)
            os.unlink(tmp_file)

        log.info("%s: sign cert: %s", user, cert_file)
        self.sign_spkac(tmp_file, spkac_file)

        log.debug("%s: rename %s -> %s", user, tmp_file, cert_file)
        os.rename(tmp_file, cert_file)

        return name

    def open_cert (self, user, name) :
        """
            Return an opened cert file by username / cert name.
        """

        if not set(user).issubset(self.VALID_USER) :
            raise Error("Invalid username: {user}".format(user=user))

        path = os.path.join(self.users, user, name)

        if not os.path.exists(path) :
            raise Error("No cert found on server")

        return open(path)