diff -r 9c7769850195 -r 6db2527b67cf log_search.py --- a/log_search.py Sun Sep 13 00:49:55 2009 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,367 +0,0 @@ -""" - Full-text searching of logs -""" - -import datetime, calendar, pytz -import os.path - -import HyperEstraier as hype - -import log_line, utils, config - -class LogSearchError (Exception) : - """ - General search error - """ - - pass - -class SearchIndexError (LogSearchError) : - """ - Error manipulating the index - """ - - def __init__ (self, msg, db) : - """ - Build the error from the given message + HyperEstraier.Database - """ - - super(SearchIndexError, self).__init__("%s: %s" % (msg, db.err_msg(db.error()))) - -class NoResultsFound (LogSearchError) : - """ - No results found - """ - - pass - -class LogSearchIndex (object) : - """ - An index on the logs for a group of channels. - - This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server). - - These log documents have the following attributes: - @uri - channel/date/line - channel - channel code - type - the LogType id - timestamp - UTC timestamp - source_nickname - source nickname - source_username - source username - source_hostname - source hostname - source_chanflags - source channel flags - target_nickname - target nickname - - Each document then has a single line of data, which is the log data message - """ - - def __init__ (self, channels, path, mode='r') : - """ - Open the database at the given path, with the given mode: - First char: - r - read, error if not exists - w - write, create if not exists - a - write, error if not exists - c - create, error if exists - - Additional chars: - trunc - truncate if exists - + - read as well as write - ? - non-blocking lock open, i.e. it fails if already open - - Channels is the ChannelList. - """ - - # store - self.channels = channels - self.path = path - self.mode = mode - - # check it does not already exist? - if mode in 'c' and os.path.exists(path) : - raise LogSearchError("Index already exists: %s" % (path, )) - - # mapping of { mode -> flags } - mode_to_flag = { - 'r': hype.Database.DBREADER, - 'w': hype.Database.DBWRITER | hype.Database.DBCREAT, - 'a': hype.Database.DBWRITER, - 'c': hype.Database.DBWRITER | hype.Database.DBCREAT, - } - - # flags to use, standard modes - flags = mode_to_flag[mode[0]] - - # mode-flags - if '?' in mode : - # non-blocking locking - flags |= hype.Database.DBLCKNB - - elif '+' in mode : - # read - flags |= hype.Database.DBREADER - - elif 'trunc' in mode : - # truncate. Dangerous! - flags |= hype.Database.DBTRUNC - - # make instance - self.db = hype.Database() - - # open - if not self.db.open(path, flags) : - raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db) - - def close (self) : - """ - Explicitly close the index, this is done automatically on del - """ - - if not self.db.close() : - raise SearchIndexError("Index close failed", self.db) - - def insert (self, channel, lines) : - """ - Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items - """ - - # count from zero - count = 0 - - # iterate - for line in lines : - # insert - self.insert_line(channel, line) - - # count - count += 1 - - # return - return count - - def insert_line (self, channel, line) : - """ - Adds a single LogLine for the given LogChannel to the index - """ - - # validate the LogChannel - assert channel.id - - # validate the LogLine - assert line.offset - assert line.timestamp - - # create new document - doc = hype.Document() - - # line date - date = line.timestamp.date() - - # ensure that it's not 1900 - assert date.year != 1900 - - # add URI - doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset)) - - # add channel id - doc.add_attr('channel', channel.id) - - # add type - doc.add_attr('type', str(line.type)) - - # add UTC timestamp - doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp))) - - # add source attribute? - if line.source : - source_nickname, source_username, source_hostname, source_chanflags = line.source - - if source_nickname : - doc.add_attr('source_nickname', source_nickname.encode('utf8')) - - if source_username : - doc.add_attr('source_username', source_username.encode('utf8')) - - if source_hostname : - doc.add_attr('source_hostname', source_hostname.encode('utf8')) - - if source_chanflags : - doc.add_attr('source_chanflags', source_chanflags.encode('utf8')) - - # add target attributes? - if line.target : - target_nickname = line.target - - if target_nickname : - doc.add_attr('target_nickname', target_nickname.encode('utf8')) - - # add data - if line.data : - doc.add_text(line.data.encode('utf8')) - - # put, "clean up dispensable regions of the overwritten document" - if not self.db.put_doc(doc, hype.Database.PDCLEAN) : - raise SearchIndexError("put_doc", self.db) - - def search_cond (self, cond) : - """ - Search using a raw hype.Condition. Raises NoResultsFound if there aren't any results - """ - - # execute search, unused 'flags' arg stays zero - results = self.db.search(cond, 0) - - # no results? - if not results : - raise NoResultsFound() - - # iterate over the document IDs - for doc_id in results : - # load document, this throws an exception... - # option constants are hype.Database.GDNOATTR/GDNOTEXT - doc = self.db.get_doc(doc_id, 0) - - # load the attributes/text - channel = self.channels.lookup(doc.attr('channel')) - type = int(doc.attr('type')) - timestamp = utils.from_utc_timestamp(int(doc.attr('timestamp'))) - - # source - source = (doc.attr('source_nickname'), doc.attr('source_username'), doc.attr('source_hostname'), doc.attr('source_chanflags')) - - # target - target = doc.attr('target_nickname') - - # message text - message = doc.cat_texts().decode('utf8') - - # build+yield to as LogLine - yield log_line.LogLine(channel, None, type, timestamp, source, target, message) - - def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) : - """ - Search with flexible parameters - - options - bitmask of hype.Condition.* - channel - LogChannel object - attrs - raw attribute expressions - phrase - the search query phrase - order - order attribute expression - max - number of results to return - skip - number of results to skip - """ - - # build condition - cond = hype.Condition() - - if options : - # set options - cond.set_options(options) - - if channel : - # add channel attribute - cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8')) - - if attrs : - # add attributes - for attr in attrs : - cond.add_attr(attr.encode('utf8')) - - if phrase : - # add phrase - cond.set_phrase(phrase.encode('utf8')) - - if order : - # set order - cond.set_order(order) - - if max : - # set max - cond.set_max(max) - - if skip : - # set skip - cond.set_skip(skip) - - # execute - return self.search_cond(cond) - - def search_simple (self, channel, query, count=None, offset=None, search_msg=True, search_nick=False) : - """ - Search for lines from the given channel for the given simple query. - - The search_* params define which attributes to search for (using fulltext search for the message, STROR for - attributes). - """ - - # search attributes - attrs = [] - - # nickname target query - if search_nick : - attrs.append("source_nickname STRINC %s" % query) -# attrs.append("target_nickname STRINC %s" % query) - - # use search(), backwards - results = list(self.search( - # simplified phrase - options = hype.Condition.SIMPLE, - - # specific channel - channel = channel, - - # given phrase - phrase = query if search_msg else None, - - # attributes defined above - attrs = attrs, - - # order by timestamp, descending (backwards) - order = "timestamp NUMD", - - # count/offset - max = count, - skip = offset, - )) - - # reverse - return reversed(results) - - def list (self, channel, date, count=None, skip=None) : - """ - List all indexed log items for the given UTC date - """ - - # start/end dates - dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0) - dt_end = datetime.datetime(date.year, date.month, date.day, 23, 23, 59, 999999) - - # search - return self.search( - # specific channel - channel = channel, - - # specific date range - attrs = [ - "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end)) - ], - - # order correctly - order = "timestamp NUMA", - - # max count/offset - max = count, - skip = skip - ) - -def get_index () : - """ - Returns the default read-only index, suitable for searching - """ - - # XXX: no caching, just open it every time - _index = LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r') - - # return - return _index -