terom@6: # coding: utf-8 terom@6: """ terom@6: Order data model/view/handler terom@6: """ terom@6: terom@15: from svv.controllers import PageHandler, DocumentHandler terom@6: from svv.html import tags terom@6: from svv import database as db terom@15: from svv import pdf terom@6: terom@7: import datetime terom@9: import logging terom@11: import collections terom@6: terom@18: try : terom@18: # Python2.6 stdlib terom@18: import json terom@18: terom@18: except ImportError : terom@18: # simplejson, API should be similar terom@18: import simplejson as json terom@18: terom@9: log = logging.getLogger('svv.orders') terom@6: terom@9: class FormError (Exception) : terom@9: """ terom@9: A user-level error in a form field terom@9: """ terom@6: terom@11: def __init__ (self, field, value, error) : terom@9: """ terom@9: field - name of field with error terom@11: value - the errenous value in the form that we recieved it terom@11: may be None if it was the *lack* of a value that caused the issue terom@9: error - descriptive text for user terom@9: """ terom@6: terom@9: self.field = field terom@11: self.value = value terom@9: terom@9: super(FormError, self).__init__(error) terom@9: terom@15: class BaseForm (object) : terom@9: # any POST data we have processed, updated from process() terom@9: data = None terom@9: terom@9: def __init__ (self, app) : terom@9: """ terom@9: app - bind this form to the app state (db etc) terom@9: """ terom@9: terom@9: self.app = app terom@9: terom@11: # accumulated errors terom@11: self.errors = collections.defaultdict(list) terom@11: terom@9: def defaults (self) : terom@9: """ terom@9: Update our attributes with default values terom@9: """ terom@9: terom@15: raise NotImplementedError() terom@9: terom@12: def fail_field (self, form_error, field=None) : terom@12: """ terom@12: Mark the field mentioned inside the given FormError as failed. terom@12: terom@12: form_error - the FormError to store terom@12: field - the name of the field to store the error under, if not the same as in form_error terom@12: """ terom@12: terom@12: field = field or form_error.field terom@12: terom@12: log.warn("Marking field %s as failed: %s", field, form_error) terom@12: terom@12: self.errors[field].append(form_error) terom@12: terom@9: def process_raw_field (self, name, default=None, required=None) : terom@9: """ terom@9: Process a generic incoming data field. terom@9: terom@9: default - value to return if no value was present terom@9: required - raise a FormError if no value present terom@9: terom@9: Returns the value as a str, or default terom@9: """ terom@9: terom@11: if name in self.data : terom@11: return self.data[name] terom@9: terom@9: elif required : terom@11: raise FormError(name, None, "Required field") terom@9: terom@9: else : terom@9: return default terom@9: terom@9: def process_string_field (self, name, default=None, required=None, strip=True) : terom@9: """ terom@9: Process a generic incoming string field. terom@9: terom@9: Trims extra whitespace from around the value, unless strip=False is given. terom@9: terom@9: Returns the value as unicode, or default. terom@9: """ terom@9: terom@9: value = self.process_raw_field(name, required=required) terom@9: terom@9: if value is None : terom@9: return default terom@9: terom@9: try : terom@9: # XXX: decode somehow, or can werkzeug handle that? terom@9: value = unicode(value) terom@9: terom@9: except UnicodeDecodeError : terom@11: raise FormError(name, value, "Failed to decode Unicode characters") terom@9: terom@9: if strip : terom@9: value = value.strip() terom@9: terom@9: return value terom@9: terom@9: def process_integer_field (self, name, default=None, required=None) : terom@9: """ terom@9: Process a generic incoming int field. terom@9: terom@9: Returns the value as int, or default. terom@9: """ terom@9: terom@9: value = self.process_raw_field(name, required=required) terom@9: terom@9: if value is None : terom@9: return default terom@9: terom@9: try : terom@9: return int(value) terom@9: terom@9: except ValueError : terom@11: raise FormError(name, value, "Must be a number") terom@9: terom@9: DATETIME_FORMAT = "%d.%m.%Y %H:%M" terom@9: terom@9: def process_datetime_field (self, name, default=None, required=None, format=DATETIME_FORMAT) : terom@9: """ terom@9: Process an incoming datetime field. terom@9: terom@9: Returns the value as datetime, or default. terom@9: """ terom@9: terom@9: value = self.process_raw_field(name, required=required) terom@9: terom@9: if value is None : terom@9: return default terom@9: terom@9: try : terom@9: return datetime.datetime.strptime(value, format) terom@9: terom@9: except ValueError, ex : terom@11: raise FormError(name, value, "Invalid date/time value: " + str(ex)) terom@9: terom@9: def process_multifield (self, table, id, fields) : terom@9: """ terom@9: Process a set of user-given field values for an object with an unique id, and some set of additional fields. terom@9: terom@9: If the id is given, look up the corresponding field values, and return those. terom@9: terom@9: If any of the fields are given, either look up a matching row, or create a new one, and return its id. terom@9: terom@9: Returns an (id_name, field_name, ...) N-tuple. terom@9: """ terom@9: terom@9: id_name, id_col, id_value = id terom@9: terom@9: if id_value : terom@9: # look up object from db terom@9: columns = [col for name, col, value in fields] terom@9: terom@9: sql = db.select(columns, (id_col == id_value)) terom@9: terom@11: for row in self.app.query(sql) : terom@9: # XXX: sanity-check row values vs our values terom@9: terom@9: # new values terom@9: fields = tuple( terom@9: (name, col, row[col]) for name, col, value in fields terom@9: ) terom@9: terom@9: # ok, just use the first one terom@9: break terom@9: terom@9: else : terom@9: # not found! terom@11: raise FormError(id_name, id_value, "Item selected does not seem to exist") terom@9: terom@9: log.info("Lookup %s=%d -> %s", id_name, id_value, dict((name, value) for name, col, value in fields)) terom@9: terom@9: elif any(value for name, col, value in fields) : terom@9: # look up identical object from db? terom@9: sql = db.select([id_col], db.and_(*[(col == value) for name, col, value in fields])) terom@9: terom@9: for row in self.app.query(sql) : terom@9: if id_value : terom@9: log.warn("Duplicate %s=%d for %s", id_name, id_value, dict((name, value) for name, col, value in fields)) terom@9: terom@9: # found object's id terom@9: id_value = row[id_col] terom@9: terom@9: log.info("Found %s -> %d", dict((name, value) for name, col, value in fields), id_value) terom@9: terom@9: # create new object? terom@9: if not id_value : terom@9: sql = db.insert(table).values(dict((col, value) for name, col, value in fields)) terom@9: terom@9: id_value, = self.app.insert(sql) terom@9: terom@9: log.info("Create %s -> %d", dict((name, value) for name, col, value in fields), id_value) terom@9: terom@9: else : terom@9: # not known terom@9: log.debug("No %s known for order...", id_name) terom@9: terom@9: # return full set of values terom@9: return (id_value, ) + tuple(value for name, col, value in fields) terom@9: terom@15: def process (self, data) : terom@15: """ terom@15: Bind ourselves to the given incoming POST data, and update our order field attributes. terom@15: terom@15: data - the submitted POST data as a MultiDict terom@15: terom@15: Returns True if all fields were processed without errors, False otherwise. terom@15: """ terom@15: terom@15: raise NotImplmentedError() terom@15: terom@15: return not self.errors terom@15: terom@15: def render_text_input (self, name, value=None, multiline=False) : terom@15: """ terom@15: Render HTML for a generic text field input. terom@15: terom@15: name - field name, as used for POST terom@15: value - default field value terom@15: multiline - use a multi-line