# HG changeset patch
# User Tero Marttila
# Date 1234310827 -7200
# Node ID 48fca00689e3089d83f5b26ff439de2a91c95884
# Parent  74f6a0b01ddfd323dad9ce244485bf09c98b4df6
implement scripts/search-index autoload

diff -r 74f6a0b01ddf -r 48fca00689e3 .hgignore
--- a/.hgignore	Wed Feb 11 01:21:22 2009 +0200
+++ b/.hgignore	Wed Feb 11 02:07:07 2009 +0200
@@ -3,4 +3,4 @@
 \.pyc$
 ^cache/templates/.
 ^logs/(old_)?index
-
+^logs/autoload-state/
diff -r 74f6a0b01ddf -r 48fca00689e3 log_search.py
--- a/log_search.py	Wed Feb 11 01:21:22 2009 +0200
+++ b/log_search.py	Wed Feb 11 02:07:07 2009 +0200
@@ -46,7 +46,7 @@
     def __init__ (self, channels, path, mode='r') :
         """
            Open the database at the given path, with the given mode:
-                r   - read-only
+                r   - read, error if not exists
                w   - write, create if not exists
                a   - write, error if not exists
                c   - write, create, error if exists
@@ -88,75 +88,86 @@
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
         """
 
-        # validate the LogChannel
-        assert channel.name
-
+        # count from zero
         count = 0
 
         # iterate
         for line in lines :
-            # validate the LogLine
-            assert line.offset
-            assert line.timestamp
-
-            # create new document
-            doc = hype.Document()
-
-            # line date
-            date = line.timestamp.date()
-
-            # ensure that it's not 1900
-            assert date.year != 1900
-
-            # add URI
-            doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
-
-            # add channel id
-            doc.add_attr('channel', channel.id)
-
-            # add type
-            doc.add_attr('type', str(line.type))
-
-            # add UTC timestamp
-            doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))
+            # insert
+            self.insert_line(channel, line)
 
-            # add source attribute?
-            if line.source :
-                source_nickname, source_username, source_hostname, source_chanflags = line.source
-
-                if source_nickname :
-                    doc.add_attr('source_nickname', source_nickname.encode('utf8'))
-
-                if source_username :
-                    doc.add_attr('source_username', source_username.encode('utf8'))
-
-                if source_hostname :
-                    doc.add_attr('source_hostname', source_hostname.encode('utf8'))
-
-                if source_chanflags :
-                    doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))
-
-            # add target attributes?
-            if line.target :
-                target_nickname = line.target
-
-                if target_nickname :
-                    doc.add_attr('target_nickname', target_nickname.encode('utf8'))
-
-            # add data
-            if line.data :
-                doc.add_text(line.data.encode('utf8'))
-
-            # put, "clean up dispensable regions of the overwritten document"
-            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
-                raise Exeception("Index put_doc failed")
-
             # count
             count += 1
 
         # return
         return count
+
+
+    def insert_line (self, channel, line) :
+        """
+            Adds a single LogLine for the given LogChannel to the index
+        """
+
+        # validate the LogChannel
+        assert channel.id
+
+        # validate the LogLine
+        assert line.offset
+        assert line.timestamp
+
+        # create new document
+        doc = hype.Document()
+
+        # line date
+        date = line.timestamp.date()
+
+        # ensure that it's not 1900
+        assert date.year != 1900
+
+        # add URI
+        doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
+
+        # add channel id
+        doc.add_attr('channel', channel.id)
+
+        # add type
+        doc.add_attr('type', str(line.type))
+
+        # add UTC timestamp
+        doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))
+
+        # add source attribute?
+        if line.source :
+            source_nickname, source_username, source_hostname, source_chanflags = line.source
+
+            if source_nickname :
+                doc.add_attr('source_nickname', source_nickname.encode('utf8'))
+
+            if source_username :
+                doc.add_attr('source_username', source_username.encode('utf8'))
+
+            if source_hostname :
+                doc.add_attr('source_hostname', source_hostname.encode('utf8'))
+
+            if source_chanflags :
+                doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))
+
+        # add target attributes?
+        if line.target :
+            target_nickname = line.target
+
+            if target_nickname :
+                doc.add_attr('target_nickname', target_nickname.encode('utf8'))
+
+        # add data
+        if line.data :
+            doc.add_text(line.data.encode('utf8'))
+
+        # put, "clean up dispensable regions of the overwritten document"
+        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
+            raise Exception("Index put_doc failed")
+
     def search_cond (self, cond) :
         """
            Search using a raw hype.Condition. Raises NoResultsFound if there aren't any results
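The insert()/insert_line() split above gives the new autoload command a way to index lines one at a time. Each LogLine becomes a single hype.Document keyed by a composite @uri, so re-inserting an already-indexed line overwrites the old document rather than duplicating it. Roughly, for a hypothetical line at offset 42 in #test on 2009-02-11 (a sketch of the attribute layout built by insert_line(), not part of the patch):

    # hypothetical document stored for one LogLine, following insert_line() above
    doc = {
        '@uri':      '#test/2009-02-11/42',     # channel.id / date / line.offset
        'channel':   '#test',                   # channel.id
        'type':      '...',                     # str(line.type)
        'timestamp': '...',                     # str(utils.to_utc_timestamp(line.timestamp))
        # plus source_nickname/source_username/source_hostname/source_chanflags and
        # target_nickname when present; line.data is added as the document text
    }
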
""" # format filename @@ -383,6 +396,10 @@ # open+return the LogFile return LogFile(path, self.parser, self.decoder, start_date=d, channel=self.channel) + elif stat : + # stat + return os.stat(path) + else : # test return os.path.exists(path) @@ -390,12 +407,33 @@ # XXX: move to LogFile except IOError, e : # return None for missing files - if e.errno == errno.ENOENT : + if e.errno == errno.ENOENT and ignore_missing : return None else : raise + def _iter_logfile_dates (self) : + """ + Yields a series of naive datetime objects representing the logfiles that are available, in time order + """ + + # listdir + filenames = os.listdir(self.path) + + # sort + filenames.sort() + + # iter files + for filename in filenames : + try : + # parse date + yield + yield datetime.datetime.strptime(filename, self.filename_fmt).replace(tzinfo=self.tz) + + except : + # ignore + continue + def _iter_date_reverse (self, dt=None) : """ Yields an infinite series of naive date objects in our timezone, iterating backwards in time starting at the @@ -566,3 +604,27 @@ # valid yield dt.date() + def get_modified (self, dt=None) : + """ + Returns the contents off all logfiles with mtimes past the given date + """ + + # iterate through all available logfiles in date order, as datetimes + for log_date in self._iter_logfile_dates() : + # compare against dt? + if dt : + # stat + st = self._get_logfile_date(log_date, load=False, stat=True) + + # not modified? + if utils.from_utc_timestamp(st.st_mtime) < dt : + # skip + continue + + # open + logfile = self._get_logfile_date(log_date, ignore_missing=False) + + # yield all lines + for line in logfile.read_full() : + yield line + diff -r 74f6a0b01ddf -r 48fca00689e3 scripts/search-index --- a/scripts/search-index Wed Feb 11 01:21:22 2009 +0200 +++ b/scripts/search-index Wed Feb 11 02:07:07 2009 +0200 @@ -7,10 +7,11 @@ # XXX: fix path import sys; sys.path.insert(0, '.'); sys.path.insert(0, '..') +import os, os.path import datetime, pytz # configuration and the LogSearchIndex module -import config, log_search, channels +import config, utils, log_search, channels def _open_index (options, open_mode) : """ @@ -34,13 +35,53 @@ # return return index, channel +def _insert_lines (index, options, channel, lines) : + """ + Insert the given lines into the index. + + Assumes the lines will be in time-order, and prints out as status messages the date and count for the inserted lines + """ + + # last date + date = None + + # count + count = 0 + + # iter lines + for line in lines : + # output new date header? + if not options.quiet and (not date or line.timestamp.date() != date) : + # previous date's line count? + if date : + print "OK: %d lines" % count + + # reset count + count = 0 + + # timestamp's date + date = line.timestamp.date() + + # status header + print "%s:" % (date.strftime('%Y-%m-%d'), ), + + # insert + index.insert_line(channel, line) + + # count + count += 1 + + # final count line + if not options.quiet and date : + print "OK: %d lines" % count + def _load_channel_date (index, options, channel, date) : """ Loads the logs for the given date from the channel's LogSource into the given LogSearchIndex """ if not options.quiet : - print "%s %s..." 
diff -r 74f6a0b01ddf -r 48fca00689e3 scripts/search-index
--- a/scripts/search-index	Wed Feb 11 01:21:22 2009 +0200
+++ b/scripts/search-index	Wed Feb 11 02:07:07 2009 +0200
@@ -7,10 +7,11 @@
 # XXX: fix path
 import sys; sys.path.insert(0, '.'); sys.path.insert(0, '..')
 
+import os, os.path
 import datetime, pytz
 
 # configuration and the LogSearchIndex module
-import config, log_search, channels
+import config, utils, log_search, channels
 
 def _open_index (options, open_mode) :
     """
@@ -34,13 +35,53 @@
     # return
     return index, channel
 
+def _insert_lines (index, options, channel, lines) :
+    """
+        Insert the given lines into the index.
+
+        Assumes the lines are in time order, and prints the date and line count of each day's inserted lines as status messages.
+    """
+
+    # last date
+    date = None
+
+    # count
+    count = 0
+
+    # iter lines
+    for line in lines :
+        # output new date header?
+        if not options.quiet and (not date or line.timestamp.date() != date) :
+            # previous date's line count?
+            if date :
+                print "OK: %d lines" % count
+
+            # reset count
+            count = 0
+
+            # timestamp's date
+            date = line.timestamp.date()
+
+            # status header
+            print "%s:" % (date.strftime('%Y-%m-%d'), ),
+
+        # insert
+        index.insert_line(channel, line)
+
+        # count
+        count += 1
+
+    # final count line
+    if not options.quiet and date :
+        print "OK: %d lines" % count
+
 def _load_channel_date (index, options, channel, date) :
     """
        Loads the logs for the given date from the channel's LogSource into the given LogSearchIndex
     """
 
     if not options.quiet :
-        print "%s %s..." % (channel.id, date.strftime(channel.source.filename_fmt)),
+        print "Loading date for channel %s" % channel.id
 
     try :
        # load lines for date
@@ -51,15 +92,11 @@
            raise
 
        if not options.quiet :
-            print "Skipped: %s" % (e, )
+            print "\tSkipped: %s" % (e, )
 
     else :
-        # insert -> count
-        count = index.insert(channel, lines)
-
-        if not options.quiet :
-            print "OK: %d lines" % count
-
+        # insert
+        _insert_lines(index, options, channel, lines)
 
 def _parse_date (options, date_str, tz=None, fmt='%Y-%m-%d') :
     """
@@ -208,6 +245,65 @@
     # display
     _output_lines(options, lines)
 
+def cmd_autoload (options, *channel_names) :
+    """
+        Automatically loads all channel logs that have not been indexed yet (by logfile mtime)
+    """
+
+    # open index
+    index = _open_index(options, 'c' if options.create else 'a')
+
+    # default to all channels
+    if not channel_names :
+        channels = config.LOG_CHANNELS
+
+    else :
+        channels = [config.LOG_CHANNELS.lookup(channel_name) for channel_name in channel_names]
+
+    # iterate channels
+    for channel in channels :
+        if not options.quiet :
+            print "Channel %s:" % channel.id,
+
+        # path to our state file
+        statefile_path = os.path.join(options.autoload_state_path, 'chan-%s' % channel.id)
+
+        # override?
+        if options.reload :
+            # load all
+            mtime = None
+
+            if not options.quiet :
+                print "reloading all:",
+
+        # stat for mtime
+        # XXX: replace with single utils.mtime()
+        elif os.path.exists(statefile_path) :
+            # get last update date for channel
+            mtime = utils.from_utc_timestamp(os.stat(statefile_path).st_mtime)
+
+            if not options.quiet :
+                print "last load=%s:" % mtime,
+
+        else :
+            # unknown, load all
+            mtime = None
+
+            if not options.quiet :
+                print "no previous load state:",
+
+        # get lines
+        lines = channel.source.get_modified(mtime)
+
+        # insert
+        if not options.quiet :
+            print "inserting..."
+
+        _insert_lines(index, options, channel, lines)
+
+        # write autoload state
+        open(statefile_path, 'w').close()
+
 def cmd_help (options, *args) :
     """
        Help about commands
@@ -271,10 +367,12 @@
            choices=[fmt_name for fmt_name in config.LOG_FORMATTERS.iterkeys()])
     parser.add_option('-I', "--index",          dest="index_path",          help="Index database path",         metavar="PATH", default="logs/index")
+    parser.add_option(      "--autoload-state", dest="autoload_state_path", help="Path to autoload state dir",  metavar="PATH", default="logs/autoload-state")
     parser.add_option('-Z', "--timezone",       dest="tz_name",             help="Timezone for output",         metavar="TZ",   default="UTC")
     parser.add_option('-f', "--force",          dest="force",               help="Force dangerous operation",   action="store_true")
     parser.add_option(      "--create",         dest="create",              help="Create index database",       action="store_true")
     parser.add_option(      "--skip-missing",   dest="skip_missing",        help="Skip missing logfiles",       action="store_true")
+    parser.add_option(      "--reload",         dest="reload",              help="Force reload lines",          action="store_true")
     parser.add_option(      "--quiet",          dest="quiet",               help="Supress status messages",     action="store_true")
 
     # parse
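Taken together, this should make periodic indexing a single idempotent command. Assuming the script dispatches its cmd_* functions on the first positional argument, the usual pattern here (the channel names below are hypothetical):

    # first run: create the index and load all logs
    $ scripts/search-index --create autoload

    # later runs, e.g. from cron: only logfiles modified since the last load are re-indexed
    $ scripts/search-index autoload

    # force a full reload of selected channels
    $ scripts/search-index --reload autoload '#foo' '#bar'

Each channel's load finishes by touching logs/autoload-state/chan-<channel.id>, so the state file's mtime records when that channel was last indexed.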