qmsk/irclogs/log_search.py
author Tero Marttila <terom@fixme.fi>
Sun, 13 Sep 2009 18:47:00 +0300
changeset 141 65c98c9e1716
parent 140 6db2527b67cf
permissions -rw-r--r--
improved search - separate q=/nick= fields
"""
    Full-text searching of logs
"""

import datetime, calendar, pytz
import os.path

import HyperEstraier as hype

import log_line, utils, config

class LogSearchError (Exception) :
    """
        Base class of the search errors defined in this module
    """

class SearchIndexError (LogSearchError) :
    """
        An operation on the index failed
    """

    def __init__ (self, msg, db) :
        """
            Compose the error message out of the given message and the error reported by the database

                msg     - description of the operation that failed
                db      - the HyperEstraier.Database to query for its error
        """

        # look up the database's error code, and then the text that goes with it
        db_error = db.err_msg(db.error())

        # "<msg>: <database error>"
        super(SearchIndexError, self).__init__("%s: %s" % (msg, db_error))

class NoResultsFound (LogSearchError) :
    """
        The search did not match anything
    """

class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message
    """

    def __init__ (self, channels, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists

                Additional chars, which may be combined:
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open

            Channels is the ChannelList.

            Raises LogSearchError if the mode is not recognized or if mode 'c' is used on an existing path, and
            SearchIndexError if the database cannot be opened.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # mapping of { first mode char -> base flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBWRITER,
            'c':    hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # only the first char selects the base mode; slicing (instead of indexing) copes with an empty mode
        base_mode = mode[:1]

        if base_mode not in mode_to_flag :
            raise LogSearchError("Invalid index mode: %r" % (mode, ))

        # 'c' refuses to touch an existing index. This looks at the first char only, so that the check is not
        # skipped when additional chars are given as well (e.g. 'c?')
        if base_mode == 'c' and os.path.exists(path) :
            raise LogSearchError("Index already exists: %s" % (path, ))

        # flags to use, standard modes
        flags = mode_to_flag[base_mode]

        # mode-flags, these are independent of each other, so each one that is present gets applied
        if '?' in mode :
            # non-blocking locking
            flags |= hype.Database.DBLCKNB

        if '+' in mode :
            # read
            flags |= hype.Database.DBREADER

        if 'trunc' in mode :
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags) :
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)

    def close (self) :
        """
            Explicitly close the index. Raises SearchIndexError if the database reports a failure.

            NOTE(review): the original note says that this also happens automatically on del. This class does not
            implement that itself, so presumably the database object takes care of it - confirm.
        """

        if not self.db.close() :
            raise SearchIndexError("Index close failed", self.db)

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
        """

        # count manually, since lines may be any iterable, not just something with a len()
        count = 0

        for line in lines :
            self.insert_line(channel, line)

            count += 1

        return count

    def insert_line (self, channel, line) :
        """
            Adds a single LogLine for the given LogChannel to the index.

            The channel must have an id, and the line must have an offset and a timestamp. Raises SearchIndexError
            if the database does not accept the document.
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine; note that these are truthiness checks, so a zero offset is rejected as well
        assert line.offset
        assert line.timestamp

        # create new document
        doc = hype.Document()

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900
        assert date.year != 1900

        # add URI, channel/date/line
        doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel',     channel.id)

        # add type
        doc.add_attr('type',        str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp',   str(utils.to_utc_timestamp(line.timestamp)))

        # add source attributes?
        if line.source :
            # the source is a 4-tuple, unpack it explicitly so that anything else fails loudly
            source_nickname, source_username, source_hostname, source_chanflags = line.source

            source_attrs = (
                ('source_nickname',     source_nickname),
                ('source_username',     source_username),
                ('source_hostname',     source_hostname),
                ('source_chanflags',    source_chanflags),
            )

            # only those parts that are actually set are stored
            for attr_name, attr_value in source_attrs :
                if attr_value :
                    doc.add_attr(attr_name, attr_value.encode('utf8'))

        # add target attribute?
        if line.target :
            doc.add_attr('target_nickname', line.target.encode('utf8'))

        # add data
        if line.data :
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
            raise SearchIndexError("put_doc", self.db)

    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition, yielding a LogLine for each matching document.

            Raises NoResultsFound if there aren't any results. This is a generator, so the search is only executed -
            and NoResultsFound only raised - once the result is iterated over, not when this method is called.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results :
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results :
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes that insert_line() always stores
            channel         = self.channels.lookup(doc.attr('channel'))
            line_type       = int(doc.attr('type'))
            timestamp       = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source, as the same 4-tuple that insert_line() unpacks
            # NOTE(review): the attribute values are passed on as returned by doc.attr(), only the text below is
            # decoded from UTF-8 - confirm that this asymmetry is intended
            source = (
                doc.attr('source_nickname'),
                doc.attr('source_username'),
                doc.attr('source_hostname'),
                doc.attr('source_chanflags'),
            )

            # target
            target = doc.attr('target_nickname')

            # message text
            message         = doc.cat_texts().decode('utf8')

            # build+yield as LogLine, the second argument is given as None here
            yield log_line.LogLine(channel, None, line_type, timestamp, source, target, message)

    def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) :
        """
            Search with flexible parameters

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                attrs       - raw attribute expressions
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Parameters that are left out, or given as some other false value (None, 0, empty), are not applied to the
            condition at all.

            Returns the result of search_cond(), i.e. a generator of LogLines that raises NoResultsFound on iteration
            if nothing matches.
        """

        # build condition
        cond = hype.Condition()

        if options :
            # set options
            cond.set_options(options)

        if channel :
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))

        if attrs :
            # add attributes
            for attr in attrs :
                cond.add_attr(attr.encode('utf8'))

        if phrase :
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))

        if order :
            # set order
            cond.set_order(order)

        if max :
            # set max
            cond.set_max(max)

        if skip :
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def _search_newest_first (self, channel, phrase, attrs, count, offset, options=None) :
        """
            Shared implementation of search_simple/search_advanced.

            Runs the search ordered by descending timestamp, so that count/offset select the newest matches, and then
            returns a reverse iterator over those results.
        """

        # materialize the results, which is also where NoResultsFound gets raised
        results = list(self.search(
            options     = options,
            channel     = channel,
            phrase      = phrase,
            attrs       = attrs,

            # order by timestamp, descending (backwards)
            order       = "timestamp NUMD",

            # count/offset
            max         = count,
            skip        = offset,
        ))

        # reverse
        return reversed(results)

    def search_simple (self, channel, query, count=None, offset=None) :
        """
            Search for lines from the given channel for the given simple query.

            The given text is searched for in the text of the given channel's entries, using the simplified phrase
            syntax (hype.Condition.SIMPLE). The search itself runs in reverse time order, and a reverse iterator over
            its results is returned.

            Raises NoResultsFound if nothing matches.
        """

        return self._search_newest_first(channel, query, [], count, offset,
            # simplified phrase
            options     = hype.Condition.SIMPLE,
        )

    def search_advanced (self, channel, phrase=None, nick_query=None, count=None, offset=None) :
        """
            Search for lines from the given channel for the given full-featured query.

            The given phrase is used to build the condition, or alternatively, the given extra *_query parameters can
            be used to specify additional attributes to search:

                nick_query  - only match lines whose source nickname includes this string

            Results are selected and returned the same way as for search_simple. Raises NoResultsFound if nothing
            matches.
        """

        # search attributes
        attrs = []

        if nick_query :
            # search for messages from specific nickname
            attrs.append("source_nickname STRINC %s" % nick_query)

        return self._search_newest_first(channel, phrase, attrs, count, offset)

    @staticmethod
    def _day_bounds (date) :
        """
            Return the (start, end) datetimes that enclose the given date, i.e. 00:00:00.000000 and 23:59:59.999999
            of that day. Only the year/month/day of the given date are used.
        """

        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
        dt_end   = datetime.datetime(date.year, date.month, date.day, 23, 59, 59, 999999)

        return dt_start, dt_end

    def list (self, channel, date, count=None, skip=None) :
        """
            List all indexed log items for the given UTC date, in ascending timestamp order.

                channel     - LogChannel object
                date        - the date to list
                count       - number of results to return
                skip        - number of results to skip

            Returns the generator from search(), which raises NoResultsFound on iteration if the day has no entries.
        """

        # start/end of the day, covering all of it up to the last microsecond
        dt_start, dt_end = self._day_bounds(date)

        # search
        return self.search(
            # specific channel
            channel     = channel,

            # specific date range
            attrs       = [
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
            ],

            # order correctly
            order       = "timestamp NUMA",

            # max count/offset
            max         = count,
            skip        = skip
        )

def get_index () :
    """
        Open the index configured as config.SEARCH_INDEX_PATH for config.LOG_CHANNELS in read-only ('r') mode, and
        return it for searching
    """

    # XXX: nothing is cached here, each call opens the index anew
    return LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')