"""
    Full-text searching of logs
"""

import calendar
import datetime
import os.path

import pytz

import HyperEstraier as hype

import log_line


class LogSearchError(Exception):
    """
        General search error
    """

    pass


class NoResultsFound(LogSearchError):
    """
        No results found
    """

    pass


class LogSearchIndex(object):
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document.

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname

        Each document then has a single line of data, which is the log message itself.

        Note that only @uri carries the '@' prefix; the other attributes are plain names, and search
        conditions/order expressions must refer to them without the prefix.
    """

    def __init__(self, channels, path, mode='r'):
        """
            Open the database at the given path, with the given mode:
                r       - read-only
                w       - write, create if not exists
                a       - write, error if not exists
                c       - write, create, error if exists
                *       - write, create, truncate if exists

            Channels is the ChannelList.

            Raises LogSearchError if mode is 'c' and the path already exists, or if the database cannot be
            opened. An unknown mode raises KeyError from the flag lookup.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # 'c' must not clobber an existing index; compare by equality, since a substring test
        # (`mode in 'c'`) would also match the empty string
        if mode == 'c' and os.path.exists(path):
            raise LogSearchError("Index already exists: %s" % (path, ))

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r': hype.Database.DBREADER,
            'w': hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a': hype.Database.DBWRITER,
            'c': hype.Database.DBWRITER | hype.Database.DBCREAT,
            '*': hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
        }

        # look up flags
        flags = mode_to_flag[mode]

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags):
            raise LogSearchError("Index open failed: %s, mode=%s, flags=%#06x: %s" % (
                path, mode, flags, self.db.err_msg(self.db.error())
            ))

    def insert(self, channel, lines):
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of
            added items.

            Raises LogSearchError if a document cannot be stored.
        """

        # validate the LogChannel
        assert channel.name

        count = 0

        for line in lines:
            # validate the LogLine
            assert line.offset
            assert line.timestamp

            # line date
            date = line.timestamp.date()

            # ensure that it's not 1900
            assert date.year != 1900

            # convert to UTC timestamp
            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())

            # create new document
            doc = hype.Document()

            # the URI identifies the line as channel/date/offset
            doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
            doc.add_attr('channel', channel.id)
            doc.add_attr('type', str(line.type))
            doc.add_attr('timestamp', str(utc_timestamp))

            # add source attribute?
            if line.source:
                source_nickname, source_username, source_hostname, source_chanflags = line.source

                # only store the nickname when there is one; a None value is not a usable attribute
                if source_nickname is not None:
                    doc.add_attr('source_nickname', str(source_nickname))

            # add data
            if line.data:
                doc.add_text(line.data.encode('utf8'))

            # put, "clean up dispensable regions of the overwritten document"
            if not self.db.put_doc(doc, hype.Database.PDCLEAN):
                raise LogSearchError("Index put_doc failed: %s" % (self.db.err_msg(self.db.error()), ))

            count += 1

        return count

    def search_cond(self, cond):
        """
            Search using a raw hype.Condition, yielding a LogLine for each matching document.

            Raises NoResultsFound if there aren't any results. This is a generator, so the search is only
            executed - and NoResultsFound only raised - once the result is first iterated.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results:
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results:
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel = self.channels.lookup(doc.attr('channel'))
            line_type = int(doc.attr('type'))
            timestamp = datetime.datetime.fromtimestamp(int(doc.attr('timestamp')), pytz.utc)
            source_nickname = doc.attr('source_nickname')
            message = doc.cat_texts().decode('utf8')

            # build+yield as LogLine; only the nickname part of the source is stored in the index
            yield log_line.LogLine(
                channel, None, line_type, timestamp, (source_nickname, None, None, None), None, message
            )

    def search(self, options=None, channel=None, phrase=None, order=None, max=None, skip=None):
        """
            Search with flexible parameters; each parameter is only applied if it is given (truthy).

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Returns the generator from search_cond().
        """

        # build condition
        cond = hype.Condition()

        if options:
            cond.set_options(options)

        if channel:
            # match on the 'channel' attribute as written by insert() (no '@' prefix)
            cond.add_attr("channel STREQ %s" % (channel.id, ))

        if phrase:
            cond.set_phrase(phrase)

        if order:
            cond.set_order(order)

        if max:
            cond.set_max(max)

        if skip:
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple(self, channel, query, count=None, offset=None):
        """
            Search for lines from the given channel for the given simple query.

            The search itself is ordered by descending timestamp, so count/offset select from the newest
            matches; the selected lines are then returned in reverse of that order, as an iterator.

            Raises NoResultsFound if there aren't any results.
        """

        # use search(), backwards
        results = list(self.search(
            # simplified phrase
            options=hype.Condition.SIMPLE,

            # specific channel
            channel=channel,

            # given phrase
            phrase=query,

            # order by the 'timestamp' attribute as written by insert() (no '@' prefix), newest first
            order="timestamp NUMD",

            # count/offset
            max=count,
            skip=offset,
        ))

        # reverse
        return reversed(results)