|
1 """ |
|
2 Full-text searching of logs |
|
3 """ |
|
4 |
|
5 import datetime, calendar, pytz |
|
6 import os.path |
|
7 |
|
8 import HyperEstraier as hype |
|
9 |
|
10 import log_line, utils, config |
|
11 |
|
class LogSearchError (Exception) :
    """
        Base class for all errors raised by the log search code in this module.
    """
|
18 |
|
class SearchIndexError (LogSearchError) :
    """
        Error manipulating the index.

        The resulting message is "<msg>: <database error text>".
    """

    def __init__ (self, msg, db) :
        """
            Build the error from the given message + HyperEstraier.Database.

            The database's last error code (db.error()) is turned into text with db.err_msg() and appended to msg.
        """

        error_text = db.err_msg(db.error())

        super(SearchIndexError, self).__init__("%s: %s" % (msg, error_text))
|
30 |
|
class NoResultsFound (LogSearchError) :
    """
        Raised when a search matches no documents.
    """
|
37 |
|
class LogSearchIndex (object) :
    """
        An index on the logs for a group of channels.

        This uses Hyper Estraier to handle searching, whereby each log line is a document (yes, I have a powerful server).

        These log documents have the following attributes:
            @uri                - channel/date/offset
            channel             - channel code
            type                - the LogType id
            timestamp           - UTC timestamp
            source_nickname     - source nickname
            source_username     - source username
            source_hostname     - source hostname
            source_chanflags    - source channel flags
            target_nickname     - target nickname

        Each document then has a single line of data, which is the log data message.

        Textual attributes and the message are stored UTF-8 encoded, and decoded again when read back.
    """

    # names of the source_* attributes, in the same order as the LogLine.source tuple
    _SOURCE_ATTRS = ('source_nickname', 'source_username', 'source_hostname', 'source_chanflags')

    def __init__ (self, channels, path, mode='r') :
        """
            Open the database at the given path, with the given mode:
                First char:
                    r       - read, error if not exists
                    w       - write, create if not exists
                    a       - write, error if not exists
                    c       - create, error if exists

                Additional chars (may be combined):
                    trunc   - truncate if exists
                    +       - read as well as write
                    ?       - non-blocking lock open, i.e. it fails if already open

            Channels is the ChannelList.

            Raises LogSearchError if a 'c' mode is given and the path already exists, and SearchIndexError if the
            underlying database cannot be opened.
        """

        # store
        self.channels = channels
        self.path = path
        self.mode = mode

        # 'c' must not clobber an existing index. Only the first char selects the base mode, so test the prefix;
        # a substring test (`mode in 'c'`) would only match the bare 'c' and let e.g. 'c+' through.
        if mode.startswith('c') and os.path.exists(path) :
            raise LogSearchError("Index already exists: %s" % (path, ))

        # mapping of { mode -> flags }
        mode_to_flag = {
            'r':    hype.Database.DBREADER,
            'w':    hype.Database.DBWRITER | hype.Database.DBCREAT,
            'a':    hype.Database.DBWRITER,
            'c':    hype.Database.DBWRITER | hype.Database.DBCREAT,
        }

        # flags to use, standard modes
        flags = mode_to_flag[mode[0]]

        # the modifiers are independent of each other, so each one is applied on its own
        if '?' in mode :
            # non-blocking locking
            flags |= hype.Database.DBLCKNB

        if '+' in mode :
            # read
            flags |= hype.Database.DBREADER

        if 'trunc' in mode :
            # truncate. Dangerous!
            flags |= hype.Database.DBTRUNC

        # make instance
        self.db = hype.Database()

        # open
        if not self.db.open(path, flags) :
            raise SearchIndexError("Index open failed: %s, mode=%s, flags=%#06x" % (path, mode, flags), self.db)

    def close (self) :
        """
            Explicitly close the index, raising SearchIndexError if the database reports a failure.

            NOTE(review): this class defines no __del__; whether hype.Database closes itself when garbage-collected
            is not shown here -- confirm before relying on implicit closing.
        """

        if not self.db.close() :
            raise SearchIndexError("Index close failed", self.db)

    def insert (self, channel, lines) :
        """
            Adds a sequence of LogLines from the given LogChannel to the index, and return the number of added items
        """

        # count from zero
        count = 0

        # iterate
        for line in lines :
            # insert
            self.insert_line(channel, line)

            # count
            count += 1

        # return
        return count

    def insert_line (self, channel, line) :
        """
            Adds a single LogLine for the given LogChannel to the index.

            The document's @uri is "<channel id>/<YYYY-MM-DD>/<offset>". It is stored with PDCLEAN, which per the
            original comment cleans up regions of an overwritten document. Raises SearchIndexError if put_doc fails.
        """

        # validate the LogChannel
        assert channel.id

        # validate the LogLine; only a missing offset is rejected, since 0 is a legitimate %d value for the URI
        assert line.offset is not None
        assert line.timestamp

        # create new document
        doc = hype.Document()

        # line date
        date = line.timestamp.date()

        # ensure that it's not 1900 -- NOTE(review): presumably the strptime default when no year was parsed
        assert date.year != 1900

        # add URI
        doc.add_attr('@uri', "%s/%s/%d" % (channel.id, date.strftime('%Y-%m-%d'), line.offset))

        # add channel id
        doc.add_attr('channel', channel.id)

        # add type
        doc.add_attr('type', str(line.type))

        # add UTC timestamp
        doc.add_attr('timestamp', str(utils.to_utc_timestamp(line.timestamp)))

        # add source attributes? only the non-empty parts are stored
        if line.source :
            for name, value in zip(self._SOURCE_ATTRS, line.source) :
                if value :
                    doc.add_attr(name, value.encode('utf8'))

        # add target attribute?
        if line.target :
            doc.add_attr('target_nickname', line.target.encode('utf8'))

        # add data
        if line.data :
            doc.add_text(line.data.encode('utf8'))

        # put, "clean up dispensable regions of the overwritten document"
        if not self.db.put_doc(doc, hype.Database.PDCLEAN) :
            raise SearchIndexError("put_doc", self.db)

    @staticmethod
    def _decode_attr (value) :
        """
            Decode an attribute value read back from the index (stored UTF-8 encoded by insert_line).

            None (attribute not present) and already-decoded text are returned unchanged.
        """

        if isinstance(value, bytes) :
            return value.decode('utf8')

        return value

    def search_cond (self, cond) :
        """
            Search using a raw hype.Condition, yielding one LogLine per matching document.

            This is a generator: the database search only runs once iteration starts, and NoResultsFound is raised
            from the first next() if there aren't any results -- not when this method is called.

            The yielded LogLines are built with None as their second constructor argument (presumably the offset,
            which is not read back from the index -- TODO confirm against LogLine).
        """

        # execute search, unused 'flags' arg stays zero
        results = self.db.search(cond, 0)

        # no results?
        if not results :
            raise NoResultsFound()

        # iterate over the document IDs
        for doc_id in results :
            # load document; option constants are hype.Database.GDNOATTR/GDNOTEXT, 0 loads both attrs and text
            doc = self.db.get_doc(doc_id, 0)

            # load the attributes/text
            channel = self.channels.lookup(doc.attr('channel'))
            line_type = int(doc.attr('type'))
            timestamp = utils.from_utc_timestamp(int(doc.attr('timestamp')))

            # source, decoded symmetrically with how insert_line encoded it
            source = tuple(self._decode_attr(doc.attr(name)) for name in self._SOURCE_ATTRS)

            # target
            target = self._decode_attr(doc.attr('target_nickname'))

            # message text
            message = doc.cat_texts().decode('utf8')

            # build+yield to as LogLine
            yield log_line.LogLine(channel, None, line_type, timestamp, source, target, message)

    def search (self, options=None, channel=None, attrs=None, phrase=None, order=None, max=None, skip=None) :
        """
            Search with flexible parameters, returning the search_cond() generator.

                options     - bitmask of hype.Condition.*
                channel     - LogChannel object
                attrs       - raw attribute expressions
                phrase      - the search query phrase
                order       - order attribute expression
                max         - number of results to return
                skip        - number of results to skip

            Falsy values (None, 0, empty) leave the corresponding condition setting untouched.
        """

        # build condition
        cond = hype.Condition()

        if options :
            # set options
            cond.set_options(options)

        if channel :
            # add channel attribute
            cond.add_attr(("channel STREQ %s" % channel.id).encode('utf8'))

        if attrs :
            # add attributes
            for attr in attrs :
                cond.add_attr(attr.encode('utf8'))

        if phrase :
            # add phrase
            cond.set_phrase(phrase.encode('utf8'))

        if order :
            # set order
            cond.set_order(order)

        if max :
            # set max
            cond.set_max(max)

        if skip :
            # set skip
            cond.set_skip(skip)

        # execute
        return self.search_cond(cond)

    def search_simple (self, channel, query, count=None, offset=None, search_msg=True, search_nick=False) :
        """
            Search for lines from the given channel for the given simple query.

            The search_* params define which attributes to search for (using fulltext search for the message, and
            STRINC, i.e. substring match, on source_nickname for the nick).

            The newest `count` matches (after skipping `offset`) are fetched, then returned in ascending timestamp
            order. Raises NoResultsFound if nothing matched.
        """

        # search attributes
        attrs = []

        # nickname target query
        if search_nick :
            attrs.append("source_nickname STRINC %s" % query)

        # use search(), backwards; list() here is the builtin, not the list() method below
        results = list(self.search(
            # simplified phrase
            options     = hype.Condition.SIMPLE,

            # specific channel
            channel     = channel,

            # given phrase
            phrase      = query if search_msg else None,

            # attributes defined above
            attrs       = attrs,

            # order by timestamp, descending (backwards)
            order       = "timestamp NUMD",

            # count/offset
            max         = count,
            skip        = offset,
        ))

        # reverse
        return reversed(results)

    @staticmethod
    def _day_bounds (date) :
        """
            Return (start, end) naive datetimes spanning the whole of the given day: 00:00:00.000000 through
            23:59:59.999999, both inclusive.
        """

        dt_start = datetime.datetime(date.year, date.month, date.day, 0, 0, 0, 0)
        dt_end = datetime.datetime(date.year, date.month, date.day, 23, 59, 59, 999999)

        return dt_start, dt_end

    def list (self, channel, date, count=None, skip=None) :
        """
            List all indexed log items for the given UTC date, in ascending timestamp order.

            Returns the lazy search_cond() generator, so NoResultsFound surfaces when it is iterated.
            NOTE(review): the day bounds are naive datetimes; this assumes utils.to_utc_timestamp treats them as UTC.
        """

        # start/end of the day
        dt_start, dt_end = self._day_bounds(date)

        # search
        return self.search(
            # specific channel
            channel     = channel,

            # specific date range
            attrs       = [
                "timestamp NUMBT %d %d" % (utils.to_utc_timestamp(dt_start), utils.to_utc_timestamp(dt_end))
            ],

            # order correctly
            order       = "timestamp NUMA",

            # max count/offset
            max         = count,
            skip        = skip
        )
|
356 |
|
def get_index () :
    """
        Open the default index read-only ('r' mode) and return it, suitable for searching.

        Uses config.LOG_CHANNELS as the channel list and config.SEARCH_INDEX_PATH as the database path.
    """

    # XXX: no caching, a fresh LogSearchIndex is opened on every call
    return LogSearchIndex(config.LOG_CHANNELS, config.SEARCH_INDEX_PATH, 'r')
|
367 |