diff -r e163794ccf54 -r 154d2d8ae9c0 bin/qmsk-irclogs-search-index --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/qmsk-irclogs-search-index Sun Sep 13 20:08:16 2009 +0300 @@ -0,0 +1,640 @@ +#!/usr/bin/env python2.5 + +""" + Tool for accessing the search index +""" + +# XXX: fix path +import sys; sys.path.insert(0, '.'); sys.path.insert(0, '..') + +import os, os.path, fcntl +import datetime, pytz +import optparse + +# configuration and the LogSearchIndex module +from qmsk.irclogs import config, utils, log_search, channels + +def _open_index (options, open_mode) : + """ + Opens the LogSearchIndex + """ + + return log_search.LogSearchIndex(config.LOG_CHANNELS, options.index_path, open_mode) + + +def _open_index_and_channel (options, channel_name, open_mode) : + """ + Opens+returns a LogSearchIndex and a LogChannel + """ + + # open the LogSearchIndex + index = _open_index(options, open_mode) + + # open the channel + channel = config.LOG_CHANNELS.lookup(channel_name) + + # return + return index, channel + +def _iter_insert_stats (index, channel, lines) : + """ + Insert the given lines into the index. + + Assumes the lines will be in time-order, and yields a series of (date, count) tuples for every date that lines + are inserted for + """ + + # last date + date = None + + # count + count = 0 + + # iter lines + for line in lines : + # next day? + if not date or line.timestamp.date() != date : + if date : + # yield stats + yield date, count + + # reset count + count = 0 + + # timestamp's date + date = line.timestamp.date() + + # insert + index.insert_line(channel, line) + + # count + count += 1 + + # final count? + if date and count : + yield date, count + +def _insert_lines (index, options, channel, lines) : + """ + Insert the given lines into the index. + + Assumes the lines will be in time-order, and prints out as status messages the date and count for the inserted lines + """ + + # iterate insert stats + for date, count in _iter_insert_stats(index, channel, lines) : + # output date header? + if not options.quiet : + print "%s: %s" % (date.strftime('%Y-%m-%d'), count), + +def _load_channel_date (index, options, channel, date) : + """ + Loads the logs for the given date from the channel's LogSource into the given LogSearchIndex + """ + + if not options.quiet : + print "Loading date for channel %s" % channel.id + + try : + # load lines for date + lines = channel.source.get_date(date) + + except Exception, e : + if not options.skip_missing : + raise + + if not options.quiet : + print "\tSkipped: %s" % (e, ) + + else : + # insert + _insert_lines(index, options, channel, lines) + +def _parse_date (options, date_str, tz=None, fmt='%Y-%m-%d') : + """ + Parse the given datetime, using the given timezone(defaults to options.tz) and format + """ + + # default tz + if not tz : + tz = options.timezone + + try : + # parse + return datetime.datetime.strptime(date_str, fmt).replace(tzinfo=tz) + + except Exception, e : + raise CommandError("[ERROR] Invalid date: %s: %s" % (date_str, e)) + +def _output_lines (options, lines) : + """ + Display the formatted LogLines + """ + + # display as plaintext + for line, txt_data in options.formatter.format_txt(lines, full_timestamps=True) : + print txt_data + +class CommandError (Exception) : + """ + Error with command-line arguments + """ + + pass + +def cmd_create (options) : + """ + Creates a new index + """ + + # open index + index = _open_index(options, 'ctrunc' if options.force else 'c') + + # that's all + pass + +def cmd_load (options, channel_name, *dates) : + """ + Loads the logs for a specific channel for the given dates (in terms of the channe logs' timezone) into the index + """ + + # open index/channel + index, channel = _open_index_and_channel(options, channel_name, 'c' if options.create else 'a') + + # handle each date + for date_str in dates : + # prase date + try : + date = _parse_date(options, date_str, channel.source.tz) + + # handle errors + except CommandError, e : + if options.skip_missing : + print "[ERROR] %s" % (date_name, e) + + else : + raise + + # otherwise, load + else : + _load_channel_date(index, options, channel, date) + +def cmd_load_month (options, channel_name, *months) : + """ + Loads the logs for a specific channel for the given months (in terms of the channel's timezone) into the index + """ + + # open index/channel + index, channel = _open_index_and_channel(options, channel_name, 'c' if options.create else 'a') + + # handle each date + for month_str in months : + # prase date + try : + month = _parse_date(options, month_str, channel.source.tz, '%Y-%m') + + # handle errors + except CommandError, e : + # skip? + if options.skip_missing : + if not options.quiet : + print "[ERROR] %s" % (date_name, e) + continue + + else : + raise + + # get the set of days + days = list(channel.source.get_month_days(month)) + + if not options.quiet : + print "Loading %d days of logs:" % (len(days)) + + # load each day + for date in days : + # convert to datetime + dt = datetime.datetime.combine(date, datetime.time(0)).replace(tzinfo=channel.source.tz) + + # load + _load_channel_date(index, options, channel, dt) + +def cmd_search (options, channel_name, query) : + """ + Search the index for events on a specific channel with the given query + """ + + # sanity-check + if options.create : + raise Exception("--create doesn't make sense for 'search'") + + # open index/channel + index, channel = _open_index_and_channel(options, channel_name, 'r') + + # search + lines = index.search_simple(channel, query) + + # display + _output_lines(options, lines) + +def cmd_list (options, channel_name, *dates) : + """ + List the indexed events for a specific date + """ + + # sanity-check + if options.create : + raise Exception("--create doesn't make sense for 'search'") + + # open index/channel + index, channel = _open_index_and_channel(options, channel_name, 'r') + + # ...for each date + for date_str in dates : + # parse date + date = _parse_date(options, date_str) + + # list + lines = index.list(channel, date) + + # display + _output_lines(options, lines) + +def _autoload_reset (options, channels) : + """ + Reset old autoload state + """ + + # warn + if not options.quiet : + print "[WARN] Resetting autoload state for: %s" % ', '.join(channel.id for channel in channels) + + # iter + for channel in channels : + # statefile path + statefile_path = os.path.join(options.autoload_state_path, 'chan-%s' % channel.id) + + # is it present? + if not os.path.exists(statefile_path) : + if not options.quiet : + print "[WARN] No statefile found at %s" % statefile_path + + else : + if not options.quiet : + print "\t%s: " % channel.id, + + # remove the statefile + os.remove(statefile_path) + + if not options.quiet : + print "OK" + +def cmd_autoload (options, *channel_names) : + """ + Automatically loads all channel logs that have not been indexed yet (by logfile mtime) + """ + + # open index, nonblocking + index = _open_index(options, 'c?' if options.create else 'a?') + + # default to all channels + if not channel_names : + channels = config.LOG_CHANNELS + + else : + channels = [config.LOG_CHANNELS.lookup(channel_name) for channel_name in channel_names] + + # reset autoload state? + if options.reset : + _autoload_reset(options, channels) + if not options.quiet : + print + + # iterate channels + for channel in channels : + if not options.quiet : + print "Channel %s:" % channel.id + + # no 'from' by default + after = None + + # path to our state file + statefile_path = os.path.join(options.autoload_state_path, 'chan-%s' % channel.id) + statefile_tmppath = statefile_path + '.tmp' + + # does it exist? + have_tmpfile = os.path.exists(statefile_tmppath) + + # do we have a tempfile from a previous crash? + if have_tmpfile and not options.ignore_resume : + # first, open it... + statefile_tmp = open(statefile_tmppath, 'r+') + + # ... then lock it + fcntl.lockf(statefile_tmp, fcntl.LOCK_EX | fcntl.LOCK_NB) + + # read after timestamp + after_str = statefile_tmp.read().rstrip() + + if after_str : + # parse timestamp + after = utils.from_utc_timestamp(int(after_str)) + + if not options.quiet : + print "\tContinuing earlier progress from %s" % after + + else : + # ignore + if not options.quiet : + print "\t[WARN] Ignoring empty temporary statefile" + + else : + # warn about old tmpfile that was ignored + if have_tmpfile and not options.quiet : + print "\t[WARN] Ignoring old tmpfile state" + + # open new tempfile + statefile_tmp = open(statefile_tmppath, 'w') + + # lock + fcntl.lockf(statefile_tmp, fcntl.LOCK_EX | fcntl.LOCK_NB) + + # override? + if options.reload : + # load all + mtime = None + + if not options.quiet : + print "\tForcing reload!" + + # stat for mtime + else : + # stat for mtime, None if unknown + mtime = utils.mtime(statefile_path, ignore_missing=True) + + if mtime and not options.quiet : + print "\tLast load time was %s" % mtime + + elif not options.quiet : + print "\t[WARN] No previous load state! Loading full logs" + + # only after some specific date? + if options.after : + # use unless read from tempfile + if not after : + after = options.after + + if not options.quiet : + print "\tOnly including dates from %s onwards" % after + + else : + if not options.quiet : + print "\t[WARN] Ignoring --from because we found a tempfile" + + # only up to some specific date? + if options.until : + until = options.until + + if not options.quiet : + print "\tOnly including dates up to (and including) %s" % until + else : + # default to now + until = None + + # get lines + lines = channel.source.get_modified(mtime, after, until) + + # insert + if not options.quiet : + print "\tLoading and inserting..." + print + + # iterate insert() per day to display info and update progress + for date, count in _iter_insert_stats(index, channel, lines) : + # output date header? + if not options.quiet : + print "\t%10s: %d" % (date.strftime('%Y-%m-%d'), count) + + # write temp state + statefile_tmp.seek(0) + statefile_tmp.write(str(utils.to_utc_timestamp(datetime.datetime.combine(date, datetime.time(0))))) + statefile_tmp.flush() + + # write autoload state + open(statefile_path, 'w').close() + + # close+delete tempfile + statefile_tmp.close() + os.remove(statefile_tmppath) + + if not options.quiet : + print + + # done + return + +def cmd_help (options, *args) : + """ + Help about commands + """ + + import inspect + + # general help stuff + options._parser.print_help() + + # specific command? + if args : + # the command name + command, = args + + # XXX: display info about specific command + xxx + + # general + else : + print + print "Available commands:" + + # build list of all cmd_* objects + cmd_objects = [(name, obj) for name, obj in globals().iteritems() if name.startswith('cmd_') and inspect.isfunction(obj)] + + # sort alphabetically + cmd_objects.sort() + + # iterate through all cmd_* objects + for cmd_func_name, cmd_func in cmd_objects : + # remove cmd_ prefix + cmd_name = cmd_func_name[4:] + + # inspect + cmd_args, cmd_varargs, cmd_varkw, cmd_default = inspect.getargspec(cmd_func) + cmd_doc = inspect.getdoc(cmd_func) + + # remove the "options" arg + cmd_args = cmd_args[1:] + + # display + print "\t%10s %-30s : %s" % (cmd_name, inspect.formatargspec(cmd_args, cmd_varargs, None, cmd_default), cmd_doc) + +class MyOption (optparse.Option) : + """ + Our custom types for optparse + """ + + def check_date (option, opt, value) : + """ + Parse a date + """ + + try : + # parse + return datetime.datetime.strptime(value, '%Y-%m-%d') + + # trap -> OptionValueError + except Exception, e : + raise optparse.OptionValueError("option %s: invalid date value: %r" % (opt, value)) + + def check_timezone (option, opt, value) : + """ + Parse a timezone + """ + + try : + # parse + return pytz.timezone(value) + + # trap -> OptionValueError + except Exception, e : + raise optparse.OptionValueError("option %s: invalid timezone: %r" % (opt, value)) + + def take_action (self, action, dest, opt, value, values, parser) : + """ + Override take_action to handle date + """ + + if action == "parse_date" : + # get timezone + tz = values.timezone + + # set timezone + value = value.replace(tzinfo=tz) + + # store + return optparse.Option.take_action(self, 'store', dest, opt, value, values, parser) + + else : + # default + return optparse.Option.take_action(self, action, dest, opt, value, values, parser) + + TYPES = optparse.Option.TYPES + ('date', 'timezone') + TYPE_CHECKER = optparse.Option.TYPE_CHECKER.copy() + TYPE_CHECKER['date'] = check_date + TYPE_CHECKER['timezone'] = check_timezone + ACTIONS = optparse.Option.ACTIONS + ('parse_date', ) + STORE_ACTIONS = optparse.Option.STORE_ACTIONS + ('parse_date', ) + TYPED_ACTIONS = optparse.Option.TYPED_ACTIONS + ('parse_date', ) + ACTIONS = optparse.Option.ACTIONS + ('parse_date', ) + +def main (argv) : + """ + Command-line main, with given argv + """ + + # define parser + parser = optparse.OptionParser( + usage = "%prog [options] [ ... ]", + add_help_option = False, + option_class = MyOption, + ) + + # general options # # # # + general = optparse.OptionGroup(parser, "General Options") + general.add_option('-h', "--help", dest="help", help="Show this help message and exit", + action="store_true" ) + + general.add_option( "--formatter", dest="formatter_name", help="LogFormatter to use", + metavar="FMT", type="choice", default=config.PREF_FORMATTER_DEFAULT.name, + choices=[fmt_name for fmt_name in config.LOG_FORMATTERS.iterkeys()] ) + + general.add_option( "--index", dest="index_path", help="Index database path", + metavar="PATH", default=config.SEARCH_INDEX_PATH ) + + general.add_option( "--timezone", dest="timezone", help="Timezone for output", + metavar="TZ", type="timezone", default=pytz.utc ) + + general.add_option( "--force", dest="force", help="Force dangerous operation", + action="store_true" ) + + general.add_option( "--quiet", dest="quiet", help="Supress status messages", + action="store_true" ) + parser.add_option_group(general) + + + # cmd_load options # # # # + load = optparse.OptionGroup(parser, "Load Options") + load.add_option( "--skip-missing", dest="skip_missing", help="Skip missing logfiles", + action="store_true" ) + + load.add_option( "--create", dest="create", help="Create index database", + action="store_true" ) + parser.add_option_group(load) + + + # cmd_autoload options # # # # + autoload = optparse.OptionGroup(parser, "Autoload Options") + autoload.add_option( "--autoload-state", dest="autoload_state_path", help="Path to autoload state dir", + metavar="PATH", default=config.SEARCH_AUTOINDEX_PATH) + + autoload.add_option( "--from", dest="after", help="Only autoload logfiles from the given date on", + metavar="DATE", type="date", action="parse_date", default=None ) + + autoload.add_option( "--until", dest="until", help="Only autoload logfiles up to (and including) the given date", + metavar="DATE", type="date", action="parse_date", default=None ) + + autoload.add_option( "--reload", dest="reload", help="Force reload lines", + action="store_true" ) + + autoload.add_option( "--reset", dest="reset", help="Reset old autload state", + action="store_true" ) + + autoload.add_option( "--ignore-resume", dest="ignore_resume", help="Do not try and resume interrupted autoload", + action="store_true" ) + parser.add_option_group(autoload) + + # parse + options, args = parser.parse_args(argv[1:]) + + # postprocess stuff + options._parser = parser + options.formatter = config.LOG_FORMATTERS[options.formatter_name](options.timezone, "%H:%M:%S", None, None) + + # special-case --help + if options.help : + return cmd_help(options, *args) + + # must have at least the command argument + if not args : + raise CommandError("Missing command") + + # pop command + command = args.pop(0) + + # get func + func = globals().get('cmd_%s' % command) + + # unknown command? + if not func : + raise CommandError("Unknown command: %s" % command) + + # call + func(options, *args) + +if __name__ == '__main__' : + try : + main(sys.argv) + sys.exit(0) + + except CommandError, e : + print e + sys.exit(1) +