scripts/search-index
author Tero Marttila <terom@fixme.fi>
Fri, 13 Feb 2009 00:29:47 +0200
changeset 121 86aebc9cb60b
parent 106 0690d715385d
permissions -rwxr-xr-x
some quickfixes to fix deployment errors
#!/usr/bin/env python2.5

"""
    Tool for accessing the search index
"""

# XXX: fix path
import sys; sys.path.insert(0, '.'); sys.path.insert(0, '..')

import os, os.path, fcntl
import datetime, pytz
import optparse

# configuration and the LogSearchIndex module
import config, utils, log_search, channels

def _open_index (options, open_mode) :
    """
        Construct the LogSearchIndex over the configured channels.

        The database location comes from options.index_path, and open_mode is handed through to LogSearchIndex as-is
    """

    index_path = options.index_path
    index = log_search.LogSearchIndex(config.LOG_CHANNELS, index_path, open_mode)

    return index


def _open_index_and_channel (options, channel_name, open_mode) :
    """
        Open the LogSearchIndex and look up the named LogChannel, returning them as an (index, channel) tuple
    """

    # tuple items are evaluated left-to-right, so the index is opened before the channel is looked up
    return (
        _open_index(options, open_mode),
        config.LOG_CHANNELS.lookup(channel_name),
    )

def _iter_insert_stats (index, channel, lines) :
    """
        Insert the given lines into the index.

        Assumes the lines will be in time-order, and yields a series of (date, count) tuples for every date that lines
        are inserted for
    """

    # last date
    date = None

    # count
    count = 0

    # iter lines
    for line in lines :
        # next day?
        if not date or line.timestamp.date() != date :
            if date :
                # yield stats
                yield date, count

            # reset count
            count = 0

            # timestamp's date
            date = line.timestamp.date()

        # insert
        index.insert_line(channel, line)

        # count
        count += 1
    
    # final count?
    if date and count :
        yield date, count

def _insert_lines (index, options, channel, lines) :
    """
        Insert the given lines into the index.

        Assumes the lines will be in time-order, and prints out as status messages the date and count for the inserted lines
    """
    
    # iterate insert stats
    for date, count in _iter_insert_stats(index, channel, lines) :
        # output date header?
        if not options.quiet :
            print "%s: %s" % (date.strftime('%Y-%m-%d'), count),

def _load_channel_date (index, options, channel, date) :
    """
        Loads the logs for the given date from the channel's LogSource into the given LogSearchIndex
    """

    if not options.quiet :
        print "Loading date for channel %s" % channel.id
        
    try :
        # load lines for date
        lines = channel.source.get_date(date)
    
    except Exception, e :
        if not options.skip_missing :
            raise
            
        if not options.quiet :
            print "\tSkipped: %s" % (e, )
    
    else :
        # insert
        _insert_lines(index, options, channel, lines)

def _parse_date (options, date_str, tz=None, fmt='%Y-%m-%d') :
    """
        Parse the given datetime, using the given timezone(defaults to options.tz) and format
    """

    # default tz
    if not tz :
        tz = options.timezone

    try :
        # parse
        return datetime.datetime.strptime(date_str, fmt).replace(tzinfo=tz)

    except Exception, e :
        raise CommandError("[ERROR] Invalid date: %s: %s" % (date_str, e))

def _output_lines (options, lines) :
    """
        Display the formatted LogLines
    """

    # display as plaintext
    for line, txt_data in options.formatter.format_txt(lines, full_timestamps=True) :
        print txt_data

class CommandError (Exception) :
    """
        Raised for problems with the command-line arguments (missing/unknown command, invalid date).

        The script entry point catches this, prints the message and exits with status 1.
    """

def cmd_create (options) :
    """
        Creates a new index
    """

    # --force selects the 'ctrunc' mode (presumably create+truncate - confirm against LogSearchIndex)
    if options.force :
        open_mode = 'ctrunc'

    else :
        open_mode = 'c'

    # opening the index in a create mode is all there is to do
    _open_index(options, open_mode)

def cmd_load (options, channel_name, *dates) :
    """
        Loads the logs for a specific channel for the given dates (in terms of the channe logs' timezone) into the index
    """

    # open index/channel
    index, channel = _open_index_and_channel(options, channel_name, 'c' if options.create else 'a')
    
    # handle each date
    for date_str in dates :
        # prase date
        try :
            date = _parse_date(options, date_str, channel.source.tz)
        
        # handle errors
        except CommandError, e :
            if options.skip_missing :
                print "[ERROR] %s" % (date_name, e)

            else :
                raise
        
        # otherwise, load
        else :        
            _load_channel_date(index, options, channel, date)

def cmd_load_month (options, channel_name, *months) :
    """
        Loads the logs for a specific channel for the given months (in terms of the channel's timezone) into the index
    """

    # open index/channel
    index, channel = _open_index_and_channel(options, channel_name, 'c' if options.create else 'a')
    
    # handle each date
    for month_str in months :
        # prase date
        try :
            month = _parse_date(options, month_str, channel.source.tz, '%Y-%m')
        
        # handle errors
        except CommandError, e :
            # skip?
            if options.skip_missing :
                if not options.quiet :
                    print "[ERROR] %s" % (date_name, e)
                continue

            else :
                raise
        
        # get the set of days
        days = list(channel.source.get_month_days(month))
        
        if not options.quiet :
            print "Loading %d days of logs:" % (len(days))

        # load each day
        for date in days :
            # convert to datetime
            dt = datetime.datetime.combine(date, datetime.time(0)).replace(tzinfo=channel.source.tz)
            
            # load
            _load_channel_date(index, options, channel, dt)

def cmd_search (options, channel_name, query) :
    """
        Search the index for events on a specific channel with the given query
    """

    # --create only applies to the load commands, so reject it up front
    if options.create :
        raise Exception("--create doesn't make sense for 'search'")

    # the index is opened in 'r' mode
    index, channel = _open_index_and_channel(options, channel_name, 'r')

    # run the query and print whatever matched
    matches = index.search_simple(channel, query)
    _output_lines(options, matches)

def cmd_list (options, channel_name, *dates) :
    """
        List the indexed events for a specific date
    """

    # sanity-check: --create only applies to the load commands
    if options.create :
        raise Exception("--create doesn't make sense for 'list'")
    
    # open index/channel, in 'r' mode
    index, channel = _open_index_and_channel(options, channel_name, 'r')

    # ...for each date
    for date_str in dates :
        # parse date, in terms of options.timezone (no explicit tz is passed)
        date = _parse_date(options, date_str)

        # list
        lines = index.list(channel, date)
        
        # display
        _output_lines(options, lines)

def _autoload_reset (options, channels) :
    """
        Reset old autoload state
    """
    
    # warn
    if not options.quiet :
        print "[WARN] Resetting autoload state for: %s" % ', '.join(channel.id for channel in channels)
    
    # iter
    for channel in channels :
        # statefile path
        statefile_path = os.path.join(options.autoload_state_path, 'chan-%s' % channel.id)

        # is it present?
        if not os.path.exists(statefile_path) :
            if not options.quiet :
                print "[WARN] No statefile found at %s" % statefile_path
        
        else :
            if not options.quiet :
                print "\t%s: " % channel.id,

            # remove the statefile
            os.remove(statefile_path)
            
            if not options.quiet :
                print "OK"

def cmd_autoload (options, *channel_names) :
    """
        Automatically loads all channel logs that have not been indexed yet (by logfile mtime)
    """

    # Overview of the per-channel state kept under options.autoload_state_path:
    #   chan-<id>       - empty file that is re-created after a completed load; its mtime is passed to
    #                     channel.source.get_modified() as the "last load time"
    #   chan-<id>.tmp   - progress file, holding the UTC timestamp of the last date reported as inserted. It is only
    #                     removed after a completed load, so one left over from an interrupted run is picked up for
    #                     resuming (unless --ignore-resume)
    #
    # NOTE: this docstring is printed by cmd_help, which is why the details live in comments instead.
    
    # open index, nonblocking
    index = _open_index(options, 'c?' if options.create else 'a?')

    # default to all channels
    # NOTE(review): the local name `channels` shadows the `channels` module imported at the top of the file
    if not channel_names :
        channels = config.LOG_CHANNELS
    
    else :
        channels = [config.LOG_CHANNELS.lookup(channel_name) for channel_name in channel_names]
    
    # reset autoload state?
    if options.reset :
        _autoload_reset(options, channels)
        if not options.quiet :
            print

    # iterate channels
    for channel in channels :
        if not options.quiet :
            print "Channel %s:" % channel.id

        # no 'from' by default
        after = None

        # path to our state file
        statefile_path = os.path.join(options.autoload_state_path, 'chan-%s' % channel.id)
        statefile_tmppath = statefile_path + '.tmp'

        # does it exist?
        have_tmpfile = os.path.exists(statefile_tmppath)
        
        # do we have a tempfile from a previous crash?
        if have_tmpfile and not options.ignore_resume :
            # first, open it...
            # ('r+' keeps the existing contents, so that the timestamp can be read back below)
            statefile_tmp = open(statefile_tmppath, 'r+')

            # ... then lock it
            # (LOCK_NB makes this fail immediately rather than wait if the lock is already held)
            fcntl.lockf(statefile_tmp, fcntl.LOCK_EX | fcntl.LOCK_NB)
            
            # read after timestamp
            after_str = statefile_tmp.read().rstrip()

            if after_str :
                # parse timestamp
                after = utils.from_utc_timestamp(int(after_str))

                if not options.quiet :
                    print "\tContinuing earlier progress from %s" % after

            else :
                # ignore
                if not options.quiet :
                    print "\t[WARN] Ignoring empty temporary statefile"

        else :
            # warn about old tmpfile that was ignored
            if have_tmpfile and not options.quiet :
                print "\t[WARN] Ignoring old tmpfile state"

            # open new tempfile
            # ('w' truncates, so any old progress is discarded here)
            statefile_tmp = open(statefile_tmppath, 'w')
            
            # lock
            fcntl.lockf(statefile_tmp, fcntl.LOCK_EX | fcntl.LOCK_NB)

        # override?
        if options.reload :
            # load all
            mtime = None

            if not options.quiet :
                print "\tForcing reload!"

        # stat for mtime
        else :
            # stat for mtime, None if unknown
            mtime = utils.mtime(statefile_path, ignore_missing=True)

            if mtime and not options.quiet :
                print "\tLast load time was %s" % mtime

            elif not options.quiet :
                print "\t[WARN] No previous load state! Loading full logs"
 
        # only after some specific date?
        if options.after :
            # use unless read from tempfile
            # (a timestamp resumed from the tempfile takes precedence over --from)
            if not after :
                after = options.after
               
                if not options.quiet :
                    print "\tOnly including dates from %s onwards" % after
            
            else :
                if not options.quiet :
                    print "\t[WARN] Ignoring --from because we found a tempfile"
            
        # only up to some specific date?
        if options.until :
            until = options.until

            if not options.quiet :
                print "\tOnly including dates up to (and including) %s" % until
        else :
            # default to now
            until = None

        # get lines
        lines = channel.source.get_modified(mtime, after, until)
        
        # insert
        if not options.quiet :
            print "\tLoading and inserting..."
            print
     
        # iterate insert() per day to display info and update progress
        for date, count in _iter_insert_stats(index, channel, lines) :
            # output date header?
            if not options.quiet :
                print "\t%10s: %d" % (date.strftime('%Y-%m-%d'), count)
            
            # write temp state
            # (midnight of the date just reported, overwriting the previous value from the start of the file)
            # NOTE(review): the file is not truncated after the write, so a shorter value would leave stale trailing
            # characters behind - presumably harmless as long as the timestamps keep the same number of digits
            statefile_tmp.seek(0)
            statefile_tmp.write(str(utils.to_utc_timestamp(datetime.datetime.combine(date, datetime.time(0)))))
            statefile_tmp.flush()

        # write autoload state
        # (re-creating the empty statefile is what updates the mtime read as the "last load time" above)
        open(statefile_path, 'w').close()

        # close+delete tempfile
        # (only reached when the load completed; an exception above leaves the tempfile behind for resuming)
        statefile_tmp.close()
        os.remove(statefile_tmppath)
        
        if not options.quiet :
            print
    
    # done
    return

def cmd_help (options, *args) :
    """
        Help about commands
    """

    import inspect
    
    # general help stuff
    options._parser.print_help()

    # specific command?
    if args :
        # the command name
        command, = args
        
        # XXX: display info about specific command
        xxx
    
    # general
    else :
        print
        print "Available commands:"

        # build list of all cmd_* objects
        cmd_objects = [(name, obj) for name, obj in globals().iteritems() if name.startswith('cmd_') and inspect.isfunction(obj)]

        # sort alphabetically
        cmd_objects.sort()
        
        # iterate through all cmd_* objects
        for cmd_func_name, cmd_func in cmd_objects :
            # remove cmd_ prefix
            cmd_name = cmd_func_name[4:]

            # inspect
            cmd_args, cmd_varargs, cmd_varkw, cmd_default = inspect.getargspec(cmd_func)
            cmd_doc = inspect.getdoc(cmd_func)

            # remove the "options" arg
            cmd_args = cmd_args[1:]

            # display
            print "\t%10s %-30s : %s" % (cmd_name, inspect.formatargspec(cmd_args, cmd_varargs, None, cmd_default), cmd_doc)

class MyOption (optparse.Option) :
    """
        Our custom types for optparse

        Adds two option types:
            date        - a '%Y-%m-%d' string, converted to a (naive) datetime
            timezone    - a timezone name, converted using pytz.timezone()

        and one action:
            parse_date  - like 'store', but attaches the timezone currently held in values.timezone to the date
    """

    def check_date (option, opt, value) :
        """
            Parse a date

            Returns a naive datetime, or raises OptionValueError if the value is not a valid '%Y-%m-%d' date
        """

        try :
            # parse
            return datetime.datetime.strptime(value, '%Y-%m-%d')
        
        # trap -> OptionValueError
        except ValueError :
            raise optparse.OptionValueError("option %s: invalid date value: %r" % (opt, value))
    
    def check_timezone (option, opt, value) :
        """
            Parse a timezone

            Returns the pytz timezone of the given name, or raises OptionValueError if pytz rejects it
        """

        try :
            # parse
            return pytz.timezone(value)
        
        # trap -> OptionValueError (kept broad, whatever pytz raises for a bad name counts as an invalid value)
        except Exception :
            raise optparse.OptionValueError("option %s: invalid timezone: %r" % (opt, value))

    def take_action (self, action, dest, opt, value, values, parser) :
        """
            Override take_action to handle date

            For the 'parse_date' action, the already type-checked datetime value is made timezone-aware using
            values.timezone and then stored like a plain 'store'. All other actions are passed through unchanged.
        """

        if action == "parse_date" :
            # get timezone
            # NOTE(review): this is whatever values.timezone holds when this option is processed, so presumably a
            # --timezone given later on the command line does not affect it - confirm
            tz = values.timezone

            # set timezone; pytz timezones must be attached with localize(), as replace(tzinfo=...) would use the
            # zone's LMT offset instead of the offset in effect on that date
            if hasattr(tz, 'localize') :
                value = tz.localize(value)

            else :
                value = value.replace(tzinfo=tz)

            # store
            action = 'store'

        return optparse.Option.take_action(self, action, dest, opt, value, values, parser)

    # register the custom types and their checkers
    TYPES = optparse.Option.TYPES + ('date', 'timezone')
    TYPE_CHECKER = optparse.Option.TYPE_CHECKER.copy()
    TYPE_CHECKER['date'] = check_date
    TYPE_CHECKER['timezone'] = check_timezone

    # register the custom action: it exists, it stores into a dest, and it takes a typed argument
    ACTIONS = optparse.Option.ACTIONS + ('parse_date', )
    STORE_ACTIONS = optparse.Option.STORE_ACTIONS + ('parse_date', )
    TYPED_ACTIONS = optparse.Option.TYPED_ACTIONS + ('parse_date', )

def main (argv) :
    """
        Command-line entry point: parse the options in argv (argv[0] is skipped) and dispatch to the cmd_* function
        named by the first positional argument, passing it the options and the remaining arguments.

        Raises CommandError if the command is missing or unknown.
    """

    parser = optparse.OptionParser(
        usage           = "%prog [options] <command> [ ... ]",
        add_help_option = False,
        option_class    = MyOption,
    )

    # options shared by all commands
    general_group = optparse.OptionGroup(parser, "General Options")
    general_group.add_option(
        '-h', "--help",
        dest    = "help",
        help    = "Show this help message and exit",
        action  = "store_true",
    )
    general_group.add_option(
        "--formatter",
        dest    = "formatter_name",
        help    = "LogFormatter to use",
        metavar = "FMT",
        type    = "choice",
        default = config.PREF_FORMATTER_DEFAULT.name,
        choices = list(config.LOG_FORMATTERS.iterkeys()),
    )
    general_group.add_option(
        "--index",
        dest    = "index_path",
        help    = "Index database path",
        metavar = "PATH",
        default = config.SEARCH_INDEX_PATH,
    )
    general_group.add_option(
        "--timezone",
        dest    = "timezone",
        help    = "Timezone for output",
        metavar = "TZ",
        type    = "timezone",
        default = pytz.utc,
    )
    general_group.add_option(
        "--force",
        dest    = "force",
        help    = "Force dangerous operation",
        action  = "store_true",
    )
    general_group.add_option(
        "--quiet",
        dest    = "quiet",
        help    = "Supress status messages",
        action  = "store_true",
    )
    parser.add_option_group(general_group)

    # options for the load commands
    load_group = optparse.OptionGroup(parser, "Load Options")
    load_group.add_option(
        "--skip-missing",
        dest    = "skip_missing",
        help    = "Skip missing logfiles",
        action  = "store_true",
    )
    load_group.add_option(
        "--create",
        dest    = "create",
        help    = "Create index database",
        action  = "store_true",
    )
    parser.add_option_group(load_group)

    # options for the autoload command
    autoload_group = optparse.OptionGroup(parser, "Autoload Options")
    autoload_group.add_option(
        "--autoload-state",
        dest    = "autoload_state_path",
        help    = "Path to autoload state dir",
        metavar = "PATH",
        default = config.SEARCH_AUTOINDEX_PATH,
    )
    autoload_group.add_option(
        "--from",
        dest    = "after",
        help    = "Only autoload logfiles from the given date on",
        metavar = "DATE",
        type    = "date",
        action  = "parse_date",
        default = None,
    )
    autoload_group.add_option(
        "--until",
        dest    = "until",
        help    = "Only autoload logfiles up to (and including) the given date",
        metavar = "DATE",
        type    = "date",
        action  = "parse_date",
        default = None,
    )
    autoload_group.add_option(
        "--reload",
        dest    = "reload",
        help    = "Force reload lines",
        action  = "store_true",
    )
    autoload_group.add_option(
        "--reset",
        dest    = "reset",
        help    = "Reset old autload state",
        action  = "store_true",
    )
    autoload_group.add_option(
        "--ignore-resume",
        dest    = "ignore_resume",
        help    = "Do not try and resume interrupted autoload",
        action  = "store_true",
    )
    parser.add_option_group(autoload_group)

    # parse everything after the program name
    options, args = parser.parse_args(argv[1:])

    # stash the parser (cmd_help prints its help) and build the formatter used for output
    options._parser = parser
    formatter_cls = config.LOG_FORMATTERS[options.formatter_name]
    options.formatter = formatter_cls(options.timezone, "%H:%M:%S", None, None)

    # --help wins over everything else, and may be followed by a command name
    if options.help :
        return cmd_help(options, *args)

    # the first positional argument is the command, the rest are its arguments
    if not args :
        raise CommandError("Missing command")

    command, cmd_args = args[0], args[1:]

    # commands are the module-level cmd_* functions
    handler = globals().get('cmd_%s' % command)

    if not handler :
        raise CommandError("Unknown command: %s" % command)

    handler(options, *cmd_args)

if __name__ == '__main__' :
    try :
        main(sys.argv)
        sys.exit(0)

    except CommandError, e :
        print e
        sys.exit(1)