"""
Full-text searching of logs
"""
import datetime, calendar, pytz
import HyperEstraier as hype
import log_line
class LogSearchIndex(object):
    """
    An index on the logs for a group of channels.

    This uses Hyper Estraier to handle searching, whereby each log line is a
    document (yes, I have a powerful server).

    These log documents have the following attributes:
        @uri        - channel/date/line
        @channel    - channel id
        @type       - the LogType id
        @timestamp  - UTC timestamp
        @source     - nickname

    Each document then has a single line of data, which is the log message
    itself.
    """

    def __init__(self, path, mode='r'):
        """
        Open the database at the given path, with the given mode:
            r - read-only
            w - read-write, create if not exists
            a - read-write, do not create
            * - read-write, truncate and create new

        Raises KeyError if the mode is not one of the above, and Exception
        (carrying the database's own error message) if the open fails.
        """
        db_cls = hype.Database
        read_write = db_cls.DBREADER | db_cls.DBWRITER

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r': db_cls.DBREADER,
            'w': read_write | db_cls.DBCREAT,
            # no DBCREAT here: 'a' is documented as "do not create", so
            # opening a missing index must fail instead of creating one
            'a': read_write,
            '*': read_write | db_cls.DBCREAT | db_cls.DBTRUNC,
        }

        # look up flags, naming the offending mode if it is not known
        try:
            flags = mode_to_flag[mode]
        except KeyError:
            raise KeyError("Unknown index mode: %r" % (mode, ))

        # make instance
        self.db = db_cls()

        # open
        if not self.db.open(path, flags):
            raise Exception("Index open failed: %s, mode=%s, flags=%#06x: %s" % (path, mode, flags, self.db.err_msg(self.db.error())))

    def insert(self, channel, lines):
        """
        Adds a sequence of LogLines from the given LogChannel to the index.

        Every line is stored as its own document, carrying the attributes
        listed in the class docstring and the UTF-8 encoded message as text.

        Raises Exception (with the database's error message) as soon as one
        document cannot be stored; lines stored before that stay in the index.
        """
        # validate the LogChannel
        assert channel.name

        for line in lines:
            # validate the LogLine
            assert line.offset
            assert line.timestamp

            # line date
            date = line.timestamp.date()

            # ensure that it's not 1900
            # NOTE(review): presumably catches timestamps that were parsed
            # without a year (strptime defaults to 1900) -- confirm
            assert date.year != 1900

            # convert to UTC timestamp
            utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())

            # document URI: channel/date/line
            uri = "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset)

            # create new document and fill in its attributes
            doc = hype.Document()
            doc.add_attr('@uri', uri)
            doc.add_attr('@channel', channel.id)
            doc.add_attr('@type', str(line.type))
            doc.add_attr('@timestamp', str(utc_timestamp))

            # the source attribute is only present for lines that have one
            if line.source:
                doc.add_attr('@source', str(line.source))

            # add data text
            doc.add_text(line.data.encode('utf8'))

            # put; per the Hyper Estraier API documentation, PDCLEAN cleans up
            # the dispensable regions left behind by an overwritten document
            if not self.db.put_doc(doc, hype.Database.PDCLEAN):
                raise Exception("Index put_doc failed: %s: %s" % (uri, self.db.err_msg(self.db.error())))

    def search_cond(self, cond):
        """
        Search using a raw hype.Condition.

        Yields a (channel_id, LogLine) tuple for every matching document. The
        LogLine is rebuilt from the stored attributes: its first constructor
        argument is passed as None, and its timestamp is an aware UTC
        datetime.
        """
        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # iterate over the document IDs
        for doc_id in results:
            # load document, this throws an exception...
            # option constants are hype.Database.GDNOATTR/GDNOTEXT
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel_id = doc.attr('@channel')
            line_type = int(doc.attr('@type'))
            timestamp = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
            source = doc.attr('@source')
            data = doc.cat_texts().decode('utf8')

            # build+yield to (channel_id, LogLine) tuple
            yield (channel_id, log_line.LogLine(None, line_type, timestamp, source, data))

    def search_simple(self, channel, query):
        """
        Search for lines from the given channel for the given simple query.

        Yields the matching LogLines ordered by ascending @timestamp.
        """
        # build condition
        cond = hype.Condition()

        # simplified phrase
        cond.set_options(hype.Condition.SIMPLE)

        # restrict to documents of this channel
        cond.add_attr("@channel STREQ %s" % (channel.id, ))

        # add phrase
        cond.set_phrase(query)

        # order numerically ascending by timestamp
        cond.set_order("@timestamp NUMA")

        # search with cond
        for channel_id, line in self.search_cond(cond):
            # the attribute condition above must have filtered by channel
            assert channel_id == channel.id

            yield line