"""
    WSGI HTTP utility code
"""

# for POST form parsing (cgi.FieldStorage)
try:
    import cgi
except ImportError:
    # the cgi module was removed from the standard library in Python 3.13
    cgi = None

# for header handling
import wsgiref.headers

# for path handling
import os.path
import posixpath

# for query-string parsing; cgi.parse_qs/parse_qsl no longer exist in Python 3.8+
try:
    from urllib.parse import parse_qs as _parse_qs, parse_qsl as _parse_qsl
except ImportError:
    from urlparse import parse_qs as _parse_qs, parse_qsl as _parse_qsl

# for HTML escaping; cgi.escape no longer exists in Python 3.8+
try:
    from html import escape as _html_escape
except ImportError:
    from cgi import escape as _html_escape


def _escape(value):
    """
        Render the given value as text and escape it for inclusion in HTML.

        Non-string values are converted with '%s' formatting first, which is
        how they were rendered before escaping was introduced.
    """

    return _html_escape('%s' % (value,), quote=True)


def request_url(env):
    """
        Attempt to reconstruct the URL of the given request environment.

        Works best when env is a WSGI-compliant env dict. Any component that is
        missing from env is rendered as the placeholder '[???]'.

        @param env      the request environment dict
        @return         the URL as a "scheme://host/path" string
    """

    # HTTP/HTTPs scheme
    scheme = env.get('wsgi.url_scheme', '[???]')

    # the host
    host = env.get('HTTP_HOST', '[???]')

    # the path; REQUEST_URI is a server-provided extension, not part of the
    # WSGI spec, and may carry the query string as well
    path = env.get('REQUEST_URI', '[???]')

    return "%s://%s%s" % (scheme, host, path)


class Request(object):
    """
        HTTP Request with associated metadata
    """

    # POST content-types that can be parsed as form data
    _POST_CONTENT_TYPES = (
        'application/x-www-form-urlencoded',
        'multipart/form-data',
    )

    # sentinel for get_post's `default`, so that None can be passed as a real default
    class _nodefault:
        pass

    def __init__(self, env):
        """
            Parse env data.

            For POST requests, the form data is parsed from env['wsgi.input'].

            @param env      the WSGI request environment dict
            @raise ResponseError    if a POST request has a content-type that
                                    cannot be parsed as form data
        """

        # store env
        self.env = env

        # get the querystring
        self.arg_str = env.get('QUERY_STRING', '')

        # parse query args, keeping valueless ones (?foo&bar=) as empty strings
        self.arg_dict = _parse_qs(self.arg_str, keep_blank_values=True)

        # load post data?
        if self.is_post():
            # content-type of post data
            content_type = self.env.get('CONTENT_TYPE', 'application/x-www-form-urlencoded')

            # valid content-type? This is client input, so report it as a
            # client error instead of an assert (which is stripped under -O)
            if not content_type.startswith(self._POST_CONTENT_TYPES):
                raise ResponseError(
                    "Unsupported POST content type",
                    status='415 Unsupported Media Type',
                    details=content_type,
                )

            if cgi is None:
                raise RuntimeError("POST form parsing requires the 'cgi' module, which is not available")

            # input stream
            input_stream = self.env['wsgi.input']

            # use cgi.FieldStorage to parse this
            self.post_data = cgi.FieldStorage(fp=input_stream, environ=self.env, keep_blank_values=True)

        else:
            # no post data
            self.post_data = None

    @property
    def site_host(self):
        """
            Returns the site's hostname (DNS name), as given in HTTP_HOST
        """

        return self.env['HTTP_HOST']

    @property
    def site_root(self):
        """
            Returns the URL path to the requested script's directory with no trailing slash, i.e.

                /               ->
                /foo.cgi        ->
                /foo/bar.cgi    -> /foo
        """

        # URL paths always use '/', independent of the host OS
        return posixpath.dirname(self.env['SCRIPT_NAME']).rstrip('/')

    def _normalize_path(self, path):
        """
            Normalizes an URL path to remove back-references, but keep any trailing-slash indicator.

            @param path     the URL path, may be empty
            @return         the normalized path, or '' for an empty path
        """

        # avoid nasty '.' paths: normpath('') would give '.'
        if not path:
            return ''

        normalized = posixpath.normpath(path)

        # normpath drops the trailing slash, so restore it - without doubling
        # it up for the root path
        if path.endswith('/') and not normalized.endswith('/'):
            normalized += '/'

        return normalized

    @property
    def page_prefix(self):
        """
            Returns the URL path root for page URLs, based on REQUEST_URI with PATH_INFO removed. This will have
            a preceding slash, but no trailing slash, unless it's empty:

                /                   ->
                /foo.cgi            -> /foo.cgi
                /foo.cgi/index      -> /foo.cgi
                /foo.cgi/quux/bar/  -> /foo.cgi
                /quux/foo.cgi/bar   -> /quux/foo.cgi
                /bar                ->
        """

        # request uri path without the query string
        request_uri = self.env.get('REQUEST_URI', '')
        request_path = self._normalize_path(request_uri.split('?', 1)[0])

        # path info
        page_name = self.get_page_name()

        # special-case for empty page_name
        if not page_name:
            return request_path.rstrip('/')

        # sanity-check
        assert request_path.endswith(page_name)

        # trim
        return request_path[:-len(page_name)].rstrip('/')

    def get_page_name(self):
        """
            Returns the requested page path with no leading slash, but preserved trailing slash:

                /foo.cgi            ->
                /foo.cgi/           ->
                /foo.cgi/bar        -> bar
                /foo.cgi/quux/      -> quux/
                /foo.cgi/quux/../   ->

            A missing PATH_INFO is treated as an empty one.
        """

        # the normalized PATH_INFO
        return self._normalize_path(self.env.get('PATH_INFO', '')).lstrip('/')

    def get_arg(self, name, default=None):
        """
            Get a single value for an argument with the given key, or the default if missing.

            If the argument is given multiple times, the first value is returned.
            This evaluates valueless query args (?foo&bar=) as empty strings.
        """

        if name in self.arg_dict:
            return self.arg_dict[name][0]

        return default

    def get_args(self):
        """
            Returns all available (key, value) pairs from the query string
        """

        return _parse_qsl(self.arg_str, keep_blank_values=True)

    def is_post(self):
        """
            Is this a POST request?
        """

        # just check REQUEST_METHOD
        return self.env['REQUEST_METHOD'].upper() == 'POST'

    def get_post(self, name, default=_nodefault):
        """
            Get the value of the given POST field.

            If the optional `default` arg is given, it is returned if the key is not found

            @raise KeyError     if the field is not found and no default was given
        """

        # sanity-check: only valid for POST requests
        assert self.post_data is not None

        if name in self.post_data:
            # return the FieldStorage value
            return self.post_data[name].value

        # identity check, so that a default with custom comparison operators is safe
        if default is not self._nodefault:
            # return the default value
            return default

        # fail
        raise KeyError(name)


class Response(object):
    """
        HTTP Response with headers and data
    """

    def __init__(self, data, content_type='text/html', status='200 OK', charset='UTF-8'):
        """
            Create the response. The Content-type header is built from the given values.

            The given data must be a string-like object, which will be encoded with the given charset, or None,
            whereupon an empty response body will be sent.

            The content_type argument can also be forced to None to not send a Content-type header (e.g. for
            redirects).

            The charset can be given as None to not encode the output (for binary data). In that case, no
            charset parameter is added to the Content-type header either.
        """

        # store info
        self.status = status
        self.data = data
        self.charset = charset

        # headers
        self.headers = wsgiref.headers.Headers([])

        # add Content-type header?
        if content_type:
            if charset:
                self.add_header('Content-type', content_type, charset=charset)

            else:
                # a None-valued param would be emitted as a bare "; charset"
                self.add_header('Content-type', content_type)

    def add_header(self, name, value, **params):
        """
            Add response header with the given name/value, plus option params

            XXX: uses the wsgiref.headers code, not sure how that behaves re multiple headers with the same name, etc
        """

        self.headers.add_header(name, value, **params)

    def get_status(self):
        """
            Returns response status string (XXX Foo)
        """

        return self.status

    def get_headers(self):
        """
            Returns the list of header (name, value) pairs
        """

        return self.headers.items()

    def get_data(self):
        """
            Returns the response data - as an encoded string.

            Missing/empty data gives an empty byte string. Data is returned as-is if there is no charset.
        """

        if not self.data:
            return b''

        if self.charset:
            return self.data.encode(self.charset)

        return self.data


class ErrorResponse(Response):
    """
        A response with an error code / message
    """

    def __init__(self, status, message, details=None):
        """
            Build a plain error message response with the given status/message.

            All of the given values are HTML-escaped before being placed into the page.

            @param status HTTP status code
            @param message short message to describe errors
            @param details optional details, plaintext
        """

        data = """\
<html><head><title>%(title)s</title></head><body>
<h1>%(title)s</h1>
<p>%(message)s</p>
%(details)s
</body></html>
""" % dict(
            title=_escape(status),
            message=_escape(message),
            details='<pre>%s</pre>' % _escape(details) if details else '',
        )

        super(ErrorResponse, self).__init__(data, status=status)


class ResponseError(Exception):
    """
        An exception that results in a specific 4xx ErrorResponse message to the client
    """

    def __init__(self, message, status='400 Bad Request', details=None):
        """
            @param message short message to describe the error
            @param status HTTP status string
            @param details optional details, plaintext
        """

        self.status = status
        self.message = message
        self.details = details

        super(ResponseError, self).__init__(message)

    def get_response(self):
        """
            Returns the ErrorResponse for this error
        """

        return ErrorResponse(self.status, self.message, self.details)


class Redirect(Response):
    """
        Redirect response
    """

    def __init__(self, url):
        """
            Redirect to given *absolute* URL
        """

        # no content-type or data
        super(Redirect, self).__init__(None, content_type=None, status='302 Found')

        # add Location: header
        self.add_header("Location", url)