implement scripts/search-index autoload

changeset:   93:48fca00689e3
parent:      92:74f6a0b01ddf
child:       94:6673de9bc911
author:      Tero Marttila <terom@fixme.fi>
date:        Wed, 11 Feb 2009 02:07:07 +0200

files:
    .hgignore
    log_search.py
    log_source.py
    logs/autoload-state/.empty_dir
    scripts/search-index
--- a/.hgignore	Wed Feb 11 01:21:22 2009 +0200
+++ b/.hgignore	Wed Feb 11 02:07:07 2009 +0200
@@ -3,4 +3,4 @@
 \.pyc$
 ^cache/templates/.
 ^logs/(old_)?index
-
+^logs/autoload-state/
--- a/log_search.py	Wed Feb 11 01:21:22 2009 +0200
+++ b/log_search.py	Wed Feb 11 02:07:07 2009 +0200
@@ -46,7 +46,7 @@
     def __init__ (self, channels, path, mode='r') :
         """
             Open the database at the given path, with the given mode:
-                r       - read-only
+                r       - read, error if not exists
                 w       - write, create if not exists
                 a       - write, error if not exists
                 c       - write, create, error if exists
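 
The mode characters map onto how scripts/search-index opens the index; the autoload command further down does the equivalent of:

    # create a fresh index with --create, otherwise append to the existing one
    index = _open_index(options, 'c' if options.create else 'a')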
@@ -88,75 +88,86 @@
             Adds a sequence of LogLines from the given LogChannel to the index, and returns the number of added items
         """
         
-        # validate the LogChannel
-        assert channel.name
-
+        # count from zero
         count = 0
         
         # iterate
         for line in lines :
-            # validate the LogLine
-            assert line.offset
-            assert line.timestamp
-
-            # create new document
-            doc = hype.Document()
-
-            # line date
-            date = line.timestamp.date()
-
-            # ensure that it's not 1900
-            assert date.year != 1900
-
-            # add URI
-            doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
-
-            # add channel id
-            doc.add_attr('channel',     channel.id)
-
-            # add type
-            doc.add_attr('type',        str(line.type))
-
-            # add UTC timestamp
-            doc.add_attr('timestamp',   str(utils.to_utc_timestamp(line.timestamp)))
+            # insert
+            self.insert_line(channel, line)
 
-            # add source attribute?
-            if line.source :
-                source_nickname, source_username, source_hostname, source_chanflags = line.source
-
-                if source_nickname :
-                    doc.add_attr('source_nickname', source_nickname.encode('utf8'))
-                
-                if source_username :
-                    doc.add_attr('source_username', source_username.encode('utf8'))
-
-                if source_hostname :
-                    doc.add_attr('source_hostname', source_hostname.encode('utf8'))
-
-                if source_chanflags :
-                    doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))
-            
-            # add target attributes?
-            if line.target :
-                target_nickname = line.target
-
-                if target_nickname :
-                    doc.add_attr('target_nickname', target_nickname.encode('utf8'))
-
-            # add data
-            if line.data :
-                doc.add_text(line.data.encode('utf8'))
-
-            # put, "clean up dispensable regions of the overwritten document"
-            if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
-                raise Exception("Index put_doc failed")
-            
             # count
             count += 1
         
         # return
         return count
 
+
+
+    def insert_line (self, channel, line) :
+        """
+            Adds a single LogLine for the given LogChannel to the index
+        """
+
+        # validate the LogChannel
+        assert channel.id
+
+        # validate the LogLine
+        assert line.offset
+        assert line.timestamp
+
+        # create new document
+        doc = hype.Document()
+
+        # line date
+        date = line.timestamp.date()
+
+        # ensure that it's not 1900
+        assert date.year != 1900
+
+        # add URI
+        doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
+
+        # add channel id
+        doc.add_attr('channel',     channel.id)
+
+        # add type
+        doc.add_attr('type',        str(line.type))
+
+        # add UTC timestamp
+        doc.add_attr('timestamp',   str(utils.to_utc_timestamp(line.timestamp)))
+
+        # add source attribute?
+        if line.source :
+            source_nickname, source_username, source_hostname, source_chanflags = line.source
+
+            if source_nickname :
+                doc.add_attr('source_nickname', source_nickname.encode('utf8'))
+            
+            if source_username :
+                doc.add_attr('source_username', source_username.encode('utf8'))
+
+            if source_hostname :
+                doc.add_attr('source_hostname', source_hostname.encode('utf8'))
+
+            if source_chanflags :
+                doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))
+        
+        # add target attributes?
+        if line.target :
+            target_nickname = line.target
+
+            if target_nickname :
+                doc.add_attr('target_nickname', target_nickname.encode('utf8'))
+
+        # add data
+        if line.data :
+            doc.add_text(line.data.encode('utf8'))
+
+        # put, "clean up dispensable regions of the overwritten document"
+        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
+            raise Exception("Index put_doc failed")
+            
     def search_cond (self, cond) :
         """
             Search using a raw hype.Condition. Raises NoResultsFound if there aren't any results
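 
The net effect of the hunk above is to split the old bulk insert() loop into a reusable insert_line() method, so callers can stream lines into the index one at a time and do their own progress reporting. A minimal sketch of driving it directly, using names from elsewhere in this changeset (an index opened in append mode, a channel looked up from config; the channel name is hypothetical):

    # open the search index for writing, erroring if it doesn't exist yet
    index = log_search.LogSearchIndex(config.LOG_CHANNELS, "logs/index", 'a')

    # look up a channel by name
    channel = config.LOG_CHANNELS.lookup('#example')

    count = 0

    # get_modified(None) yields every line the channel's LogSource has
    for line in channel.source.get_modified(None) :
        # each LogLine becomes one hype.Document, keyed by channel/date/offset
        index.insert_line(channel, line)

        count += 1

    print "indexed %d lines" % count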
--- a/log_source.py	Wed Feb 11 01:21:22 2009 +0200
+++ b/log_source.py	Wed Feb 11 02:07:07 2009 +0200
@@ -3,10 +3,10 @@
 """
 
 import datetime, calendar, itertools, functools, math
-import os, errno
+import os, os.path, errno
 import pytz
 
-import config
+import config, utils
 
 class LogSourceDecoder (object) :
     """
@@ -161,6 +161,17 @@
         """
 
         abstract
+    
+    def get_modified (self, dt=None) :
+        """
+            Returns a sequence of LogLines that may have been *modified* from their old values since the given datetime.
+
+            If the datetime is not given, *all* lines are returned.
+
+            The LogLines should be in time order.
+        """
+
+        abstract
 
 class LogFile (object) :
     """
@@ -364,12 +375,14 @@
         # convert to date and use that
         return self._get_logfile_date(dtz.date())
 
-    def _get_logfile_date (self, d, load=True) :
+    def _get_logfile_date (self, d, load=True, stat=True, ignore_missing=True) :
         """
-            Get the logfile corresponding to the given naive date in our timezone. If load is False, only test for the
-            presence of the logfile, do not actually open it.
+            Get the logfile corresponding to the given naive date in our timezone.
+
+            If load is False, the logfile is not opened: the os.stat() result for it is returned if stat is True,
+            otherwise only its presence is tested.
 
-            Returns None if the logfile does not exist.
+            Returns None if the logfile does not exist, unless ignore_missing is given as False.
         """
 
         # format filename
@@ -383,6 +396,10 @@
                 # open+return the LogFile
                 return LogFile(path, self.parser, self.decoder, start_date=d, channel=self.channel)
             
+            elif stat :
+                # stat
+                return os.stat(path)
+
             else :
                 # test
                 return os.path.exists(path)
@@ -390,12 +407,33 @@
         # XXX: move to LogFile
         except IOError, e :
             # return None for missing files
-            if e.errno == errno.ENOENT :
+            if e.errno == errno.ENOENT and ignore_missing :
                 return None
 
             else :
                 raise
     
+    def _iter_logfile_dates (self) :
+        """
+            Yields a series of datetime objects, localized to our timezone, representing the logfiles that are available, in time order
+        """
+
+        # listdir
+        filenames = os.listdir(self.path)
+
+        # sort
+        filenames.sort()
+
+        # iter files
+        for filename in filenames :
+            try :
+                # parse the filename as a date, localize it to our timezone, and yield it
+                yield self.tz.localize(datetime.datetime.strptime(filename, self.filename_fmt))
+            
+            except ValueError :
+                # ignore filenames that don't match the logfile format
+                continue
+            
     def _iter_date_reverse (self, dt=None) :
         """
             Yields an infinite series of naive date objects in our timezone, iterating backwards in time starting at the
@@ -566,3 +604,27 @@
                 # valid
                 yield dt.date()
 
+    def get_modified (self, dt=None) :
+        """
+            Returns the contents of all logfiles with mtimes past the given datetime
+        """
+        
+        # iterate through all available logfiles in date order, as datetimes
+        for log_date in self._iter_logfile_dates() :
+            # compare against dt?
+            if dt :
+                # stat
+                st = self._get_logfile_date(log_date, load=False, stat=True)
+
+                # not modified?
+                if utils.from_utc_timestamp(st.st_mtime) < dt :
+                    # skip
+                    continue
+                
+            # open
+            logfile = self._get_logfile_date(log_date, ignore_missing=False)
+
+            # yield all lines
+            for line in logfile.read_full() :
+                yield line
+
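 
The incremental loading in get_modified() above boils down to a listdir + stat scan: any logfile whose mtime is newer than the cutoff gets re-read in full. The same idea in isolation, as a standalone sketch (modified_logfiles, its parameters, and the naive-UTC comparison are illustrative stand-ins for the real attributes and utils helpers):

    import os, os.path, datetime

    def modified_logfiles (path, filename_fmt, since=None) :
        """
            Yield the paths of logfiles under path whose mtime is past the
            given naive UTC datetime, in filename (i.e. date) order.
        """

        for filename in sorted(os.listdir(path)) :
            try :
                # only consider filenames that parse as logfile dates
                datetime.datetime.strptime(filename, filename_fmt)

            except ValueError :
                continue

            filepath = os.path.join(path, filename)

            # skip logfiles whose mtime predates the cutoff
            if since and datetime.datetime.utcfromtimestamp(os.stat(filepath).st_mtime) < since :
                continue

            yield filepath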
--- a/scripts/search-index	Wed Feb 11 01:21:22 2009 +0200
+++ b/scripts/search-index	Wed Feb 11 02:07:07 2009 +0200
@@ -7,10 +7,11 @@
 # XXX: fix path
 import sys; sys.path.insert(0, '.'); sys.path.insert(0, '..')
 
+import os, os.path
 import datetime, pytz
 
 # configuration and the LogSearchIndex module
-import config, log_search, channels
+import config, utils, log_search, channels
 
 def _open_index (options, open_mode) :
     """
@@ -34,13 +35,53 @@
     # return
     return index, channel
 
+def _insert_lines (index, options, channel, lines) :
+    """
+        Insert the given lines into the index.
+
+        Assumes the lines are in time order, and prints status messages giving the date and line count as they are inserted
+    """
+
+    # last date
+    date = None
+
+    # count
+    count = 0
+
+    # iter lines
+    for line in lines :
+        # output new date header?
+        if not options.quiet and (not date or line.timestamp.date() != date) :
+            # previous date's line count?
+            if date :
+                print "OK: %d lines" % count
+            
+            # reset count
+            count = 0
+
+            # timestamp's date
+            date = line.timestamp.date()
+            
+            # status header
+            print "%s:" % (date.strftime('%Y-%m-%d'), ),
+
+        # insert
+        index.insert_line(channel, line)
+
+        # count
+        count += 1
+    
+    # final count line
+    if not options.quiet and date :
+        print "OK: %d lines" % count
+
 def _load_channel_date (index, options, channel, date) :
     """
         Loads the logs for the given date from the channel's LogSource into the given LogSearchIndex
     """
 
     if not options.quiet :
-        print "%s %s..." % (channel.id, date.strftime(channel.source.filename_fmt)),
+        print "Loading date for channel %s" % channel.id
         
     try :
         # load lines for date
@@ -51,15 +92,11 @@
             raise
             
         if not options.quiet :
-            print "Skipped: %s" % (e, )
+            print "\tSkipped: %s" % (e, )
     
     else :
-        # insert -> count
-        count = index.insert(channel, lines)
-
-        if not options.quiet :
-            print "OK: %d lines" % count
-
+        # insert
+        _insert_lines(index, options, channel, lines)
 
 def _parse_date (options, date_str, tz=None, fmt='%Y-%m-%d') :
     """
@@ -208,6 +245,65 @@
         # display
         _output_lines(options, lines)
 
+def cmd_autoload (options, *channel_names) :
+    """
+        Automatically loads all channel logs that have not been indexed yet (by logfile mtime)
+    """
+    
+    # open index
+    index = _open_index(options, 'c' if options.create else 'a')
+
+    # default to all channels
+    if not channel_names :
+        channels = config.LOG_CHANNELS
+    
+    else :
+        channels = [config.LOG_CHANNELS.lookup(channel_name) for channel_name in channel_names]
+
+    # iterate channels
+    for channel in channels :
+        if not options.quiet :
+            print "Channel %s:" % channel.id,
+
+        # path to our state file
+        statefile_path = os.path.join(options.autoload_state_path, 'chan-%s' % channel.id)
+        
+        # override?
+        if options.reload :
+            # load all
+            mtime = None
+
+            if not options.quiet :
+                print "reloading all:",
+
+        # stat for mtime
+        # XXX: replace with single utils.mtime()
+        elif os.path.exists(statefile_path) :
+            # get last update date for channel
+            mtime = utils.from_utc_timestamp(os.stat(statefile_path).st_mtime)
+            
+            if not options.quiet :
+                print "last load=%s:" % mtime,
+
+        else :
+            # unknown, load all
+            mtime = None
+            
+            if not options.quiet :
+                print "no previous load state:",
+        
+        # get lines
+        lines = channel.source.get_modified(mtime)
+        
+        # insert
+        if not options.quiet :
+            print "inserting..."
+        
+        _insert_lines(index, options, channel, lines)
+
+        # write autoload state
+        open(statefile_path, 'w').close()
+
 def cmd_help (options, *args) :
     """
         Help about commands
@@ -271,10 +367,12 @@
         choices=[fmt_name for fmt_name in config.LOG_FORMATTERS.iterkeys()])
 
     parser.add_option('-I', "--index",          dest="index_path",      help="Index database path",                 metavar="PATH", default="logs/index")
+    parser.add_option(      "--autoload-state", dest="autoload_state_path", help="Path to autoload state dir",      metavar="PATH", default="logs/autoload-state")
     parser.add_option('-Z', "--timezone",       dest="tz_name",         help="Timezone for output",                 metavar="TZ",   default="UTC")
     parser.add_option('-f', "--force",          dest="force",           help="Force dangerous operation",           action="store_true")
     parser.add_option(      "--create",         dest="create",          help="Create index database",               action="store_true")
     parser.add_option(      "--skip-missing",   dest="skip_missing",    help="Skip missing logfiles",               action="store_true")
+    parser.add_option(      "--reload",         dest="reload",          help="Force reload lines",                  action="store_true")
     parser.add_option(      "--quiet",          dest="quiet",           help="Supress status messages",             action="store_true")
 
     # parse
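 
The autoload state introduced by this changeset is deliberately minimal: one empty chan-<id> file per channel under logs/autoload-state/, whose mtime records the last successful load and which is re-touched after every run. The same idiom in isolation (last_load and mark_loaded are illustrative helpers, not part of the changeset, which inlines this logic in cmd_autoload):

    import os, os.path, datetime

    def last_load (state_dir, channel_id) :
        """ Return the last load time as a naive UTC datetime, or None if never loaded """

        path = os.path.join(state_dir, 'chan-%s' % channel_id)

        if not os.path.exists(path) :
            return None

        return datetime.datetime.utcfromtimestamp(os.stat(path).st_mtime)

    def mark_loaded (state_dir, channel_id) :
        """ Touch the state file to record a successful load """

        open(os.path.join(state_dir, 'chan-%s' % channel_id), 'w').close()

Because the state file is only touched after the insert loop completes, an interrupted run leaves the old mtime in place and the next autoload simply re-indexes the affected logfiles; since put_doc() identifies documents by their @uri attribute and overwrites them (with PDCLEAN cleaning up the overwritten regions), re-inserting the same lines should be harmless.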