log_search.py
author Tero Marttila <terom@fixme.fi>
Mon, 16 Feb 2009 02:09:14 +0200
changeset 134 fbccc1648d79
parent 127 5746705a2719
permissions -rw-r--r--
improved error handling for CGI/FastCGI
"""
    Full-text searching of logs
"""

import datetime, calendar, pytz
import os.path

import HyperEstraier as hype

import log_line, utils, config

class LogSearchError (Exception) :
    """
        General search error
    """

    pass

class SearchIndexError (LogSearchError) :
    """
        Error manipulating the index
    """

    def __init__ (self, msg, db) :
        """
            Build the error from the given message + HyperEstraier.Database
        """

        super(SearchIndexError, self).__init__("%s: %s" % (msg, db.err_msg(db.error())))

class NoResultsFound (LogSearchError) :
    """
        No results found
    """

    pass

class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message
    """

    def __init__ (self, channels, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists
                
                Additional chars:
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open
            
            Channels is the ChannelList.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # check it does not already exist?
        if mode in 'c' and os.path.exists(path) :
            raise LogSearchError("Index already exists: %s" % (path, ))
        
        # mapping of { mode -> flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBWRITER,
            'c':    hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # flags to use, standard modes
        flags = mode_to_flag[mode[0]]
 
        # mode-flags
        if '?' in mode :
            # non-blocking locking
            flags |= hype.Database.DBLCKNB
        
        elif '+' in mode :
            # read
            flags |= hype.Database.DBREADER

        elif 'trunc' in mode :
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC
       
        # make instance
        self.db = hype.Database()
        
        # open
        if not self.db.open(path, flags) :
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)
    
    def close (self) :
        """
            Explicitly close the index, this is done automatically on del
        """

        if not self.db.close() :
            raise SearchIndexError("Index close failed", self.db)

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
        """
        
        # count from zero
        count = 0
        
        # iterate
        for line in lines :
            # insert
            self.insert_line(channel, line)

            # count
            count += 1
        
        # return
        return count

    def insert_line (self, channel, line) :
        """
            Adds a single LogLine for the given LogChannel to the index
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine
        assert line.offset
        assert line.timestamp

        # create new document
        doc = hype.Document()

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900
        assert date.year != 1900

        # add URI
        doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel',     channel.id)

        # add type
        doc.add_attr('type',        str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp',   str(utils.to_utc_timestamp(line.timestamp)))

        # add source attribute?
        if line.source :
            source_nickname, source_username, source_hostname, source_chanflags = line.source

            if source_nickname :
                doc.add_attr('source_nickname', source_nickname.encode('utf8'))
            
            if source_username :
                doc.add_attr('source_username', source_username.encode('utf8'))

            if source_hostname :
                doc.add_attr('source_hostname', source_hostname.encode('utf8'))

            if source_chanflags :
                doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))
        
        # add target attributes?
        if line.target :
            target_nickname = line.target

            if target_nickname :
                doc.add_attr('target_nickname', target_nickname.encode('utf8'))

        # add data
        if line.data :
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
            raise SearchIndexError("put_doc", self.db)
            
    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition. Raises NoResultsFound if there aren't any results
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results :
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results :
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel         = self.channels.lookup(doc.attr('channel'))
            type            = int(doc.attr('type'))
            timestamp       = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source
            source = (doc.attr('source_nickname'), doc.attr('source_username'), doc.attr('source_hostname'), doc.attr('source_chanflags'))

            # target
            target = doc.attr('target_nickname')
            
            # message text
            message         = doc.cat_texts().decode('utf8')

            # build+yield to as LogLine
            yield log_line.LogLine(channel, None, type, timestamp, source, target, message)
    
    def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) :
        """
            Search with flexible parameters

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                attrs       - raw attribute expressions
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip
        """

        # build condition
        cond = hype.Condition()
        
        if options :
            # set options
            cond.set_options(options)
        
        if channel :
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))
        
        if attrs :
            # add attributes
            for attr in attrs :
                cond.add_attr(attr.encode('utf8'))

        if phrase :
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))
        
        if order :
            # set order
            cond.set_order(order)
        
        if max :
            # set max
            cond.set_max(max)

        if skip :
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple (self, channel, query, count=None, offset=None, search_msg=True, search_nick=False) :
        """
            Search for lines from the given channel for the given simple query.

            The search_* params define which attributes to search for (using fulltext search for the message, STROR for
            attributes).
        """
        
        # search attributes
        attrs = []

        # nickname target query
        if search_nick :
            attrs.append("source_nickname STRINC %s" % query)
#            attrs.append("target_nickname STRINC %s" % query)
        
        # use search(), backwards
        results = list(self.search(
            # simplified phrase
            options     = hype.Condition.SIMPLE,

            # specific channel
            channel     = channel,

            # given phrase
            phrase      = query if search_msg else None,

            # attributes defined above
            attrs       = attrs,

            # order by timestamp, descending (backwards)
            order       = "timestamp NUMD",

            # count/offset
            max         = count,
            skip        = offset,
        ))
        
        # reverse
        return reversed(results)

    def list (self, channel, date, count=None, skip=None) :
        """
            List all indexed log items for the given UTC date
        """

        # start/end dates
        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
        dt_end   = datetime.datetime(date.year, date.month, date.day, 23, 23, 59, 999999)
        
        # search
        return self.search(
            # specific channel
            channel     = channel,

            # specific date range
            attrs       = [
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
            ],

            # order correctly
            order       = "timestamp NUMA",

            # max count/offset
            max         = count,
            skip        = skip
        )

def get_index () :
    """
        Returns the default read-only index, suitable for searching
    """
    
    # XXX: no caching, just open it every time
    _index = LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')

    # return
    return _index