"""
    Full-text searching of logs
"""

import calendar
import datetime

import pytz

import HyperEstraier as hype

import log_line


class LogSearchIndex(object):
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is
        stored as its own document.

        These log documents have the following attributes:
            @uri        - "<channel id>/<YYYY-MM-DD>/<line offset>"
            @channel    - channel id
            @type       - the LogType id, stored as a string
            @timestamp  - UTC unix timestamp, stored as a string
            @source     - nickname (only set when the line has a source)

        Each document then has a single line of text, which is the log message
        itself, encoded as UTF-8.
    """

    def __init__(self, path, mode='r'):
        """
            Open the database at the given path, with the given mode:
                r       - read-only
                w       - read-write, create if not exists
                a       - read-write; currently uses the same flags as 'w',
                          so it also creates the database if it is missing
                *       - read-write, truncate and create new

            Raises KeyError for an unknown mode, and Exception if the
            database cannot be opened.
        """

        # mapping of { mode -> flags }
        # NOTE(review): 'a' was originally described as "do not create", but
        # it carries DBCREAT just like 'w'. The flags are left untouched so
        # that existing callers keep working - confirm which one is intended.
        mode_to_flag = {
            'r': hype.Database.DBREADER,
            'w': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
            '*': (
                hype.Database.DBREADER | hype.Database.DBWRITER
                | hype.Database.DBCREAT | hype.Database.DBTRUNC
            ),
        }

        # look up flags
        flags = mode_to_flag[mode]

        # make instance
        self.db = hype.Database()

        # open, reporting the database's own error message on failure
        if not self.db.open(path, flags):
            raise Exception("Index open failed: %s, mode=%s, flags=%#06x: %s" % (
                path, mode, flags, self.db.err_msg(self.db.error())
            ))

    def insert(self, channel, lines):
        """
            Adds a sequence of LogLines from the given LogChannel to the index.

            Each line becomes one document; see the class docstring for the
            attributes that are stored.

            Raises AssertionError if the channel or a line fails the sanity
            checks below, and Exception if the database rejects a document.
        """

        # validate the LogChannel
        assert channel.name

        for line in lines:
            # validate the LogLine
            assert line.offset
            assert line.timestamp

            # create new document
            doc = hype.Document()

            # line date, used as part of the document URI
            date = line.timestamp.date()

            # convert to UTC unix timestamp
            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())

            # ensure that it's not 1900
            # (presumably guards against timestamps that were parsed without
            # a date component - TODO confirm against the log parsers)
            assert date.year != 1900

            # add URI
            doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

            # add channel id
            doc.add_attr('@channel', channel.id)

            # add type
            doc.add_attr('@type', str(line.type))

            # add UTC timestamp
            doc.add_attr('@timestamp', str(utc_timestamp))

            # add source attribute, if the line has one
            if line.source:
                doc.add_attr('@source', str(line.source))

            # add data text
            doc.add_text(line.data.encode('utf8'))

            # put
            # NOTE(review): PDCLEAN appears to map to Hyper Estraier's
            # ESTPDCLEAN option (clean up the region of an overwritten
            # document) - confirm against the binding's documentation.
            if not self.db.put_doc(doc, hype.Database.PDCLEAN):
                # this used to raise a misspelt `Exeception`, which turned
                # every put failure into a NameError
                raise Exception("Index put_doc failed: %s" % (
                    self.db.err_msg(self.db.error()),
                ))

    def search_cond(self, cond):
        """
            Search using a raw hype.Condition.

            This is a generator, yielding a log_line.LogLine for each matching
            document, in the order returned by the database.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # iterate over the document IDs
        for doc_id in results:
            # load the full document (options=0)
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            # NOTE(review): the original author noted that get_doc throws an
            # exception (presumably on failure); it is not handled here.
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            line_type = int(doc.attr('@type'))
            timestamp = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
            source = doc.attr('@source')
            data = doc.cat_texts().decode('utf8')

            # build+yield as LogLine
            # XXX: the stored @channel attribute is ignored for now, so the
            # LogLine's first argument is passed as None
            yield log_line.LogLine(None, line_type, timestamp, source, data)

    def search(self, options=None, channel=None, phrase=None, order=None, max=None, skip=None):
        """
            Search with flexible parameters.

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object, restricts matches to its id
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Any parameter that is left as None (or is otherwise falsy, such as
            0 or an empty string) is not applied to the condition.

            Returns the generator produced by search_cond().
        """

        # build condition
        cond = hype.Condition()

        if options:
            # set options
            cond.set_options(options)

        if channel:
            # add channel attribute
            cond.add_attr("@channel STREQ %s" % (channel.id, ))

        if phrase:
            # add phrase
            cond.set_phrase(phrase)

        if order:
            # set order
            cond.set_order(order)

        if max:
            # set max
            cond.set_max(max)

        if skip:
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple(self, channel, query, count=None, offset=None):
        """
            Search for lines from the given channel for the given simple query.

            The search itself is ordered by "@timestamp NUMD", with count and
            offset applied to that ordering; the collected results are then
            reversed before being returned.

            Returns a reverse iterator over the matching LogLines.
        """

        # use search(), backwards
        results = list(self.search(
            # simplified phrase
            options=hype.Condition.SIMPLE,

            # specific channel
            channel=channel,

            # given phrase
            phrase=query,

            # order by timestamp
            order="@timestamp NUMD",

            # count/offset
            max=count,
            skip=offset,
        ))

        # reverse
        return reversed(results)