improve LogSearchIndex error handling, add explicit close() method, and modify get_index to not keep the index open persistently
"""
Full-text searching of logs
"""
import datetime, calendar, pytz
import os.path
import HyperEstraier as hype
import log_line, utils, config
class LogSearchError (Exception) :
    """
        General search error.

        SearchIndexError and NoResultsFound in this module derive from this, so catching it covers both.
    """
class SearchIndexError (LogSearchError) :
    """
        Error manipulating the index.

        The exception message is the caller's message followed by the error description reported by the database.
    """

    def __init__ (self, msg, db) :
        """
            Build the error from the given message + HyperEstraier.Database.

            msg     - short description of the operation that failed
            db      - the database object the operation failed on; it is asked for its current error via
                      db.error() and for the matching text via db.err_msg()
        """

        # ask the database what went wrong
        reason = db.err_msg(db.error())

        super(SearchIndexError, self).__init__("%s: %s" % (msg, reason))
class NoResultsFound (LogSearchError) :
    """
        Raised when a search yields no results at all.
    """
class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful
        server).

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message
    """

    def __init__ (self, channels, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists

                Additional chars (any combination):
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open

            Channels is the ChannelList.

            Raises LogSearchError if the mode is not recognized, or if mode 'c' is used and the path already
            exists. Raises SearchIndexError if the database itself fails to open.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBWRITER,
            'c':    hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # the first char picks the base mode, everything after it is optional modifiers
        base_mode = mode[:1]
        modifiers = mode[1:]

        # fail with a search error rather than a bare KeyError/IndexError
        if base_mode not in mode_to_flag :
            raise LogSearchError("Invalid index mode: %r" % (mode, ))

        # check it does not already exist?
        # this looks at the first char only, so that modes like 'c?' are covered as well
        if base_mode == 'c' and os.path.exists(path) :
            raise LogSearchError("Index already exists: %s" % (path, ))

        # flags to use, standard modes
        flags = mode_to_flag[base_mode]

        # mode-flags; these are independent of each other, so each one is tested separately
        if '?' in modifiers :
            # non-blocking locking
            flags |= hype.Database.DBLCKNB

        if '+' in modifiers :
            # read
            flags |= hype.Database.DBREADER

        if 'trunc' in modifiers :
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags) :
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)

    def close (self) :
        """
            Explicitly close the index.

            Raises SearchIndexError if the database reports that the close failed.

            NOTE(review): the original comment says the close also happens automatically on del. This class
            defines no __del__, so that presumably relies on hype.Database cleaning up after itself - confirm.
        """

        if not self.db.close() :
            raise SearchIndexError("Index close failed", self.db)

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
        """

        # count from zero
        count = 0

        # each line goes in as its own document
        for line in lines :
            self.insert_line(channel, line)

            count += 1

        return count

    def insert_line (self, channel, line) :
        """
            Adds a single LogLine for the given LogChannel to the index.

            The line becomes one document, carrying the attributes listed in the class docstring plus the line's
            message as the document text. Source/target attributes and the text are only added when they are set.

            Raises SearchIndexError if the database refuses the document.
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine
        assert line.offset
        assert line.timestamp

        # create new document
        doc = hype.Document()

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900
        # presumably this catches timestamps that were parsed without a real date - TODO confirm
        assert date.year != 1900

        # add URI
        doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel', channel.id)

        # add type
        doc.add_attr('type', str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))

        # add source attribute?
        if line.source :
            source_nickname, source_username, source_hostname, source_chanflags = line.source

            source_attrs = (
                ('source_nickname',     source_nickname),
                ('source_username',     source_username),
                ('source_hostname',     source_hostname),
                ('source_chanflags',    source_chanflags),
            )

            # only the parts that are actually set are stored
            for attr_name, attr_value in source_attrs :
                if attr_value :
                    doc.add_attr(attr_name, attr_value.encode('utf8'))

        # add target attributes?
        if line.target :
            doc.add_attr('target_nickname', line.target.encode('utf8'))

        # add data
        if line.data :
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
            raise SearchIndexError("put_doc", self.db)

    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition, yielding a LogLine for each matching document.

            Raises NoResultsFound if there aren't any results. This is a generator, so nothing is executed - and
            nothing is raised - until the result is first iterated over.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results :
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results :
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel = self.channels.lookup(doc.attr('channel'))
            line_type = int(doc.attr('type'))
            timestamp = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source
            # NOTE(review): unlike the message text below, these values are passed on exactly as doc.attr()
            # returns them, without decoding - confirm that this is what callers expect
            source = (
                doc.attr('source_nickname'),
                doc.attr('source_username'),
                doc.attr('source_hostname'),
                doc.attr('source_chanflags'),
            )

            # target
            target = doc.attr('target_nickname')

            # message text
            message = doc.cat_texts().decode('utf8')

            # build+yield as a LogLine; the second argument is passed as None here
            yield log_line.LogLine(channel, None, line_type, timestamp, source, target, message)

    def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) :
        """
            Search with flexible parameters

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                attrs       - raw attribute expressions
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Any parameter that is left out, or is otherwise false (e.g. max=0), is simply not applied to the
            condition.

            Returns the generator from search_cond(), so NoResultsFound is raised on first iteration.
        """

        # build condition
        cond = hype.Condition()

        if options :
            # set options
            cond.set_options(options)

        if channel :
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))

        if attrs :
            # add attributes
            for attr in attrs :
                cond.add_attr(attr.encode('utf8'))

        if phrase :
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))

        if order :
            # set order
            cond.set_order(order)

        if max :
            # set max
            cond.set_max(max)

        if skip :
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple (self, channel, query, count=None, offset=None, search_msg=True, search_nick=False) :
        """
            Search for lines from the given channel for the given simple query.

            The search_* params define which attributes to search for (using fulltext search for the message,
            STRINC for the source nickname).

            The search itself runs newest-first, so that count/offset select the most recent matches; the matches
            are then returned as an iterator in oldest-first order.

            Raises NoResultsFound if nothing matches.
        """

        # search attributes
        attrs = []

        # nickname target query
        if search_nick :
            attrs.append("source_nickname STRINC %s" % query)
#            attrs.append("target_nickname STRINC %s" % query)

        # use search(), backwards
        results = list(self.search(
            # simplified phrase
            options     = hype.Condition.SIMPLE,

            # specific channel
            channel     = channel,

            # given phrase
            phrase      = query if search_msg else None,

            # attributes defined above
            attrs       = attrs,

            # order by timestamp, descending (backwards)
            order       = "timestamp NUMD",

            # count/offset
            max         = count,
            skip        = offset,
        ))

        # reverse
        return reversed(results)

    @staticmethod
    def _day_bounds (date) :
        """
            Return the (first, last) instants of the given date as naive datetimes, i.e. 00:00:00.000000 and
            23:59:59.999999
        """

        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
        dt_end = datetime.datetime(date.year, date.month, date.day, 23, 59, 59, 999999)

        return dt_start, dt_end

    def list (self, channel, date, count=None, skip=None) :
        """
            List all indexed log items for the given UTC date, in ascending timestamp order.

                channel     - LogChannel object to list lines for
                date        - the date to list, anything with .year/.month/.day
                count       - number of results to return
                skip        - number of results to skip

            Returns the generator from search(), so NoResultsFound is raised on first iteration.
        """

        # start/end dates, covering the whole day up to and including 23:59:59
        dt_start, dt_end = self._day_bounds(date)

        # search
        return self.search(
            # specific channel
            channel     = channel,

            # specific date range
            attrs       = [
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
            ],

            # order correctly
            order       = "timestamp NUMA",

            # max count/offset
            max         = count,
            skip        = skip
        )
def get_index () :
    """
        Open and return the default index in read-only mode, suitable for searching.

        The index is built from config.LOG_CHANNELS and config.SEARCH_INDEX_PATH. Nothing is cached, every call
        opens a new LogSearchIndex.
    """

    # XXX: no caching, just open it every time
    return LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')