--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/qmsk/irclogs/log_search.py Sun Sep 13 01:15:56 2009 +0300
@@ -0,0 +1,367 @@
+"""
+ Full-text searching of logs
+"""
+
+import datetime, calendar, pytz
+import os.path
+
+import HyperEstraier as hype
+
+import log_line, utils, config
+
class LogSearchError (Exception) :
    """
        Base class for all errors raised by the log-search layer
    """
+
class SearchIndexError (LogSearchError) :
    """
        Error manipulating the index.

        The message is built as "<msg>: <database error text>", pulling the
        error text out of the HyperEstraier database's own error state.
    """

    def __init__ (self, msg, db) :
        """
            Build the error from the given message + HyperEstraier.Database
        """

        # resolve the database's current error code to its human-readable text
        detail = db.err_msg(db.error())

        super(SearchIndexError, self).__init__("%s: %s" % (msg, detail))
+
class NoResultsFound (LogSearchError) :
    """
        The search matched no documents at all
    """
+
class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message
    """

    def __init__ (self, channels, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists

                Additional chars:
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open

            Channels is the ChannelList.

            Raises LogSearchError if mode 'c' is given and the index already
            exists, SearchIndexError if the open itself fails.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # 'c' mode must not open a pre-existing index.
        # XXX: fixed `mode in 'c'`, which is only True when mode == 'c' exactly,
        #      so the existence check was silently skipped for combined modes
        #      such as 'c?'
        if mode[0] == 'c' and os.path.exists(path) :
            raise LogSearchError("Index already exists: %s" % (path, ))

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r': hype.Database.DBREADER,
            'w': hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a': hype.Database.DBWRITER,
            'c': hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # flags to use, standard modes
        flags = mode_to_flag[mode[0]]

        # the additional mode chars are documented as independent options, so
        # each one is tested on its own.
        # XXX: fixed: these used to be an if/elif chain, so giving '?'
        #      silently disabled '+', and either of them disabled 'trunc'
        if '?' in mode :
            # non-blocking locking
            flags |= hype.Database.DBLCKNB

        if '+' in mode :
            # read as well as write
            flags |= hype.Database.DBREADER

        if 'trunc' in mode :
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags) :
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)

    def close (self) :
        """
            Explicitly close the index, this is done automatically on del.

            Raises SearchIndexError if the close fails.
        """

        if not self.db.close() :
            raise SearchIndexError("Index close failed", self.db)

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
        """

        # count from zero
        count = 0

        # iterate
        for line in lines :
            # insert
            self.insert_line(channel, line)

            # count
            count += 1

        # return
        return count

    def insert_line (self, channel, line) :
        """
            Adds a single LogLine for the given LogChannel to the index.

            Raises SearchIndexError if the document cannot be stored.
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine
        # NOTE(review): this also rejects offset == 0 — presumably offsets are
        # 1-based; confirm against the log parser
        assert line.offset
        assert line.timestamp

        # create new document
        doc = hype.Document()

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900 (presumably the default year from a parse
        # that lacked date information)
        assert date.year != 1900

        # add URI
        doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel', channel.id)

        # add type
        doc.add_attr('type', str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))

        # add source attributes?
        if line.source :
            source_nickname, source_username, source_hostname, source_chanflags = line.source

            if source_nickname :
                doc.add_attr('source_nickname', source_nickname.encode('utf8'))

            if source_username :
                doc.add_attr('source_username', source_username.encode('utf8'))

            if source_hostname :
                doc.add_attr('source_hostname', source_hostname.encode('utf8'))

            if source_chanflags :
                doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))

        # add target attributes?
        if line.target :
            target_nickname = line.target

            if target_nickname :
                doc.add_attr('target_nickname', target_nickname.encode('utf8'))

        # add data
        if line.data :
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
            raise SearchIndexError("put_doc", self.db)

    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition, yielding the matching lines as
            LogLine objects. Raises NoResultsFound if there aren't any results.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results :
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results :
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel = self.channels.lookup(doc.attr('channel'))
            type = int(doc.attr('type'))
            timestamp = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source
            source = (doc.attr('source_nickname'), doc.attr('source_username'), doc.attr('source_hostname'), doc.attr('source_chanflags'))

            # target
            target = doc.attr('target_nickname')

            # message text
            message = doc.cat_texts().decode('utf8')

            # build+yield as a LogLine; offset is unknown here, hence None
            yield log_line.LogLine(channel, None, type, timestamp, source, target, message)

    def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) :
        """
            Search with flexible parameters, returning the search_cond generator.

            options - bitmask of hype.Condition.*
            channel - LogChannel object
            attrs   - raw attribute expressions
            phrase  - the search query phrase
            order   - order attribute expression
            max     - number of results to return
            skip    - number of results to skip
        """

        # build condition
        cond = hype.Condition()

        if options :
            # set options
            cond.set_options(options)

        if channel :
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))

        if attrs :
            # add attributes
            for attr in attrs :
                cond.add_attr(attr.encode('utf8'))

        if phrase :
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))

        if order :
            # set order
            cond.set_order(order)

        if max :
            # set max
            cond.set_max(max)

        if skip :
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple (self, channel, query, count=None, offset=None, search_msg=True, search_nick=False) :
        """
            Search for lines from the given channel for the given simple query.

            The search_* params define which attributes to search for (using fulltext search for the message, STRINC for
            attributes). Results are returned in ascending timestamp order.
        """

        # search attributes
        attrs = []

        # nickname target query
        if search_nick :
            attrs.append("source_nickname STRINC %s" % query)
#            attrs.append("target_nickname STRINC %s" % query)

        # use search(), backwards
        results = list(self.search(
            # simplified phrase
            options = hype.Condition.SIMPLE,

            # specific channel
            channel = channel,

            # given phrase
            phrase = query if search_msg else None,

            # attributes defined above
            attrs = attrs,

            # order by timestamp, descending (backwards), so max/skip select
            # the *most recent* lines
            order = "timestamp NUMD",

            # count/offset
            max = count,
            skip = offset,
        ))

        # reverse back into ascending order
        return reversed(results)

    def list (self, channel, date, count=None, skip=None) :
        """
            List all indexed log items for the given UTC date
        """

        # start/end timestamps covering the whole UTC day, inclusive
        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)

        # XXX: fixed end-of-day timestamp: a misplaced argument made this
        #      23:23:59 (hour=23, minute=23), silently dropping all lines
        #      between 23:24:00 and midnight
        dt_end = datetime.datetime(date.year, date.month, date.day, 23, 59, 59, 999999)

        # search
        return self.search(
            # specific channel
            channel = channel,

            # specific date range
            attrs = [
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
            ],

            # order correctly
            order = "timestamp NUMA",

            # max count/offset
            max = count,
            skip = skip
        )
+
def get_index () :
    """
        Returns the default read-only index, suitable for searching
    """

    # XXX: no caching — a fresh read-only index is opened on every call
    return LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')
+