"""
Full-text searching of logs
"""
import datetime, calendar, pytz
import os.path
import HyperEstraier as hype
import log_line, utils, config
class LogSearchError(Exception):
    """
        General search error.

        The other search exceptions in this module derive from this class.
    """
class SearchIndexError(LogSearchError):
    """
        Error manipulating the index
    """

    def __init__(self, msg, db):
        """
            Build the error from the given message + HyperEstraier.Database.

            msg - short description of the operation that failed
            db  - the database object; its current error code is looked up and the
                  corresponding error message is appended to msg
        """

        # ask the database for the textual form of its current error code
        detail = db.err_msg(db.error())

        super(SearchIndexError, self).__init__("%s: %s" % (msg, detail))
class NoResultsFound(LogSearchError):
    """
        Raised when a search matches no documents
    """
class LogSearchIndex(object):
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri                - channel/date/line
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message
    """

    # document attribute names for the parts of a LogLine source, in the order of the source tuple
    _SOURCE_ATTRS = ('source_nickname', 'source_username', 'source_hostname', 'source_chanflags')

    def __init__(self, channels, path, mode='r'):
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists

                Additional chars (may be combined freely):
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open

            Channels is the ChannelList.

            Raises LogSearchError if a 'c' mode is used and the path already exists, and
            SearchIndexError if the database itself fails to open.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # check it does not already exist?
        # look at the leading mode char only, so that modifiers ('c?', 'c+', ...) cannot bypass the check
        if mode.startswith('c') and os.path.exists(path):
            raise LogSearchError("Index already exists: %s" % (path, ))

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBWRITER,
            'c':    hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # flags to use, standard modes
        flags = mode_to_flag[mode[0]]

        # mode-flags; each modifier is tested independently so that they can be combined
        if '?' in mode:
            # non-blocking locking
            flags |= hype.Database.DBLCKNB

        if '+' in mode:
            # read
            flags |= hype.Database.DBREADER

        if 'trunc' in mode:
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags):
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)

    def close(self):
        """
            Explicitly close the index.

            Raises SearchIndexError if the database reports that the close failed.

            NOTE(review): the original documentation states that this is also done automatically
            on del; this class defines no __del__, so presumably hype.Database does it - confirm.
        """

        if not self.db.close():
            raise SearchIndexError("Index close failed", self.db)

    def insert(self, channel, lines):
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
        """

        # count from zero
        count = 0

        # insert each line in turn, counting as we go (lines may be a one-shot iterator)
        for line in lines:
            self.insert_line(channel, line)
            count += 1

        return count

    def insert_line(self, channel, line):
        """
            Adds a single LogLine for the given LogChannel to the index.

            The document URI is built from the channel id, the line's date and the line's offset.

            Raises SearchIndexError if the database rejects the document.
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine
        # NOTE(review): this also rejects an offset of zero; presumably offsets start at one - confirm
        assert line.offset
        assert line.timestamp

        # create new document
        doc = hype.Document()

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900
        assert date.year != 1900

        # add URI
        doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel', channel.id)

        # add type
        doc.add_attr('type', str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))

        # add source attribute?
        if line.source:
            # explicit unpack, so that a malformed source tuple fails loudly
            source_nickname, source_username, source_hostname, source_chanflags = line.source
            source_values = (source_nickname, source_username, source_hostname, source_chanflags)

            # only the parts that are actually set become attributes
            for attr_name, value in zip(self._SOURCE_ATTRS, source_values):
                if value:
                    doc.add_attr(attr_name, value.encode('utf8'))

        # add target attributes?
        if line.target:
            doc.add_attr('target_nickname', line.target.encode('utf8'))

        # add data
        if line.data:
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN):
            raise SearchIndexError("put_doc", self.db)

    def search_cond(self, cond):
        """
            Search using a raw hype.Condition, yielding a LogLine for each matching document.

            Raises NoResultsFound if there aren't any results. As this is a generator, the search is
            only executed (and NoResultsFound only raised) once the result is first iterated.
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results:
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results:
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel = self.channels.lookup(doc.attr('channel'))
            line_type = int(doc.attr('type'))
            timestamp = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source, in the same order as it was stored
            source = tuple(doc.attr(attr_name) for attr_name in self._SOURCE_ATTRS)

            # target
            target = doc.attr('target_nickname')

            # message text
            message = doc.cat_texts().decode('utf8')

            # build+yield as LogLine; the second argument (None) is the only field not restored from the index
            yield log_line.LogLine(channel, None, line_type, timestamp, source, target, message)

    def search(self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None):
        """
            Search with flexible parameters

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                attrs       - raw attribute expressions
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Parameters that are left out (or otherwise false) are not applied to the condition.

            Returns the result of search_cond() for the built condition.
        """

        # build condition
        cond = hype.Condition()

        if options:
            # set options
            cond.set_options(options)

        if channel:
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))

        if attrs:
            # add attributes
            for attr in attrs:
                cond.add_attr(attr.encode('utf8'))

        if phrase:
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))

        if order:
            # set order
            cond.set_order(order)

        if max:
            # set max
            cond.set_max(max)

        if skip:
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def _search_backwards(self, **search_args):
        """
            Run search() with the given arguments ordered by descending timestamp, then reverse the hits.

            max/skip are thus counted from the latest match, while the returned (reverse) iterator
            walks the selected hits in the opposite order to how they were fetched.
        """

        # materialize, so that the hits can be reversed
        results = list(self.search(order="timestamp NUMD", **search_args))

        # reverse
        return reversed(results)

    def search_simple(self, channel, query, count=None, offset=None):
        """
            Search for lines from the given channel for the given simple query.

            The given text is searched for in the text of the given channel's entries. Matches are
            fetched in descending timestamp order (so count/offset are applied from the latest match),
            and an iterator over the fetched matches in reversed order is returned.
        """

        return self._search_backwards(
            # simplified phrase
            options=hype.Condition.SIMPLE,

            # specific channel
            channel=channel,

            # given phrase
            phrase=query,

            # no additional attribute conditions
            attrs=[],

            # count/offset
            max=count,
            skip=offset,
        )

    def search_advanced(self, channel, phrase=None, nick_query=None, count=None, offset=None):
        """
            Search for lines from the given channel for the given full-featured query.

            The given phrase is used to build the condition, or alternatively, the given extra *_query parameters can
            be used to specify additional attributes to search.

            Matches are fetched and returned in the same way as for search_simple().
        """

        attrs = []

        if nick_query:
            # search for messages from specific nickname
            attrs.append("source_nickname STRINC %s" % nick_query)

        return self._search_backwards(
            # specific channel
            channel=channel,

            # given phrase
            phrase=phrase,

            # attributes defined above
            attrs=attrs,

            # count/offset
            max=count,
            skip=offset,
        )

    @staticmethod
    def _day_bounds(date):
        """
            Return the (first, last) naive datetimes of the given date: 00:00:00.000000 and 23:59:59.999999
        """

        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
        dt_end = datetime.datetime(date.year, date.month, date.day, 23, 59, 59, 999999)

        return dt_start, dt_end

    def list(self, channel, date, count=None, skip=None):
        """
            List all indexed log items for the given UTC date, ordered by ascending timestamp.

            Returns the result of search(), restricted to the given channel and to timestamps
            between the start and the end of the given date.
        """

        # start/end dates, covering the whole day
        dt_start, dt_end = self._day_bounds(date)

        # search
        return self.search(
            # specific channel
            channel=channel,

            # specific date range
            attrs=[
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
            ],

            # order correctly
            order="timestamp NUMA",

            # max count/offset
            max=count,
            skip=skip
        )
def get_index():
    """
        Open the default index in read-only mode and return it, for use when searching
    """

    # XXX: nothing is cached here, so each call opens the index afresh
    return LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')