log_search.py
changeset 64 cdb6403c2498
child 65 8b50694f841e
equal deleted inserted replaced
63:416560b82116 64:cdb6403c2498
       
     1 """
       
     2     Full-text searching of logs
       
     3 """
       
     4 
       
     5 import datetime, calendar, pytz
       
     6 
       
     7 import HyperEstraier as hype
       
     8 
       
     9 import log_line
       
    10 
       
    11 class LogSearchIndex (object) :
       
    12     """
       
    13         An index on the logs for a group of channels.
       
    14 
       
    15         This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).
       
    16 
       
    17         These log documents have the following attributes:
       
    18             @uri        - channel/date/line
       
    19             @channel    - channel id
       
    20             @type       - the LogType id
       
    21             @timestamp  - UTC timestamp
       
    22             @source     - nickname
       
    23 
       
    24         Each document then has a single line of data, which is the log message itself
       
    25     """
       
    26 
       
    27     def __init__ (self, path, mode='r') :
       
    28         """
       
    29             Open the database, with the given mode:
       
    30                 r       - read-only
       
    31                 w       - read-write, create if not exists
       
    32                 a       - read-write, do not create
       
    33                 *       - read-write, truncate and create new
       
    34         """
       
    35         
       
    36         # mapping of { mode -> flags }
       
    37         mode_to_flag = {
       
    38             'r':    hype.Database.DBREADER,
       
    39             'w':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT,
       
    40             'a':    hype.Database.DBREADER | hype.Database.DBWRITER,
       
    41             '*':    hype.Database.DBREADER | hype.Database.DBWRITER | hype.Database.DBCREAT | hype.Database.DBTRUNC,
       
    42         }
       
    43 
       
    44         # look up flags
       
    45         flags = mode_to_flag[mode]
       
    46         
       
    47         # make instance
       
    48         self.db = hype.Database()
       
    49         
       
    50         # open
       
    51         if not self.db.open(path, flags) :
       
    52             raise Exception("Index open failed: %s" % (path, ))
       
    53 
       
    54     def insert (self, channel, lines) :
       
    55         """
       
    56             Adds a sequence of LogLines from the given LogChannel to the index
       
    57         """
       
    58         
       
    59         # validate the LogChannel
       
    60         assert channel.name
       
    61         
       
    62         # iterate
       
    63         for line in lines :
       
    64             # validate the LogLine
       
    65             assert line.offset
       
    66             assert line.timestamp
       
    67 
       
    68             # create new document
       
    69             doc = hype.Document()
       
    70 
       
    71             # line date
       
    72             date = line.timestamp.date()
       
    73 
       
    74             # convert to UTC timestamp
       
    75             utc_timestamp = calendar.timegm(line.timestamp.utctimetuple())
       
    76 
       
    77             # ensure that it's not 1900
       
    78             assert date.year != 1900
       
    79 
       
    80             # add URI
       
    81             doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
       
    82 
       
    83             # add channel id
       
    84             doc.add_attr('@channel',    channel.id)
       
    85 
       
    86             # add type
       
    87             doc.add_attr('@type',       str(line.type))
       
    88 
       
    89             # add UTC timestamp
       
    90             doc.add_attr('@timestamp',  str(utc_timestamp))
       
    91 
       
    92             # add source attribute?
       
    93             if line.source :
       
    94                 doc.add_attr('@source', str(line.source))
       
    95             
       
    96             # add data text
       
    97             doc.add_text(line.data.encode('utf8'))
       
    98 
       
    99             # put
       
   100             # XXX: what does this flag mean?
       
   101             if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
       
   102                 raise Exeception("Index put_doc failed")
       
   103     
       
   104     def search_cond (self, cond) :
       
   105         """
       
   106             Search using a raw hype.Condition
       
   107         """
       
   108 
       
   109         # execute search, unused 'flags' arg stays zero
       
   110         results = self.db.search(cond, 0)
       
   111 
       
   112         # iterate over the document IDs
       
   113         for doc_id in results :
       
   114             # load document, this throws an exception...
       
   115             # option constants are hype.Database.GDNOATTR/GDNOTEXT
       
   116             doc = self.db.get_doc(doc_id, 0)
       
   117 
       
   118             # load the attributes/text
       
   119             channel_id  = doc.attr('@channel')
       
   120             type        = int(doc.attr('@type'))
       
   121             timestamp   = datetime.datetime.fromtimestamp(int(doc.attr('@timestamp')), pytz.utc)
       
   122             source      = doc.attr('@source')
       
   123             data        = doc.cat_texts()
       
   124 
       
   125             # build+yield to (channel_id, LogLine) tuple
       
   126             yield (channel_id, log_line.LogLine(None, type, timestamp, source, data))
       
   127 
       
   128     def search_simple (self, channel, query) :
       
   129         """
       
   130             Search for lines from the given channel for the given simple query
       
   131         """
       
   132 
       
   133         # build condition
       
   134         cond = hype.Condition()
       
   135 
       
   136         # simplified phrase
       
   137         cond.set_options(hype.Condition.SIMPLE)
       
   138 
       
   139         # add channel attribute
       
   140         cond.add_attr("@channel STREQ %s" % (channel.id, ))
       
   141 
       
   142         # add phrase
       
   143         cond.set_phrase(query)
       
   144 
       
   145         # set order
       
   146         cond.set_order("@timestamp NUMA")
       
   147 
       
   148         # search with cond
       
   149         for channel_id, line in self.search_cond(cond) :
       
   150             assert channel_id == channel.id
       
   151 
       
   152             yield line
       
   153 
       
   154 def cmd_load (options, channel_name, date) :
       
   155     """
       
   156         Loads the logs for a specific channel/date into the index
       
   157     """
       
   158 
       
   159     import channels
       
   160     
       
   161     # open the LogSearchIndex
       
   162     index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
       
   163 
       
   164     # open the channel
       
   165     channel = channels.channel_list.lookup(channel_name)
       
   166 
       
   167     # parse date
       
   168     date = datetime.datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=channel.source.tz)
       
   169 
       
   170     # load lines for date
       
   171     lines = channel.source.get_date(date)
       
   172 
       
   173     # insert
       
   174     index.insert(channel, lines)
       
   175 
       
   176 def cmd_search (options, channel_name, query) :
       
   177     """
       
   178         Search the index for events on a specific channel with the given query
       
   179     """
       
   180 
       
   181     import channels
       
   182     
       
   183     # open the LogSearchIndex
       
   184     index = LogSearchIndex(options.index_path, '*' if options.create_index else 'a')
       
   185 
       
   186     # open the channel
       
   187     channel = channels.channel_list.lookup(channel_name)
       
   188     
       
   189     # search
       
   190     lines = index.search_simple(channel, query)
       
   191     
       
   192     # display as plaintext
       
   193     for line in options.formatter.format_txt(lines) :
       
   194         print line
       
   195 
       
   196 if __name__ == '__main__' :
       
   197     from optparse import OptionParser
       
   198     import log_formatter
       
   199     
       
   200     # define parser
       
   201     parser = OptionParser(
       
   202         usage           = "%prog [options] <command> [ ... ]",
       
   203         add_help_option = True,
       
   204     )
       
   205 
       
   206     # define command-line arguments
       
   207     parser.add_option("-I", "--index", dest="index_path", help="Index database path", metavar="PATH", default="logs/index")
       
   208     parser.add_option("--create", dest="create_index", help="Create index database", default=False)
       
   209     parser.add_option("-f", "--formatter", dest="formatter_name", help="LogFormatter to use", default="irssi")
       
   210     parser.add_option("-z", "--timezone", dest="tz_name", help="Timezone for output", metavar="TZ", default="UTC")
       
   211 
       
   212     # parse
       
   213     options, args = parser.parse_args()
       
   214 
       
   215     # postprocess stuff
       
   216     options.tz = pytz.timezone(options.tz_name)
       
   217     options.formatter = log_formatter.by_name(options.formatter_name)(options.tz)
       
   218     
       
   219     # pop command
       
   220     command = args.pop(0)
       
   221 
       
   222     # inspect
       
   223     func = globals()['cmd_%s' % command]
       
   224     
       
   225     # call
       
   226     func(options, *args)