diff -r 416560b82116 -r cdb6403c2498 log_search.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/log_search.py	Mon Feb 09 11:05:53 2009 +0200
@@ -0,0 +1,226 @@
+"""
+    Full-text searching of logs
+"""
+
+import datetime, calendar, pytz
+
+import HyperEstraier as hype
+
+import log_line
+
+class LogSearchIndex (object) :
+    """
+        An index on the logs for a group of channels.
+
+        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).
+
+        These log documents have the following attributes:
+            @uri        - channel/date/line
+            @channel    - channel id
+            @type       - the LogType id
+            @timestamp  - UTC timestamp
+            @source     - nickname
+
+        Each document then has a single line of data, which is the log message itself.
+    """
+
+    def __init__ (self, path, mode='r') :
+        """
+            Open the database, with the given mode:
+                r   - read-only
+                w   - read-write, create if it does not exist
+                a   - read-write, do not create
+                *   - read-write, truncate and create new
+        """
+
+        # mapping of { mode -> flags }
+        mode_to_flag = {
+            'r':    hype.Database.DBREADER,
+            'w':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
+            'a':    hype.Database.DBREADER | hype.Database.DBWRITER,
+            '*':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
+        }
+
+        # look up flags
+        flags = mode_to_flag[mode]
+
+        # make instance
+        self.db = hype.Database()
+
+        # open
+        if not self.db.open(path, flags) :
+            raise Exception("Index open failed: %s" % (path, ))
+
+    def insert (self, channel, lines) :
+        """
+            Adds a sequence of LogLines from the given LogChannel to the index
+        """
+
+        # validate the LogChannel
+        assert channel.name
+
+        # iterate
+        for line in lines :
+            # validate the LogLine
+            assert line.offset
+            assert line.timestamp
+
+            # create new document
+            doc = hype.Document()
+
+            # line date
+            date = line.timestamp.date()
+
+            # convert to UTC timestamp
+            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())
+
+            # ensure that it's not 1900
+            assert date.year != 1900
+
+            # add URI
+            doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
+
+            # add channel id
+            doc.add_attr('@channel', channel.id)
+
+            # add type
+            doc.add_attr('@type', str(line.type))
+
+            # add UTC timestamp
+            doc.add_attr('@timestamp', str(utc_timestamp))
+
+            # add source attribute?
+            if line.source :
+                doc.add_attr('@source', str(line.source))
+
+            # add data text
+            doc.add_text(line.data.encode('utf8'))
+
+            # put
+            # XXX: what does this flag mean?
+            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
+                raise Exception("Index put_doc failed")
+
+    def search_cond (self, cond) :
+        """
+            Search using a raw hype.Condition
+        """
+
+        # execute search, unused 'flags' arg stays zero
+        results = self.db.search(cond, 0)
+
+        # iterate over the document IDs
+        for doc_id in results :
+            # load document, this throws an exception...
+            # option constants are hype.Database.GDNOATTR/GDNOTEXT
+            doc = self.db.get_doc(doc_id, 0)
+
+            # load the attributes/text
+            channel_id  = doc.attr('@channel')
+            type        = int(doc.attr('@type'))
+            timestamp   = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
+            source      = doc.attr('@source')
+            data        = doc.cat_texts()
+
+            # build and yield a (channel_id, LogLine) tuple
+            yield (channel_id, log_line.LogLine(None, type, timestamp, source, data))
+
+    def search_simple (self, channel, query) :
+        """
+            Search for lines from the given channel for the given simple query
+        """
+
+        # build condition
+        cond = hype.Condition()
+
+        # simplified phrase
+        cond.set_options(hype.Condition.SIMPLE)
+
+        # add channel attribute
+        cond.add_attr("@channel STREQ %s" % (channel.id, ))
+
+        # add phrase
+        cond.set_phrase(query)
+
+        # set order
+        cond.set_order("@timestamp NUMA")
+
+        # search with cond
+        for channel_id, line in self.search_cond(cond) :
+            assert channel_id == channel.id
+
+            yield line
+
+def cmd_load (options, channel_name, date) :
+    """
+        Loads the logs for a specific channel/date into the index
+    """
+
+    import channels
+
+    # open the LogSearchIndex
+    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
+
+    # open the channel
+    channel = channels.channel_list.lookup(channel_name)
+
+    # parse date
+    date = datetime.datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=channel.source.tz)
+
+    # load lines for date
+    lines = channel.source.get_date(date)
+
+    # insert
+    index.insert(channel, lines)
+
+def cmd_search (options, channel_name, query) :
+    """
+        Search the index for events on a specific channel with the given query
+    """
+
+    import channels
+
+    # open the LogSearchIndex
+    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
+
+    # open the channel
+    channel = channels.channel_list.lookup(channel_name)
+
+    # search
+    lines = index.search_simple(channel, query)
+
+    # display as plaintext
+    for line in options.formatter.format_txt(lines) :
+        print line
+
+if __name__ == '__main__' :
+    from optparse import OptionParser
+    import log_formatter
+
+    # define parser
+    parser = OptionParser(
+        usage           = "%prog [options] <command> [ ... ]",
+        add_help_option = True,
+    )
+
+    # define command-line arguments
+    parser.add_option("-I", "--index",      dest="index_path",      help="Index database path",     metavar="PATH", default="logs/index")
+    parser.add_option("--create",           dest="create_index",    help="Create index database",   action="store_true", default=False)
+    parser.add_option("-f", "--formatter",  dest="formatter_name",  help="LogFormatter to use",     default="irssi")
+    parser.add_option("-z", "--timezone",   dest="tz_name",         help="Timezone for output",     metavar="TZ", default="UTC")
+
+    # parse
+    options, args = parser.parse_args()
+
+    # postprocess stuff
+    options.tz = pytz.timezone(options.tz_name)
+    options.formatter = log_formatter.by_name(options.formatter_name)(options.tz)
+
+    # pop command
+    command = args.pop(0)
+
+    # inspect
+    func = globals()['cmd_%s' % command]
+
+    # call
+    func(options, *args)
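The sketch below is not part of the patch; it only illustrates how LogSearchIndex might be exercised directly, assuming the HyperEstraier Python bindings are installed and running under Python 2 as the module does. FakeChannel and FakeLine are hypothetical stand-ins that mimic only the attributes the index reads (channel.id/.name, line.offset/.timestamp/.type/.source/.data); real callers would pass the project's LogChannel and LogLine objects, and the assumption that a yielded log_line.LogLine exposes its message as .data is unverified here.

    # illustrative only -- index one fake line, then search it back out
    import datetime, pytz
    from log_search import LogSearchIndex

    class FakeChannel :
        id = 'test'             # used for @uri/@channel attributes
        name = '#test'

    class FakeLine :
        offset = 1
        timestamp = datetime.datetime(2009, 2, 9, 11, 5, tzinfo=pytz.utc)
        type = 1
        source = 'somenick'
        data = u'hello world'

    # mode 'w' creates the database if it does not exist yet
    index = LogSearchIndex('logs/index', 'w')
    index.insert(FakeChannel, [FakeLine])

    # yields log_line.LogLine objects ordered by @timestamp
    for line in index.search_simple(FakeChannel, 'hello') :
        print line.data

From the command line, the equivalent flow would be something like `python log_search.py --create load <channel> <date>` followed by `python log_search.py search <channel> <query>`, provided the channels module knows the channel and logs exist for that date.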