--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/log_search.py Mon Feb 09 11:05:53 2009 +0200
@@ -0,0 +1,226 @@
+"""
+ Full-text searching of logs
+"""
+
+import datetime, calendar, pytz
+
+import HyperEstraier as hype
+
+import log_line
+
+class LogSearchIndex (object) :
+ """
+ An index on the logs for a group of channels.
+
+ This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).
+
+ These log documents have the following attributes:
+ @uri - channel/date/line
+ @channel - channel id
+ @type - the LogType id
+ @timestamp - UTC timestamp
+ @source - nickname
+
+ Each document then has a single line of data, which is the log message itself
+ """
+
+ def __init__ (self, path, mode='r') :
+ """
+ Open the database, with the given mode:
+ r - read-only
+ w - read-write, create if not exists
+ a - read-write, do not create
+ * - read-write, truncate and create new
+ """
+
+ # mapping of { mode -> flags }
+ mode_to_flag = {
+ 'r': hype.Database.DBREADER,
+ 'w': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
+ 'a': hype.Database.DBREADER | hype.Database.DBWRITER,
+ '*': hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
+ }
+
+ # look up flags
+ flags = mode_to_flag[mode]
+
+ # make instance
+ self.db = hype.Database()
+
+ # open
+ if not self.db.open(path, flags) :
+ raise Exception("Index open failed: %s" % (path, ))
+
+ def insert (self, channel, lines) :
+ """
+ Adds a sequence of LogLines from the given LogChannel to the index
+ """
+
+ # validate the LogChannel
+ assert channel.name
+
+ # iterate
+ for line in lines :
+ # validate the LogLine
+ assert line.offset
+ assert line.timestamp
+
+ # create new document
+ doc = hype.Document()
+
+ # line date
+ date = line.timestamp.date()
+
+ # convert to UTC timestamp
+ utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())
+
+ # ensure that it's not 1900
+ assert date.year != 1900
+
+ # add URI
+ doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
+
+ # add channel id
+ doc.add_attr('@channel', channel.id)
+
+ # add type
+ doc.add_attr('@type', str(line.type))
+
+ # add UTC timestamp
+ doc.add_attr('@timestamp', str(utc_timestamp))
+
+ # add source attribute?
+ if line.source :
+ doc.add_attr('@source', str(line.source))
+
+ # add data text
+ doc.add_text(line.data.encode('utf8'))
+
+ # put
+ # XXX: what does this flag mean?
+ if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
+ raise Exeception("Index put_doc failed")
+
+ def search_cond (self, cond) :
+ """
+ Search using a raw hype.Condition
+ """
+
+ # execute search, unused 'flags' arg stays zero
+ results = self.db.search(cond, 0)
+
+ # iterate over the document IDs
+ for doc_id in results :
+ # load document, this throws an exception...
+ # option constants are hype.Database.GDNOATTR/GDNOTEXT
+ doc = self.db.get_doc(doc_id, 0)
+
+ # load the attributes/text
+ channel_id = doc.attr('@channel')
+ type = int(doc.attr('@type'))
+ timestamp = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
+ source = doc.attr('@source')
+ data = doc.cat_texts()
+
+ # build+yield to (channel_id, LogLine) tuple
+ yield (channel_id, log_line.LogLine(None, type, timestamp, source, data))
+
+ def search_simple (self, channel, query) :
+ """
+ Search for lines from the given channel for the given simple query
+ """
+
+ # build condition
+ cond = hype.Condition()
+
+ # simplified phrase
+ cond.set_options(hype.Condition.SIMPLE)
+
+ # add channel attribute
+ cond.add_attr("@channel STREQ %s" % (channel.id, ))
+
+ # add phrase
+ cond.set_phrase(query)
+
+ # set order
+ cond.set_order("@timestamp NUMA")
+
+ # search with cond
+ for channel_id, line in self.search_cond(cond) :
+ assert channel_id == channel.id
+
+ yield line
+
+def cmd_load (options, channel_name, date) :
+ """
+ Loads the logs for a specific channel/date into the index
+ """
+
+ import channels
+
+ # open the LogSearchIndex
+ index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
+
+ # open the channel
+ channel = channels.channel_list.lookup(channel_name)
+
+ # parse date
+ date = datetime.datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=channel.source.tz)
+
+ # load lines for date
+ lines = channel.source.get_date(date)
+
+ # insert
+ index.insert(channel, lines)
+
+def cmd_search (options, channel_name, query) :
+ """
+ Search the index for events on a specific channel with the given query
+ """
+
+ import channels
+
+ # open the LogSearchIndex
+ index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
+
+ # open the channel
+ channel = channels.channel_list.lookup(channel_name)
+
+ # search
+ lines = index.search_simple(channel, query)
+
+ # display as plaintext
+ for line in options.formatter.format_txt(lines) :
+ print line
+
+if __name__ == '__main__' :
+ from optparse import OptionParser
+ import log_formatter
+
+ # define parser
+ parser = OptionParser(
+ usage = "%prog [options] <command> [ ... ]",
+ add_help_option = True,
+ )
+
+ # define command-line arguments
+ parser.add_option("-I", "--index", dest="index_path", help="Index database path", metavar="PATH", default="logs/index")
+ parser.add_option("--create", dest="create_index", help="Create index database", default=False)
+ parser.add_option("-f", "--formatter", dest="formatter_name", help="LogFormatter to use", default="irssi")
+ parser.add_option("-z", "--timezone", dest="tz_name", help="Timezone for output", metavar="TZ", default="UTC")
+
+ # parse
+ options, args = parser.parse_args()
+
+ # postprocess stuff
+ options.tz = pytz.timezone(options.tz_name)
+ options.formatter = log_formatter.by_name(options.formatter_name)(options.tz)
+
+ # pop command
+ command = args.pop(0)
+
+ # inspect
+ func = globals()['cmd_%s' % command]
+
+ # call
+ func(options, *args)