log_search.py
changeset 140 6db2527b67cf
parent 139 9c7769850195
child 141 65c98c9e1716
equal deleted inserted replaced
139:9c7769850195 140:6db2527b67cf
     1 """
       
     2     Full-text searching of logs
       
     3 """
       
     4 
       
     5 import datetime, calendar, pytz
       
     6 import os.path
       
     7 
       
     8 import HyperEstraier as hype
       
     9 
       
    10 import log_line, utils, config
       
    11 
       
    12 class LogSearchError (Exception) :
       
    13     """
       
    14         General search error
       
    15     """
       
    16 
       
    17     pass
       
    18 
       
    19 class SearchIndexError (LogSearchError) :
       
    20     """
       
    21         Error manipulating the index
       
    22     """
       
    23 
       
    24     def __init__ (self, msg, db) :
       
    25         """
       
    26             Build the error from the given message + HyperEstraier.Database
       
    27         """
       
    28 
       
    29         super(SearchIndexError, self).__init__("%s: %s" % (msg, db.err_msg(db.error())))
       
    30 
       
    31 class NoResultsFound (LogSearchError) :
       
    32     """
       
    33         No results found
       
    34     """
       
    35 
       
    36     pass
       
    37 
       
    38 class LogSearchIndex (object) :
       
    39     """
       
    40         An index on the logs for a group of channels.
       
    41 
       
    42         This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).
       
    43 
       
    44         These log documents have the following attributes:
       
    45             @uri                - channel/date/line
       
    46             channel             - channel code
       
    47             type                - the LogType id
       
    48             timestamp           - UTC timestamp
       
    49             source_nickname     - source nickname
       
    50             source_username     - source username
       
    51             source_hostname     - source hostname
       
    52             source_chanflags    - source channel flags
       
    53             target_nickname     - target nickname
       
    54 
       
    55         Each document then has a single line of data, which is the log data message
       
    56     """
       
    57 
       
    58     def __init__ (self, channels, path, mode='r') :
       
    59         """
       
    60             Open the database at the given path, with the given mode:
       
    61                 First char:
       
    62                     r       - read, error if not exists
       
    63                     w       - write, create if not exists
       
    64                     a       - write, error if not exists
       
    65                     c       - create, error if exists
       
    66                 
       
    67                 Additional chars:
       
    68                     trunc   - truncate if exists
       
    69                     +       - read as well as write
       
    70                     ?       - non-blocking lock open, i.e. it fails if already open
       
    71             
       
    72             Channels is the ChannelList.
       
    73         """
       
    74 
       
    75         # store
       
    76         self.channels = channels
       
    77         self.path = path
       
    78         self.mode = mode
       
    79 
       
    80         # check it does not already exist?
       
    81         if mode in 'c' and os.path.exists(path) :
       
    82             raise LogSearchError("Index already exists: %s" % (path, ))
       
    83         
       
    84         # mapping of { mode -> flags }
       
    85         mode_to_flag = {
       
    86             'r':    hype.Database.DBREADER,
       
    87             'w':    hype.Database.DBWRITER | hype.Database.DBCREAT,
       
    88             'a':    hype.Database.DBWRITER,
       
    89             'c':    hype.Database.DBWRITER | hype.Database.DBCREAT,
       
    90         }
       
    91 
       
    92         # flags to use, standard modes
       
    93         flags = mode_to_flag[mode[0]]
       
    94  
       
    95         # mode-flags
       
    96         if '?' in mode :
       
    97             # non-blocking locking
       
    98             flags |= hype.Database.DBLCKNB
       
    99         
       
   100         elif '+' in mode :
       
   101             # read
       
   102             flags |= hype.Database.DBREADER
       
   103 
       
   104         elif 'trunc' in mode :
       
   105             # truncate. Dangerous!
       
   106             flags |= hype.Database.DBTRUNC
       
   107        
       
   108         # make instance
       
   109         self.db = hype.Database()
       
   110         
       
   111         # open
       
   112         if not self.db.open(path, flags) :
       
   113             raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)
       
   114     
       
   115     def close (self) :
       
   116         """
       
   117             Explicitly close the index, this is done automatically on del
       
   118         """
       
   119 
       
   120         if not self.db.close() :
       
   121             raise SearchIndexError("Index close failed", self.db)
       
   122 
       
   123     def insert (self, channel, lines) :
       
   124         """
       
   125             Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
       
   126         """
       
   127         
       
   128         # count from zero
       
   129         count = 0
       
   130         
       
   131         # iterate
       
   132         for line in lines :
       
   133             # insert
       
   134             self.insert_line(channel, line)
       
   135 
       
   136             # count
       
   137             count += 1
       
   138         
       
   139         # return
       
   140         return count
       
   141 
       
   142     def insert_line (self, channel, line) :
       
   143         """
       
   144             Adds a single LogLine for the given LogChannel to the index
       
   145         """
       
   146 
       
   147         # validate the LogChannel
       
   148         assert channel.id
       
   149 
       
   150         # validate the LogLine
       
   151         assert line.offset
       
   152         assert line.timestamp
       
   153 
       
   154         # create new document
       
   155         doc = hype.Document()
       
   156 
       
   157         # line date
       
   158         date = line.timestamp.date()
       
   159 
       
   160         # ensure that it's not 1900
       
   161         assert date.year != 1900
       
   162 
       
   163         # add URI
       
   164         doc.add_attr('@uri',        "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))
       
   165 
       
   166         # add channel id
       
   167         doc.add_attr('channel',     channel.id)
       
   168 
       
   169         # add type
       
   170         doc.add_attr('type',        str(line.type))
       
   171 
       
   172         # add UTC timestamp
       
   173         doc.add_attr('timestamp',   str(utils.to_utc_timestamp(line.timestamp)))
       
   174 
       
   175         # add source attribute?
       
   176         if line.source :
       
   177             source_nickname, source_username, source_hostname, source_chanflags = line.source
       
   178 
       
   179             if source_nickname :
       
   180                 doc.add_attr('source_nickname', source_nickname.encode('utf8'))
       
   181             
       
   182             if source_username :
       
   183                 doc.add_attr('source_username', source_username.encode('utf8'))
       
   184 
       
   185             if source_hostname :
       
   186                 doc.add_attr('source_hostname', source_hostname.encode('utf8'))
       
   187 
       
   188             if source_chanflags :
       
   189                 doc.add_attr('source_chanflags', source_chanflags.encode('utf8'))
       
   190         
       
   191         # add target attributes?
       
   192         if line.target :
       
   193             target_nickname = line.target
       
   194 
       
   195             if target_nickname :
       
   196                 doc.add_attr('target_nickname', target_nickname.encode('utf8'))
       
   197 
       
   198         # add data
       
   199         if line.data :
       
   200             doc.add_text(line.data.encode('utf8'))
       
   201 
       
   202         # put, "clean up dispensable regions of the overwritten document"
       
   203         if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
       
   204             raise SearchIndexError("put_doc", self.db)
       
   205             
       
   206     def search_cond (self, cond) :
       
   207         """
       
   208             Search using a raw hype.Condition. Raises NoResultsFound if there aren't any results
       
   209         """
       
   210 
       
   211         # execute search, unused 'flags' arg stays zero
       
   212         results = self.db.search(cond, 0)
       
   213 
       
   214         # no results?
       
   215         if not results :
       
   216             raise NoResultsFound()
       
   217 
       
   218         # iterate over the document IDs
       
   219         for doc_id in results :
       
   220             # load document, this throws an exception...
       
   221             # option constants are hype.Database.GDNOATTR/GDNOTEXT
       
   222             doc = self.db.get_doc(doc_id, 0)
       
   223 
       
   224             # load the attributes/text
       
   225             channel         = self.channels.lookup(doc.attr('channel'))
       
   226             type            = int(doc.attr('type'))
       
   227             timestamp       = utils.from_utc_timestamp(int(doc.attr('timestamp')))
       
   228 
       
   229             # source
       
   230             source = (doc.attr('source_nickname'), doc.attr('source_username'), doc.attr('source_hostname'), doc.attr('source_chanflags'))
       
   231 
       
   232             # target
       
   233             target = doc.attr('target_nickname')
       
   234             
       
   235             # message text
       
   236             message         = doc.cat_texts().decode('utf8')
       
   237 
       
   238             # build+yield to as LogLine
       
   239             yield log_line.LogLine(channel, None, type, timestamp, source, target, message)
       
   240     
       
   241     def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) :
       
   242         """
       
   243             Search with flexible parameters
       
   244 
       
   245                 options     - bitmask of hype.Condition.*
       
   246                 channel     - LogChannel object
       
   247                 attrs       - raw attribute expressions
       
   248                 phrase      - the search query phrase
       
   249                 order       - order attribute expression
       
   250                 max         - number of results to return
       
   251                 skip        - number of results to skip
       
   252         """
       
   253 
       
   254         # build condition
       
   255         cond = hype.Condition()
       
   256         
       
   257         if options :
       
   258             # set options
       
   259             cond.set_options(options)
       
   260         
       
   261         if channel :
       
   262             # add channel attribute
       
   263             cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))
       
   264         
       
   265         if attrs :
       
   266             # add attributes
       
   267             for attr in attrs :
       
   268                 cond.add_attr(attr.encode('utf8'))
       
   269 
       
   270         if phrase :
       
   271             # add phrase
       
   272             cond.set_phrase(phrase.encode('utf8'))
       
   273         
       
   274         if order :
       
   275             # set order
       
   276             cond.set_order(order)
       
   277         
       
   278         if max :
       
   279             # set max
       
   280             cond.set_max(max)
       
   281 
       
   282         if skip :
       
   283             # set skip
       
   284             cond.set_skip(skip)
       
   285 
       
   286         # execute
       
   287         return self.search_cond(cond)
       
   288 
       
   289     def search_simple (self, channel, query, count=None, offset=None, search_msg=True, search_nick=False) :
       
   290         """
       
   291             Search for lines from the given channel for the given simple query.
       
   292 
       
   293             The search_* params define which attributes to search for (using fulltext search for the message, STROR for
       
   294             attributes).
       
   295         """
       
   296         
       
   297         # search attributes
       
   298         attrs = []
       
   299 
       
   300         # nickname target query
       
   301         if search_nick :
       
   302             attrs.append("source_nickname STRINC %s" % query)
       
   303 #            attrs.append("target_nickname STRINC %s" % query)
       
   304         
       
   305         # use search(), backwards
       
   306         results = list(self.search(
       
   307             # simplified phrase
       
   308             options     = hype.Condition.SIMPLE,
       
   309 
       
   310             # specific channel
       
   311             channel     = channel,
       
   312 
       
   313             # given phrase
       
   314             phrase      = query if search_msg else None,
       
   315 
       
   316             # attributes defined above
       
   317             attrs       = attrs,
       
   318 
       
   319             # order by timestamp, descending (backwards)
       
   320             order       = "timestamp NUMD",
       
   321 
       
   322             # count/offset
       
   323             max         = count,
       
   324             skip        = offset,
       
   325         ))
       
   326         
       
   327         # reverse
       
   328         return reversed(results)
       
   329 
       
   330     def list (self, channel, date, count=None, skip=None) :
       
   331         """
       
   332             List all indexed log items for the given UTC date
       
   333         """
       
   334 
       
   335         # start/end dates
       
   336         dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
       
   337         dt_end   = datetime.datetime(date.year, date.month, date.day, 23, 23, 59, 999999)
       
   338         
       
   339         # search
       
   340         return self.search(
       
   341             # specific channel
       
   342             channel     = channel,
       
   343 
       
   344             # specific date range
       
   345             attrs       = [
       
   346                 "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
       
   347             ],
       
   348 
       
   349             # order correctly
       
   350             order       = "timestamp NUMA",
       
   351 
       
   352             # max count/offset
       
   353             max         = count,
       
   354             skip        = skip
       
   355         )
       
   356 
       
   357 def get_index () :
       
   358     """
       
   359         Returns the default read-only index, suitable for searching
       
   360     """
       
   361     
       
   362     # XXX: no caching, just open it every time
       
   363     _index = LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')
       
   364 
       
   365     # return
       
   366     return _index
       
   367