log_search.py
changeset 64 cdb6403c2498
child 65 8b50694f841e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/log_search.py	Mon Feb 09 11:05:53 2009 +0200
@@ -0,0 +1,226 @@
+"""
+    Full-text searching of logs
+"""
+
+import datetime, calendar, pytz
+
+import HyperEstraier as hype
+
+import log_line
+
+class LogSearchIndex (object) :
+    """
+        An index on the logs for a group of channels.
+
+        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).
+
+        These log documents have the following attributes:
+            @uri        - channel/date/line
+            @channel    - channel id
+            @type       - the LogType id
+            @timestamp  - UTC timestamp
+            @source     - nickname
+
+        Each document then has a single line of data, which is the log message itself
+    """
+
+    def __init__ (self, path, mode='r') :
+        """
+            Open the database, with the given mode:
+                r       - read-only
+                w       - read-write, create if not exists
+                a       - read-write, do not create
+                *       - read-write, truncate and create new
+        """
+        
+        # mapping of { mode -> flags }
+        mode_to_flag = {
+            'r':    hype.Database.DBREADER,
+            'w':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
+            'a':    hype.Database.DBREADER | hype.Database.DBWRITER,
+            '*':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
+        }
+
+        # look up flags
+        flags = mode_to_flag[mode]
+        
+        # make instance
+        self.db = hype.Database()
+        
+        # open
+        if not self.db.open(path, flags) :
+            raise Exception("Index open failed: %s" % (path, ))
+
+    def insert (self, channel, lines) :
+        """
+            Adds a sequence of LogLines from the given LogChannel to the index
+        """
+        
+        # validate the LogChannel
+        assert channel.name
+        
+        # iterate
+        for line in lines :
+            # validate the LogLine
+            assert line.offset
+            assert line.timestamp
+
+            # create new document
+            doc = hype.Document()
+
+            # line date
+            date = line.timestamp.date()
+
+            # convert to UTC timestamp
+            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())
+
+            # ensure that it's not 1900
+            assert date.year != 1900
+
+            # add URI
+            doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
+
+            # add channel id
+            doc.add_attr('@channel',    channel.id)
+
+            # add type
+            doc.add_attr('@type',       str(line.type))
+
+            # add UTC timestamp
+            doc.add_attr('@timestamp',  str(utc_timestamp))
+
+            # add source attribute?
+            if line.source :
+                doc.add_attr('@source', str(line.source))
+            
+            # add data text
+            doc.add_text(line.data.encode('utf8'))
+
+            # put
+            # XXX: what does this flag mean?
+            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
+                raise Exeception("Index put_doc failed")
+    
+    def search_cond (self, cond) :
+        """
+            Search using a raw hype.Condition
+        """
+
+        # execute search, unused 'flags' arg stays zero
+        results = self.db.search(cond, 0)
+
+        # iterate over the document IDs
+        for doc_id in results :
+            # load document, this throws an exception...
+            # option constants are hype.Database.GDNOATTR/GDNOTEXT
+            doc = self.db.get_doc(doc_id, 0)
+
+            # load the attributes/text
+            channel_id  = doc.attr('@channel')
+            type        = int(doc.attr('@type'))
+            timestamp   = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
+            source      = doc.attr('@source')
+            data        = doc.cat_texts()
+
+            # build+yield to (channel_id, LogLine) tuple
+            yield (channel_id, log_line.LogLine(None, type, timestamp, source, data))
+
+    def search_simple (self, channel, query) :
+        """
+            Search for lines from the given channel for the given simple query
+        """
+
+        # build condition
+        cond = hype.Condition()
+
+        # simplified phrase
+        cond.set_options(hype.Condition.SIMPLE)
+
+        # add channel attribute
+        cond.add_attr("@channel STREQ %s" % (channel.id, ))
+
+        # add phrase
+        cond.set_phrase(query)
+
+        # set order
+        cond.set_order("@timestamp NUMA")
+
+        # search with cond
+        for channel_id, line in self.search_cond(cond) :
+            assert channel_id == channel.id
+
+            yield line
+
+def cmd_load (options, channel_name, date) :
+    """
+        Loads the logs for a specific channel/date into the index
+    """
+
+    import channels
+    
+    # open the LogSearchIndex
+    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
+
+    # open the channel
+    channel = channels.channel_list.lookup(channel_name)
+
+    # parse date
+    date = datetime.datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=channel.source.tz)
+
+    # load lines for date
+    lines = channel.source.get_date(date)
+
+    # insert
+    index.insert(channel, lines)
+
+def cmd_search (options, channel_name, query) :
+    """
+        Search the index for events on a specific channel with the given query
+    """
+
+    import channels
+    
+    # open the LogSearchIndex
+    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
+
+    # open the channel
+    channel = channels.channel_list.lookup(channel_name)
+    
+    # search
+    lines = index.search_simple(channel, query)
+    
+    # display as plaintext
+    for line in options.formatter.format_txt(lines) :
+        print line
+
+if __name__ == '__main__' :
+    from optparse import OptionParser
+    import log_formatter
+    
+    # define parser
+    parser = OptionParser(
+        usage           = "%prog [options] <command> [ ... ]",
+        add_help_option = True,
+    )
+
+    # define command-line arguments
+    parser.add_option("-I", "--index", dest="index_path", help="Index database path", metavar="PATH", default="logs/index")
+    parser.add_option("--create", dest="create_index", help="Create index database", default=False)
+    parser.add_option("-f", "--formatter", dest="formatter_name", help="LogFormatter to use", default="irssi")
+    parser.add_option("-z", "--timezone", dest="tz_name", help="Timezone for output", metavar="TZ", default="UTC")
+
+    # parse
+    options, args = parser.parse_args()
+
+    # postprocess stuff
+    options.tz = pytz.timezone(options.tz_name)
+    options.formatter = log_formatter.by_name(options.formatter_name)(options.tz)
+    
+    # pop command
+    command = args.pop(0)
+
+    # inspect
+    func = globals()['cmd_%s' % command]
+    
+    # call
+    func(options, *args)