"""
    Full-text searching of logs
"""

import datetime, calendar, pytz
import os.path

import HyperEstraier as hype

import log_line, utils, config


class LogSearchError(Exception):
    """
        General search error; base class for all errors raised by this module
    """

    pass


class SearchIndexError(LogSearchError):
    """
        Error manipulating the index
    """

    def __init__(self, msg, db):
        """
            Build the error from the given message + HyperEstraier.Database.

            The database's current error code is looked up via db.error() and rendered with db.err_msg(), and the
            result is appended to msg.
        """

        super(SearchIndexError, self).__init__("%s: %s" % (msg, db.err_msg(db.error())))


class NoResultsFound(LogSearchError):
    """
        No results found
    """

    pass


class LogSearchIndex(object):
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful
        server).

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message
    """

    def __init__(self, channels, path, mode='r'):
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists

                Additional chars (may be combined freely):
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open

            Channels is the ChannelList.

            Raises LogSearchError if a 'c' mode is given and the path already exists, SearchIndexError if the
            database cannot be opened, and KeyError if the first mode char is not one of the above.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # 'c' must refuse to touch an existing index. Test the leading mode char rather than using a substring
        # test against 'c', so that modes with modifiers (e.g. 'c?') are still checked
        if mode.startswith('c') and os.path.exists(path):
            raise LogSearchError("Index already exists: %s" % (path, ))

        # mapping of { mode -> flags }
        # 'c' uses the same flags as 'w'; its error-if-exists behaviour comes from the check above
        mode_to_flag = {
            'r': hype.Database.DBREADER,
            'w': hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a': hype.Database.DBWRITER,
            'c': hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # flags to use, standard modes
        flags = mode_to_flag[mode[0]]

        # mode-flags; each modifier is independent of the others, so they are tested separately instead of
        # letting the first match win
        if '?' in mode:
            # non-blocking locking
            flags |= hype.Database.DBLCKNB

        if '+' in mode:
            # read
            flags |= hype.Database.DBREADER

        if 'trunc' in mode:
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags):
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)

    def close(self):
        """
            Explicitly close the index, this is done automatically on del.

            Raises SearchIndexError if the database reports that the close failed.
        """

        if not self.db.close():
            raise SearchIndexError("Index close failed", self.db)

    def insert(self, channel, lines):
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items.

            Any error from insert_line propagates; lines inserted before the failure are not rolled back here.
        """

        # count from zero
        count = 0

        for line in lines:
            self.insert_line(channel, line)
            count += 1

        return count

    def insert_line(self, channel, line):
        """
            Adds a single LogLine for the given LogChannel to the index.

            The line becomes one document, with the attributes listed in the class docstring and line.data as its
            text. Raises SearchIndexError if the document cannot be stored.
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine
        assert line.offset
        assert line.timestamp

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900
        assert date.year != 1900

        # create new document
        doc = hype.Document()

        # add URI: channel/date/line
        doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel', channel.id)

        # add type
        doc.add_attr('type', str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))

        # add source attributes? Each component is optional and only stored when non-empty
        if line.source:
            source_nickname, source_username, source_hostname, source_chanflags = line.source

            if source_nickname:
                doc.add_attr('source_nickname', source_nickname.encode('utf8'))

            if source_username:
                doc.add_attr('source_username', source_username.encode('utf8'))

            if source_hostname:
                doc.add_attr('source_hostname', source_hostname.encode('utf8'))

            if source_chanflags:
                doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))

        # add target attribute? The target is just the nickname
        if line.target:
            doc.add_attr('target_nickname', line.target.encode('utf8'))

        # add data
        if line.data:
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN):
            raise SearchIndexError("put_doc", self.db)

    def search_cond(self, cond):
        """
            Search using a raw hype.Condition, yielding a LogLine for each matching document.

            Raises NoResultsFound if there aren't any results. Note that this is a generator, so the search is only
            executed - and NoResultsFound only raised - once the caller starts iterating.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results:
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results:
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel = self.channels.lookup(doc.attr('channel'))
            line_type = int(doc.attr('type'))
            timestamp = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source, in the same order as insert_line unpacks it
            source = (
                doc.attr('source_nickname'),
                doc.attr('source_username'),
                doc.attr('source_hostname'),
                doc.attr('source_chanflags'),
            )

            # target
            target = doc.attr('target_nickname')

            # message text
            message = doc.cat_texts().decode('utf8')

            # build+yield as LogLine; the second argument is passed as None since nothing for it is read back
            # from the document
            yield log_line.LogLine(channel, None, line_type, timestamp, source, target, message)

    def search(self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None):
        """
            Search with flexible parameters

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                attrs       - raw attribute expressions
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Any parameter that is left out (or is otherwise false) is not applied to the condition. Returns the
            result generator from search_cond.
        """

        # build condition
        cond = hype.Condition()

        if options:
            # set options
            cond.set_options(options)

        if channel:
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))

        if attrs:
            # add attributes
            for attr in attrs:
                cond.add_attr(attr.encode('utf8'))

        if phrase:
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))

        if order:
            # set order
            cond.set_order(order)

        if max:
            # set max
            cond.set_max(max)

        if skip:
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple(self, channel, query, count=None, offset=None, search_msg=True, search_nick=False):
        """
            Search for lines from the given channel for the given simple query.

            The search_* params define which parts to search: search_msg uses the query as the fulltext phrase for
            the message, search_nick adds a STRINC match of the query against source_nickname.

            The search itself runs newest-first, so that count/offset apply to the most recent matches, and the
            results are then returned as a reversed iterator (i.e. in ascending timestamp order).
        """

        # search attributes
        attrs = []

        # nickname query; matching against target_nickname is not enabled
        if search_nick:
            attrs.append("source_nickname STRINC %s" % query)

        # use search(), backwards; collected into a list so that it can be reversed
        results = list(self.search(
            # simplified phrase
            options=hype.Condition.SIMPLE,

            # specific channel
            channel=channel,

            # given phrase
            phrase=query if search_msg else None,

            # attributes defined above
            attrs=attrs,

            # order by timestamp, descending (backwards)
            order="timestamp NUMD",

            # count/offset
            max=count,
            skip=offset,
        ))

        # reverse
        return reversed(results)

    def list(self, channel, date, count=None, skip=None):
        """
            List all indexed log items for the given UTC date, ordered by ascending timestamp.

            Returns the result generator from search(), so NoResultsFound is raised on iteration if the date has no
            indexed lines.
        """

        # start/end of the day, both inclusive: 00:00:00.000000 - 23:59:59.999999
        # NOTE(review): these are naive datetimes; presumably utils.to_utc_timestamp treats them as UTC - confirm
        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
        dt_end = datetime.datetime(date.year, date.month, date.day, 23, 59, 59, 999999)

        # search
        return self.search(
            # specific channel
            channel=channel,

            # specific date range
            attrs=[
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end)),
            ],

            # order correctly
            order="timestamp NUMA",

            # max count/offset
            max=count,
            skip=skip,
        )


def get_index():
    """
        Returns the default read-only index, suitable for searching.

        The index is opened from config.SEARCH_INDEX_PATH for the channels in config.LOG_CHANNELS.
    """

    # XXX: no caching, just open it every time
    _index = LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')

    # return
    return _index