log_search.py
author Tero Marttila <terom@fixme.fi>
Mon, 09 Feb 2009 11:05:53 +0200
changeset 64 cdb6403c2498
child 65 8b50694f841e
permissions -rw-r--r--
beginnings of a LogSearchIndex class
"""
    Full-text searching of logs
"""

import datetime, calendar, pytz

import HyperEstraier as hype

import log_line

class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri        - channel/date/line
            @channel    - channel id
            @type       - the LogType id
            @timestamp  - UTC timestamp
            @source     - nickname

        Each document then has a single line of data, which is the log message itself
    """

    def __init__ (self, path, mode='r') :
        """
            Open the database, with the given mode:
                r       - read-only
                w       - read-write, create if not exists
                a       - read-write, do not create
                *       - read-write, truncate and create new
        """
        
        # mapping of { mode -> flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBREADER | hype.Database.DBWRITER,
            '*':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
        }

        # look up flags
        flags = mode_to_flag[mode]
        
        # make instance
        self.db = hype.Database()
        
        # open
        if not self.db.open(path, flags) :
            raise Exception("Index open failed: %s" % (path, ))

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index
        """
        
        # validate the LogChannel
        assert channel.name
        
        # iterate
        for line in lines :
            # validate the LogLine
            assert line.offset
            assert line.timestamp

            # create new document
            doc = hype.Document()

            # line date
            date = line.timestamp.date()

            # convert to UTC timestamp
            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())

            # ensure that it's not 1900
            assert date.year != 1900

            # add URI
            doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

            # add channel id
            doc.add_attr('@channel',    channel.id)

            # add type
            doc.add_attr('@type',       str(line.type))

            # add UTC timestamp
            doc.add_attr('@timestamp',  str(utc_timestamp))

            # add source attribute?
            if line.source :
                doc.add_attr('@source', str(line.source))
            
            # add data text
            doc.add_text(line.data.encode('utf8'))

            # put
            # XXX: what does this flag mean?
            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
                raise Exeception("Index put_doc failed")
    
    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # iterate over the document IDs
        for doc_id in results :
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel_id  = doc.attr('@channel')
            type        = int(doc.attr('@type'))
            timestamp   = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
            source      = doc.attr('@source')
            data        = doc.cat_texts()

            # build+yield to (channel_id, LogLine) tuple
            yield (channel_id, log_line.LogLine(None, type, timestamp, source, data))

    def search_simple (self, channel, query) :
        """
            Search for lines from the given channel for the given simple query
        """

        # build condition
        cond = hype.Condition()

        # simplified phrase
        cond.set_options(hype.Condition.SIMPLE)

        # add channel attribute
        cond.add_attr("@channel STREQ %s" % (channel.id, ))

        # add phrase
        cond.set_phrase(query)

        # set order
        cond.set_order("@timestamp NUMA")

        # search with cond
        for channel_id, line in self.search_cond(cond) :
            assert channel_id == channel.id

            yield line

def cmd_load (options, channel_name, date) :
    """
        Loads the logs for a specific channel/date into the index
    """

    import channels
    
    # open the LogSearchIndex
    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')

    # open the channel
    channel = channels.channel_list.lookup(channel_name)

    # parse date
    date = datetime.datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=channel.source.tz)

    # load lines for date
    lines = channel.source.get_date(date)

    # insert
    index.insert(channel, lines)

def cmd_search (options, channel_name, query) :
    """
        Search the index for events on a specific channel with the given query
    """

    import channels
    
    # open the LogSearchIndex
    index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')

    # open the channel
    channel = channels.channel_list.lookup(channel_name)
    
    # search
    lines = index.search_simple(channel, query)
    
    # display as plaintext
    for line in options.formatter.format_txt(lines) :
        print line

if __name__ == '__main__' :
    from optparse import OptionParser
    import log_formatter
    
    # define parser
    parser = OptionParser(
        usage           = "%prog [options] <command> [ ... ]",
        add_help_option = True,
    )

    # define command-line arguments
    parser.add_option("-I", "--index", dest="index_path", help="Index database path", metavar="PATH", default="logs/index")
    parser.add_option("--create", dest="create_index", help="Create index database", default=False)
    parser.add_option("-f", "--formatter", dest="formatter_name", help="LogFormatter to use", default="irssi")
    parser.add_option("-z", "--timezone", dest="tz_name", help="Timezone for output", metavar="TZ", default="UTC")

    # parse
    options, args = parser.parse_args()

    # postprocess stuff
    options.tz = pytz.timezone(options.tz_name)
    options.formatter = log_formatter.by_name(options.formatter_name)(options.tz)
    
    # pop command
    command = args.pop(0)

    # inspect
    func = globals()['cmd_%s' % command]
    
    # call
    func(options, *args)