"""
    Full-text searching of logs
"""

import calendar
import datetime

import pytz
import HyperEstraier as hype

import log_line


class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri        - channel/date/line
            @channel    - channel id
            @type       - the LogType id
            @timestamp  - UTC timestamp
            @source     - nickname

        Each document then has a single line of data, which is the log message itself
    """

    def __init__ (self, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                r   - read-only
                w   - read-write, create if not exists
                a   - read-write; NOTE: currently opened with the same flags as 'w',
                      so it WILL create the database if it does not exist
                *   - read-write, truncate and create new

            An unknown mode raises KeyError (from the mode table lookup).
            Raises Exception if Hyper Estraier fails to open the database.
        """

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r': hype.Database.DBREADER,
            'w': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
            # NOTE(review): DBCREAT was added to 'a' deliberately (rev 65), which makes it identical to 'w'
            'a': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
            '*': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
        }

        # look up flags
        flags = mode_to_flag[mode]

        # make instance
        self.db = hype.Database()

        # open, reporting the database's own error message on failure
        if not self.db.open(path, flags) :
            raise Exception("Index open failed: %s, mode=%s, flags=%#06x: %s" % (path, mode, flags, self.db.err_msg(self.db.error())))

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index.

            Each LogLine becomes one document carrying the attributes listed in the class
            docstring, with the message text (encoded as UTF-8) as its body.

            Raises Exception if Hyper Estraier fails to store a document.
        """

        # validate the LogChannel
        assert channel.name

        # iterate
        for line in lines :
            # validate the LogLine
            # NOTE(review): this also rejects an offset of 0 - confirm offsets are never zero
            assert line.offset
            assert line.timestamp

            # create new document
            doc = hype.Document()

            # line date, in whatever timezone line.timestamp carries; used for the @uri
            date = line.timestamp.date()

            # convert to UTC timestamp
            # NOTE(review): utctimetuple() treats a naive datetime as already being UTC - assumes timestamps are tz-aware
            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())

            # ensure that it's not 1900
            # (presumably guards against a timestamp parsed without a year, as strptime defaults the year to 1900)
            assert date.year != 1900

            # add URI: channel/date/line
            doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

            # add channel id
            doc.add_attr('@channel', channel.id)

            # add type
            doc.add_attr('@type', str(line.type))

            # add UTC timestamp
            doc.add_attr('@timestamp', str(utc_timestamp))

            # add source attribute, only if the line has one
            if line.source :
                doc.add_attr('@source', str(line.source))

            # add data text
            doc.add_text(line.data.encode('utf8'))

            # put; PDCLEAN (ESTPDCLEAN in the C API) asks Hyper Estraier to clean up the
            # dispensable regions of any existing document this one overwrites (same @uri)
            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
                raise Exception("Index put_doc failed: %s" % (self.db.err_msg(self.db.error()), ))

    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition.

            Yields (channel_id, LogLine) tuples, one per matching document, in the order
            Hyper Estraier returns them. The first LogLine argument is passed as None
            (presumably the line offset, which is not reconstructed from the index).
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # iterate over the document IDs
        for doc_id in results :
            # load document; option constants are hype.Database.GDNOATTR/GDNOTEXT, but both the
            # attributes and the text are needed here, so no options are given
            # NOTE(review): an earlier comment said this "throws an exception" - conditions not recorded
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel_id = doc.attr('@channel')
            line_type = int(doc.attr('@type'))
            timestamp = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
            source = doc.attr('@source')
            data = doc.cat_texts().decode('utf8')

            # build+yield to (channel_id, LogLine) tuple
            yield (channel_id, log_line.LogLine(None, line_type, timestamp, source, data))

    def search_simple (self, channel, query) :
        """
            Search for lines from the given channel for the given simple query.

            The query uses Hyper Estraier's simplified phrase syntax; results are restricted
            to the channel's id and ordered by ascending @timestamp. Yields LogLines.
        """

        # build condition
        cond = hype.Condition()

        # simplified phrase
        cond.set_options(hype.Condition.SIMPLE)

        # add channel attribute
        cond.add_attr("@channel STREQ %s" % (channel.id, ))

        # add phrase
        cond.set_phrase(query)

        # set order: numeric ascending on the UTC timestamp
        cond.set_order("@timestamp NUMA")

        # search with cond
        for channel_id, line in self.search_cond(cond) :
            assert channel_id == channel.id

            yield line