# http.py
# author Tero Marttila <terom@fixme.fi>
# Mon, 16 Feb 2009 19:02:59 +0200
# changeset 77 bef7196f7682
# parent 73 1554d3d083b8
# permissions -rw-r--r--
# add tree_parse test and fix treeparse to handle other than filesystem paths
"""
    WSGI HTTP utility code
"""

# for utility functions
import cgi

# for path handling
import os.path

# for URL path handling (always '/'-separated, regardless of the host platform)
import posixpath

# for header handling
import wsgiref.headers

# for query-string parsing; cgi.parse_qs/parse_qsl are only deprecated aliases of these
try :
    # Python 3
    from urllib.parse import parse_qs, parse_qsl

except ImportError :
    try :
        # Python 2.6+
        from urlparse import parse_qs, parse_qsl

    except ImportError :
        # older Python 2
        from cgi import parse_qs, parse_qsl

def request_url (env) :
    """
        Best-effort reconstruction of the URL that the given request environment was issued for.

        Reads the 'wsgi.url_scheme', 'HTTP_HOST' and 'REQUEST_URI' keys from env; any key that is missing is
        rendered as the literal placeholder '[???]' instead of raising.

        @param env dict-like request environment (anything with a .get method)
        @return the "<scheme>://<host><path>" string
    """

    # shown in place of any piece that env does not provide
    unknown = '[???]'

    # scheme, host and path, in the order they appear in the URL
    pieces = tuple(
        env.get(key, unknown)
        for key in ('wsgi.url_scheme', 'HTTP_HOST', 'REQUEST_URI')
    )

    return "%s://%s%s" % pieces

class Request (object) :
    """
        HTTP Request with associated metadata

        Wraps a WSGI-style environ dict. The query string is parsed when the object is created, and for POST
        requests the request body is parsed using cgi.FieldStorage.
    """

    # the POST body content-types that we accept
    _POST_CONTENT_TYPES = (
        'application/x-www-form-urlencoded',
        'multipart/form-data',
    )

    def __init__ (self, env) :
        """
            Parse env data

            @param env the request environment dict
            @raise ResponseError (415) for a POST request with a Content-type that is not form data
        """

        # store env
        self.env = env

        # get the querystring
        self.arg_str = env.get('QUERY_STRING', '')

        # parse query args into a {name: [value, ...]} dict, keeping valueless args as ''
        self.arg_dict = parse_qs(self.arg_str, keep_blank_values=True)

        # load post data?
        if self.is_post() :
            # content-type of post data
            content_type = self.env.get('CONTENT_TYPE', 'application/x-www-form-urlencoded')

            # valid content-type?
            # this is client-supplied input, so reject it with a real error instead of an assert, which would
            # be skipped when running with -O
            if not content_type.startswith(self._POST_CONTENT_TYPES) :
                raise ResponseError("Unsupported POST content type", status='415 Unsupported Media Type')

            # input stream
            post_stream = self.env['wsgi.input']

            # use cgi.FieldStorage to parse this
            self.post_data = cgi.FieldStorage(fp=post_stream, environ=self.env, keep_blank_values=True)

        else :
            # no post data
            self.post_data = None

    @property
    def site_host (self) :
        """
            Returns the site's hostname, as given in the HTTP_HOST env variable

            @raise KeyError if the env has no HTTP_HOST
        """

        return self.env['HTTP_HOST']

    @property
    def site_root (self) :
        """
            Returns the URL path to the requested script's directory with no trailing slash, i.e.

            /               ->
            /foo.cgi        ->
            /foo/bar.cgi    -> /foo

            @raise KeyError if the env has no SCRIPT_NAME
        """

        # SCRIPT_NAME is an URL path, so split it using POSIX rules regardless of platform
        return posixpath.dirname(self.env['SCRIPT_NAME']).rstrip('/')

    def _normalize_path (self, path) :
        """
            Normalizes an URL path to remove back-references, but keep any trailing-slash indicator

            An empty path is returned as-is, and not as the '.' that normpath would turn it into.

            @param path the URL path string
            @return the normalized path, ending in exactly one '/' if the given path ended in a '/'
        """

        # avoid nasty '.' paths
        if not path :
            return ''

        # URL paths always use '/', so do not use the platform's os.path here
        normalized = posixpath.normpath(path)

        # normpath strips the trailing slash, except for the root path itself
        if path.endswith('/') and not normalized.endswith('/') :
            normalized += '/'

        return normalized

    @property
    def page_prefix (self) :
        """
            Returns the URL path root for page URLs, based on REQUEST_URI with PATH_INFO removed. This will have
            a preceding slash, but no trailing slash, unless it's empty:

            /                   ->
            /foo.cgi            -> /foo.cgi
            /foo.cgi/index      -> /foo.cgi
            /foo.cgi/quux/bar/  -> /foo.cgi
            /quux/foo.cgi/bar   -> /quux/foo.cgi
            /bar                ->
        """

        # request uri path without the query string
        request_path = self._normalize_path(self.env.get('REQUEST_URI', '').split('?', 1)[0])

        # path info
        page_name = self.get_page_name()

        # special-case for empty page_name, as [:-0] below would trim everything
        if not page_name :
            return request_path.rstrip('/')

        # sanity-check
        # NOTE(review): this presumably fails if REQUEST_URI and PATH_INFO are encoded differently - confirm
        assert request_path.endswith(page_name)

        # trim
        return request_path[:-len(page_name)].rstrip('/')

    def get_page_name (self) :
        """
            Returns the requested page path with no leading slash, but preserved trailing slash. In terms of the
            PATH_INFO value:

            (missing)       ->
            /               ->
            /bar            -> bar
            /quux/          -> quux/
            /quux/../bar/   -> bar/
        """

        # the normalized PATH_INFO; a missing PATH_INFO is handled like an empty one
        return self._normalize_path(self.env.get('PATH_INFO', '')).lstrip('/')

    def get_arg (self, name, default=None) :
        """
            Get a single value for an argument with the given key, or the default if missing.

            If the argument was given multiple times, the first value is returned.

            This evaluates valueless query args (?foo&bar=) as empty strings.
        """

        if name in self.arg_dict :
            return self.arg_dict[name][0]

        else :
            return default

    def get_args (self) :
        """
            Returns the list of all available (key, value) pairs from the query string, in order
        """

        return parse_qsl(self.arg_str, keep_blank_values=True)

    def is_post (self) :
        """
            Is this a POST request?

            @raise KeyError if the env has no REQUEST_METHOD
        """

        # just check REQUEST_METHOD
        return (self.env['REQUEST_METHOD'].upper() == 'POST')

    # marker for "no default given", so that None can be used as an actual default value
    class _nodefault : pass
    def get_post (self, name, default=_nodefault) :
        """
            Get the value of the given POST field.

            If the optional `default` arg is given, it is returned if the key is not found

            @raise KeyError if the field is not found and no default was given
        """

        # sanity-check: only valid for POST requests
        assert self.post_data is not None

        if name in self.post_data :
            # return the FieldStorage value
            return self.post_data[name].value

        elif default is not self._nodefault :
            # return the default value; compared by identity so that the default's own __ne__ is never involved
            return default

        else :
            # fail
            raise KeyError(name)

class Response (object) :
    """
        HTTP Response with headers and data
    """

    def __init__ (self, data, content_type='text/html', status='200 OK', charset='UTF-8') :
        """
            Create the response. The Content-type header is built from the given values.

            The given data must be a string-like object, which will be encoded with the given charset, or None,
            whereupon an empty response body will be sent.

            The content_type argument can also be forced to None to not send a Content-type header (e.g. for
            redirects).

            The charset can be given as None to not encode the output (for binary data). In that case, the
            Content-type header is sent without any charset parameter.

            @param data the response body, or None
            @param content_type value for the Content-type header, or None
            @param status the HTTP status string
            @param charset encoding used for the body and named in the Content-type header, or None
        """

        # store info
        self.status = status
        self.data = data
        self.charset = charset

        # headers
        self.headers = wsgiref.headers.Headers([])

        # add Content-type header?
        if content_type :
            if charset :
                self.add_header('Content-type', content_type, charset=charset)

            else :
                # passing charset=None on would add a bare, valueless "; charset" parameter to the header
                self.add_header('Content-type', content_type)

    def add_header (self, name, value, **params) :
        """
            Add response header with the given name/value, plus option params

            XXX: uses the wsgiref.headers code, not sure how that behaves re multiple headers with the same name, etc
        """

        self.headers.add_header(name, value, **params)

    def get_status (self) :
        """
            Returns response status string (XXX Foo)
        """

        return self.status

    def get_headers (self) :
        """
            Returns the list of header (name, value) pairs
        """

        return self.headers.items()

    def get_data (self) :
        """
            Returns the response data - as an encoded string

            The data is encoded using the charset, or returned as-is if the charset is None. Missing/empty data
            gives an empty byte string, so that the return type is the same as for encoded data.
        """

        if not self.data :
            return b''

        if self.charset :
            return self.data.encode(self.charset)

        # no charset, i.e. binary data
        return self.data

class ErrorResponse (Response) :
    """
        A response with an error code / message
    """

    def __init__ (self, status, message, details=None) :
        """
            Build a plain error message response with the given status/message

            The status, message and details are all treated as plain text, and are HTML-escaped before being
            placed into the page, as they may well contain client-supplied values.

            @param status HTTP status code
            @param message short message to describe errors
            @param details optional details, plaintext
        """

        # local import, so that this does not depend on the module's import block
        from xml.sax.saxutils import escape

        def as_html (value) :
            # format like %s would, and then escape the &, < and > characters
            return escape('%s' % (value, ))

        # the details block is left out completely if there are no details
        if details :
            details_html = '<pre>%s</pre>' % as_html(details)

        else :
            details_html = ''

        data = """\
<html><head><title>%(title)s</title></head><body>
<h1>%(title)s</h1>
<p>%(message)s</p>
%(details)s
</body></html>
""" % dict(
            title       = as_html(status),
            message     = as_html(message),
            details     = details_html,
        )

        # the status line itself is sent as given
        super(ErrorResponse, self).__init__(data, status=status)

class ResponseError (Exception) :
    """
        An exception that carries the status/message/details for a specific ErrorResponse to the client
    """

    def __init__ (self, message, status='400 Bad Request', details=None) :
        """
            @param message short message to describe the error, also used as the exception message
            @param status HTTP status string for the response
            @param details optional details for the response
        """

        # the message doubles as the exception's own argument
        super(ResponseError, self).__init__(message)

        # remember what is needed to build the response later
        self.message, self.status, self.details = message, status, details

    def get_response (self) :
        """
            Returns a new ErrorResponse built from this exception's status, message and details
        """

        return ErrorResponse(status=self.status, message=self.message, details=self.details)

class Redirect (Response) :
    """
        Response that redirects the client to another URL
    """

    def __init__ (self, url) :
        """
            Build a '302 Found' response with a Location header for the given *absolute* URL
        """

        # a redirect has no body, and hence no Content-type either
        base_args = dict(content_type=None, status='302 Found')

        super(Redirect, self).__init__(None, **base_args)

        # the target goes into the Location: header
        self.add_header("Location", url)